Skipping tasks that aggregate forecasts

Hello,

I wrote a suite for WRF forecast cycling, based on GFS data. The download of GFS data takes a long time, so every time I branch off a forecast (from the update runs) I split the forecast run into 15 chunks of a day each.
I download the forecast data for the first day of the forecast (f000…f024), then run the forecast for that 24-hour period (I call that fcst00). In the meantime the input data for the next 24 hours of the forecast is being downloaded (f025…f048), and when that’s finished AND fcst00 is done I can run the next forecast day (fcst01) and so forth, up to fcst15.

The goal is to interleave the downloading of the GFS forecast with running the nested WRF forecast by breaking everything into daily chunks, and thus leapfrogging the downloading tasks and forecast run tasks.

Each new 24-hour forecast run produces a netCDF file containing a single day’s worth of data, but the final product is a netCDF file containing the entire timeline of the 15-day forecast, so I need to concatenate the netCDF files (using ncrcat from the NCO tools).

When day 01 of the forecast is finished, I concatenate the fcst00 and fcst01 files; when day 02 is finished, I concatenate fcst00, fcst01 and fcst02; and so forth. When the run for the last forecast day is finished, I concatenate all of the fcst00, fcst01, fcst02, fcst03 … fcst15 files into the finished product.
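The cumulative concatenation described above can be sketched roughly as follows. This is only an illustration: the short file names (fcst00.nc, timeline.nc) are hypothetical stand-ins for the real paths, and the use of ncrcat's -O (overwrite) switch is my assumption about how the target gets replaced. The function prints the command instead of running it, so the sketch works without NCO installed.

```shell
#!/usr/bin/env bash
# Hypothetical file names: fcst00.nc, fcst01.nc, ... and timeline.nc.

# Print the daily files that make up the timeline up to and including day $1.
sources_up_to() {
    local last_day=$1 day
    for day in $(seq 0 "$last_day"); do
        printf 'fcst%02d.nc\n' "$day"
    done
}

# Print (do not run) the ncrcat command for the archive step of day $1.
concat_command() {
    local last_day=$1
    # Assumption: -O overwrites the target, so each longer timeline
    # replaces the shorter one written by the previous archive step.
    echo "ncrcat -O $(sources_up_to "$last_day" | tr '\n' ' ')timeline.nc"
}

concat_command 2
```

Every step rewrites the same target with one more input file, which is why only the most recent archive step matters for the final product.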

Now my question: when the suite is ready to concatenate fcst00 … fcst15 (the final concatenation task for that forecast), but for some reason the earlier concatenation tasks haven’t run yet, I don’t need to run them. There’s no need to concatenate fcst00…fcst14 to produce a file that would be immediately overwritten by the longer output of concatenating fcst00…fcst15.

What I called concatenation tasks above are called archive tasks in the suite. They take data from the cylc-run share folder and archive it to the shared storage.

The definition for the task is:

    {% for DAY in range(0,FORECAST_DAYS|int) %}
    [[fcst{{ '%02d' % DAY }}_wrf_archive]]
        inherit = FCST_WRF_ARCHIVE, FORECAST{{ '%02d' % DAY }}_RUN
        [[[environment]]]
            # for each domain, concatenate the forecast file up to that forecast-day
            {% for DOM in range(1,NDOMS+1) %}
            SOURCE_FILES_d{{ '%02d' % DOM }} = """
                ${ROSE_DATAC}/fcst00_postproc_files/$(rose date -c --offset={{CYCLING_INTVAL}} -f"%Y%m%dT%H%MZ")_wrfout_hourly_d{{ '%02d' % DOM }}_fd00.nc
                {% for D in range(1,DAY+1) %}
                ${ROSE_DATAC}/fcst{{ '%02d' % D }}_postproc_files/$(rose date -c --offset=PT{{D * 24}}H -f"%Y%m%dT%H%MZ")_wrfout_hourly_d{{ '%02d' % DOM }}_fd{{ '%02d' % D }}.nc
                {% endfor %}
            """
            TARGET_FILE_d{{ '%02d' % DOM }}=${EXT_ARCHIVE_DIR}/${ROSE_TASK_CYCLE_TIME}_wrfout_hourly_fcst_d{{ '%02d' % DOM }}.nc
            {% endfor %}
    {% endfor %}

So you can see how every new fcstXX_wrf_archive includes 1 more file in its SOURCE_FILES, thus producing an ever-growing timeline in the TARGET_FILE.

The graph for this is currently:

                {% for DAY in range(0,FORECAST_DAYS|int) %}
                fcst{{ '%02d' % DAY }}_wrf_run => fcst{{ '%02d' % DAY }}_wrf_postproc => fcst{{ '%02d' % DAY }}_wrf_archive  => housekeep
                {% endfor %}

                {% for DAY in range(1,FORECAST_DAYS|int) %}
                fcst{{ '%02d' % (DAY-1) }}_wrf_archive => fcst{{ '%02d' % DAY }}_wrf_archive
                {% endfor %}

The first part simply describes the sequence run -> post-process -> archive. The second part says that the fcstXX_wrf_archive task of the preceding forecast day is a precondition for the fcstYY_wrf_archive task of the current forecast day. This was supposed to make the tasks run in numerical order: fcst00, fcst01 … fcst15. I also have to ensure that several archive tasks don’t run at the same time.

But I’d like a higher ranking fcstYY_wrf_archive task (when it succeeds) to suicide a lower-ranking wrf_archive task.

I tried to replace the second chunk of the graph definition with this:

                {% for DAY in range(1,FORECAST_DAYS|int) %}
                fcst{{ '%02d' % DAY }}_wrf_archive:succeed => ! fcst{{ '%02d' % (DAY-1) }}_wrf_archive
                {% endfor %}

But this doesn’t work. The suicide trigger doesn’t ‘catch’.

What’s a better way of doing this? Thank you!

P.S. A similar problem arises when a forecasting suite is running behind real time (e.g. in catch-up mode). When the inputs for the forecast on day X are available, I no longer need to run the forecast for day X-1 (or X-2), as their output will be superseded by the forecast on day X, which should be better than what yesterday’s data would produce. It’s sort of like what I described above: a task suiciding some previous tasks upon successful completion. Any ideas?

Suicide triggers will remove tasks from the graph. However, if a task is already running it will be left running, which would likely let the next archive task start too, so multiple archive tasks would run in parallel and fight over the same file.

So I think you will need to either not suicide if the previous archive task is running, or kill the previous archive task.

I think this is a problem that will have to be solved in the [runtime] rather than the [scheduling] section as we can do more advanced control and introspection from within a task than in the graph. Here’s an approach that uses a task to kill the previous archive task (if running), then resets its status to “expired”:

    fcst<day> => archive<day>
    # run the archive tasks in series
    archive<day> => archive<day + 1>
    # if the next archive task is able to run cancel the previous
    fcst<day> => expire<day - 1>

    [runtime]
        [[expire<day>]]
            script = """
                cylc kill \
                    "$CYLC_WORKFLOW_ID" \
                    "$CYLC_TASK_CYCLE_POINT/$TASK"
                cylc reset \
                    "$CYLC_WORKFLOW_ID" \
                    "$CYLC_TASK_CYCLE_POINT/$TASK" \
                    -s "expired"
            """
            [[[environment]]]
                TASK = archive_day%(day)
There are three main commands you can use to mutate the workflow execution:

  • cylc remove - roughly equivalent to a manual suicide trigger
  • cylc kill - allows you to stop running tasks (no problem if the task isn’t running)
  • cylc reset - allows you to change the state of a task (e.g. to expired)

For workflow introspection there is the cylc suite-state command which allows you to query (or poll) the status of other tasks in the workflow (or in other workflows).

With different combinations of these four commands it should be possible to code the desired behaviour.
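As a rough illustration of combining these commands, here is a sketch of the decision logic only. It assumes the state of the older archive task has already been obtained somehow (for example with cylc suite-state) and is passed in as a plain string. The function name supersede_commands and the choice to leave finished tasks alone are my own assumptions, and the commands are printed rather than executed.

```shell
#!/usr/bin/env bash
# Print the commands that would supersede one older archive task.
# Arguments: suite name, task ID (CYCLE/NAME), current state of that task.
# The state is assumed to have been looked up beforehand.
supersede_commands() {
    local suite=$1 task_id=$2 state=$3
    case "$state" in
        succeeded|expired)
            # Already finished or already expired: nothing to do.
            ;;
        submitted|running)
            # Stop the job first so two archive tasks never write at once,
            # then mark the task as expired.
            echo "cylc kill $suite $task_id"
            echo "cylc reset $suite $task_id -s expired"
            ;;
        *)
            # Not started yet (e.g. waiting): expiring it is enough.
            echo "cylc reset $suite $task_id -s expired"
            ;;
    esac
}

supersede_commands my_suite 20200101T0000Z/archive_day01 running
```

Because cylc kill is harmless when the task isn’t running, the simpler alternative is to always issue both commands and skip the state check.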

Note expired tasks remain visible in Rose Bush which makes it a little easier to tell what’s going on.

@fredw - note:

  • Suicide triggers in Cylc 7 are intended to get “waiting tasks” in the scheduler to remove themselves from play, if they cannot run because an alternate path was taken in the graph (if not removed, they would eventually stall the scheduler as it waits in vain for their prerequisites to be satisfied)
  • Suicide triggers are not needed in Cylc 8 :tada:, which only spawns tasks on demand

Thank you very much for pointing out the cylc command line operations for manually modifying the graph (and the idea to expire tasks instead of removing them).

That is very useful, and I used your example as inspiration to achieve what I needed:

My normal flow is wrf_run => wrf_postproc => wrf_archive, and each of those flows exists for each fcstXX day. Some of the archive operations can be expired if a more recent fcstXX_postproc is already available.

So I introduced a new fcstXX_expire_prev_archive set of tasks and wrote this graph:

                {% for DAY in range(0,FORECAST_DAYS|int) %}
                fcst{{ '%02d' % DAY }}_wrf_run => fcst{{ '%02d' % DAY }}_wrf_postproc => fcst{{ '%02d' % DAY }}_wrf_archive & fcst{{ '%02d' % DAY }}_wrfout_archive
                fcst{{ '%02d' % DAY }}_wrf_archive | fcst{{ '%02d' % DAY }}_wrf_archive:expired => housekeep
                {% endfor %}

                {% for DAY in range(1,FORECAST_DAYS|int) %}
                fcst{{ '%02d' % DAY }}_wrf_postproc => fcst{{ '%02d' % DAY }}_expire_prev_archive
                fcst{{ '%02d' % (DAY-1) }}_wrf_archive | fcst{{ '%02d' % (DAY-1) }}_wrf_archive:expired => fcst{{ '%02d' % DAY }}_wrf_archive
                fcst{{ '%02d' % (DAY-1) }}_wrf_run => fcst{{ '%02d' % DAY }}_wrf_run
                {% endfor %}

The [runtime] for the fcstXX_expire_prev_archive tasks is:

    {% for DAY in range(1,FORECAST_DAYS|int) %}
    [[fcst{{ '%02d' % DAY }}_expire_prev_archive]]
        script = """
            {% for D in range(1,DAY+1) %}
            cylc kill "$CYLC_SUITE_NAME" "$CYLC_TASK_CYCLE_POINT/fcst{{ '%02d' % (D-1) }}_wrf_archive"
            cylc reset "$CYLC_SUITE_NAME" "$CYLC_TASK_CYCLE_POINT/fcst{{ '%02d' % (D-1) }}_wrf_archive" -s "expired"
            {% endfor %}
        """
    {% endfor %}

So when e.g. fcst03_expire_prev_archive runs (because fcst03_wrf_postproc is finished), it will kill and expire all archive tasks from fcst00_wrf_archive to fcst02_wrf_archive.

Expanded in suite.rc.processed, this looks like:

    [[fcst03_expire_prev_archive]]
        script = """
            cylc kill "$CYLC_SUITE_NAME" "$CYLC_TASK_CYCLE_POINT/fcst00_wrf_archive"
            cylc reset "$CYLC_SUITE_NAME" "$CYLC_TASK_CYCLE_POINT/fcst00_wrf_archive" -s "expired"
            cylc kill "$CYLC_SUITE_NAME" "$CYLC_TASK_CYCLE_POINT/fcst01_wrf_archive"
            cylc reset "$CYLC_SUITE_NAME" "$CYLC_TASK_CYCLE_POINT/fcst01_wrf_archive" -s "expired"
            cylc kill "$CYLC_SUITE_NAME" "$CYLC_TASK_CYCLE_POINT/fcst02_wrf_archive"
            cylc reset "$CYLC_SUITE_NAME" "$CYLC_TASK_CYCLE_POINT/fcst02_wrf_archive" -s "expired"
        """

And the control flow was altered such that an expired fcstXX_wrf_archive task doesn’t hold up the housekeep at the end of each cycle. A slight modification to the above was needed to make sure that the last fcst15_wrf_archive cannot be expired to satisfy housekeep. This last archive task is the one that should never be killed/expired by a fcstXX_expire_prev_archive task.
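As a quick check on that last point, the targets of the expire tasks can be enumerated outside the suite. The sketch below mirrors the two Jinja2 loops from the [runtime] section in plain bash. FORECAST_DAYS=16 is my assumption (so that tasks run from fcst00 to fcst15), and the function names are made up for the illustration.

```shell
#!/usr/bin/env bash
FORECAST_DAYS=16   # assumption: gives tasks fcst00 .. fcst15

# Archive tasks that fcst<DAY>_expire_prev_archive acts on, mirroring the
# inner Jinja2 loop: D runs from 1 to DAY and targets fcst(D-1).
expire_targets() {
    local day=$1 d
    for d in $(seq 1 "$day"); do
        printf 'fcst%02d_wrf_archive\n' "$((d - 1))"
    done
}

# Every archive task targeted by any expire task (DAY = 1 .. FORECAST_DAYS-1).
all_targets() {
    local day
    for day in $(seq 1 "$((FORECAST_DAYS - 1))"); do
        expire_targets "$day"
    done | sort -u
}

last_archive=$(printf 'fcst%02d_wrf_archive' "$((FORECAST_DAYS - 1))")
if all_targets | grep -qx "$last_archive"; then
    echo "$last_archive can be expired"
else
    echo "$last_archive is never targeted"
fi
```

With these loop bounds the highest target is fcst14_wrf_archive, so the final archive task is left alone by construction; only the housekeep trigger for the last day needs the special case.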

Thanks a lot for your help! It’s very much appreciated :slight_smile:
Fred

Hilary - thanks for the explanation of suiciding, I need to go over my other suites and check that I’m doing this right. I suspect not.

I kind of dread porting to Cylc 8; it seems like it’ll be a lot of work.

No, porting is pretty easy, honest! And we’re here to help.

Cylc 8 gets rid of a bunch of features and behaviours that are difficult to understand in Cylc 7 and earlier.
