How to rewind workflow (not to a checkpoint)

Hi,

Say you have a model workflow and a separate workflow to process the models outputs. The model hours are chunked together or could be hourly to process in parallel.

e.g.

[[HHH000]]
[[HHH012]]
[[HHH024]]
[[HHH036]]
[[HHH048]]
[[HHH060]]
...
[[HHH240]]

Forecast hours have been processed to HHH060 inclusive, and the model has output say 63 hours of data. The model then crashes and rewinds itself changing things like halo size, timestep, etc for recovery to get past the instability. It has rewound to forecast hour 36 and will resume from there.

Because the model has been rewound and has changed its calculations, we need to rewind the postprocessing suite too otherwise there will most likely be a discontinuity at forecast hour 61 onwards (significant jumps in parameters, negative accumulations, etc). We do this in Cylc7 by code like below (there is a bit more, but you can get the idea).

< hold the suite >
relevant_blocks=()
for f in $HHH_BLOCK_HOURS; do
    if ((10#$f >= RESET_FROM_HOUR)); then
        relevant_blocks+=("$f")
    fi
done
min_fhr=$(echo "${relevant_blocks[*]}" | xargs -n1 | sort -un | head -1)

# Kill the tasks/families
for f in "${relevant_blocks[@]}"; do
    # Greater than or equal to out reset hour, so kill all tasks in the family
    # It may try to kill HHH blocks that do not exist, but it is only a small
    # overhead involved and does not lead to failures
    cylc kill \
        "$CYLC_SUITE_NAME" \
        "$CYLC_TASK_CYCLE_POINT/HHH_${f}_$member"
done

# Reset the state of the families
for f in $FHRS; do
    cylc reset \
        --state=waiting \
        "$CYLC_SUITE_NAME" \
        "$CYLC_TASK_CYCLE_POINT/HHH_${f}_$member"
done
< release the suite >

From what I can see from quick checks, the tasks disappear from the task pool quite rapidly, so the above approach won’t work (just reseting families to waiting). As I’m still learning all the tools and approaches in Cylc8, is there a good way to do similar for Cylc8?

Thanks.

1 Like

To rewind/rerun all or part of a workflow at Cylc 8, use concurrent flows.

https://cylc.github.io/cylc-doc/stable/html/user-guide/running-workflows/reflow.html#concurrent-flows

E.G:

# re-run from the selected task onwards
$ cylc trigger workflow//cycle/task --flow=new

Thanks. I thought I had seen something related to this. I’ll explore it tomorrow.

Does this work if you have specified --no-run-name when starting the workflow? I couldn’t see that stated in the docs.

Also, when rewinding a section like described above, the original flow would need to essentially stop so the new one can take over. Would it need to be

cylc hold workflow
# kill the relevant active tasks
cylc trigger workflow//cycle/namespace --flow=new
cylc stop --flow="$CYLC_TASK_FLOW_NUMBERS"

Afterwards, what should we expect? A new ~/cylc-run/workflow/run2 directory?

How about if we don’t want to stop the original workflow but, we do want one part of it not to proceed whilst this new flow takes over that part? For example, we are processing data from multiple ensemble members. Ensemble member 5 needs to be rewound, but the other N members do not. We want to rewind ensemble member 5, but let all other members continue as they are. Then, when all members have completed their processing (e.g. final_task<member> => calculate_probabilities), can triggers go across flows?

Yes, the installation run name is not related to concurrent flows.

Afterwards, what should we expect? A new ~/cylc-run/workflow/run2 directory?

No, concurrent flows allow you to run multiple logical executions of the graph under the same scheduler.

They can be used to rerun all or part of a workflow in a similar-ish way to what you were doing before with cylc reset -s waiting. When a subsequent flow re-runs a task it will create a new job submission, but not a new workflow installation.

cylc trigger workflow//cycle/namespace --flow=new
cylc stop --flow="$CYLC_TASK_FLOW_NUMBERS"

Yes.

–flow=INT Stop flow number INT from spawning more tasks. The
scheduler will shut down if it is the only flow.

A flow is an advancing front which works its way through your graph. Stopping a flow from spawning more tasks halts the advance of this front.

How about if we don’t want to stop the original workflow but, we do want one part of it not to proceed whilst this new flow takes over that part?

Hold the bit of the workflow you don’t want to advance:

$ cylc hold <workflow>//<cycle>/<family-or-glob>

can triggers go across flows

When flows meet, they combine:

  • Each task has associated flow numbers.
  • When a task completes an output, it spawns any downstream tasks which depend on that output.
  • When tasks are spawned they inherit the flow numbers from the task which spawns them.

If a task has two dependencies and one of them is in flow 1 and the other in flow 2, then the task will inherit both flow numbers i.e. {1, 2}.

Flows do not overrun

Hold the bit of the workflow you don’t want to advance:

$ cylc hold <workflow>//<cycle>/<family-or-glob>

I’m not sure this is working how I expect it to.

I am:

  1. Launch the workflow
  2. Hold the task which would rewind the workflow
  3. Let the workflow advance a bit
  4. Release the rewind task

Rewind tasks do:

  1. holds the processing (cylc hold name "//$CYLC_TASK_CYCLE_POINT/RUN_$member")
  2. finds the list of families to rewind (e.g. HHH_024_$member, HHH_036_$member, etc)
  3. Kill those families
  4. Perform the original hold again for good measure

Next task cleans up some directories on the remote end (e.g. so rose-bunch can run properly).

The third task triggers the first task in the flow for the earliest family to be reset (e.g. wait_024_008), e.g. cylc trigger --flow=new name "//$CYLC_TASK_CYCLE_POINT/wait_${min_fhr}_$member"

This seems to start a new flow, but, here is where my new confusion happens. What I see is many tasks are held from the original flow, The new flow keeps the held state perhaps? So, if I had in cylc release name "//$CYLC_TASK_CYCLE_POINT/RUN_$member" I might reasonably expect things to run smoothly. However, instead, things run well in advance of the new flow, but I can understand that, the original flow still exists. However, I can’t do cylc stop --flow=1 because there are multiple members and that would stop their processing.

How do I stop flow=1 from proceeding for part of the graph only? The docs about rewinding talk about stopping a flow, but I do not want a flow stopped, only select entries in the flow.

The new flow keeps the held state perhaps?

Yes.

However, instead, things run well in advance of the new flow,

It sounds like you’re trying to do something very complex. You might want to consider holding all tasks (`cylc hold ‘//*’) so you can selectively release only the ones you want to run.

How do I stop flow=1 from proceeding for part of the graph only?

The cylc remove command can remove selected tasks from the workflow, however, at present it can only remove spawned inactive tasks. We are working on addressing this limitation, remove: extend beyond the task pool · Issue #5643 · cylc/cylc-flow · GitHub.

Note that a task re-running in a new flow will have an incremented submit number, so the new job logs will not overwrite the original ones. (However, I presume you are referring to job data files, not the Cylc job logs).

Relatedly, we plan to make cylc hold flow-specific Task hold/release needs to be flow-specific · Issue #4277 · cylc/cylc-flow · GitHub

When that’s done, you can selectively hold all tasks in flow 1, and the target fraction of those tasks can then be removed (if you want that part of flow 1 to stop dead), or left to merge with flow 2 when that catches up.

For the moment, as Oliver says, you can do it less conveniently by hold all tasks first.

@TomC - can I suggest to write a quick dummy workflow with a graph structure that minimally reproduces your use case, then you can use that to see how to manipulate multiple flows as you want without the additional complexities of a real workflow. Potentially, you could post the dummy config for us to help out too.

By holding all tasks, and releasing ones I want to run, that would require releasing them only when flow2 has caught up to flow1 for each task. I don’t think that is manageable.

I think I need at least the cylc hold --flow=1 type of functionality. so flow1 can be held, and superseded by flow2 as it comes across them.

I have just tried:

cylc hold <top level>
cylc kill <appropriate families>
cylc remove <appropriate families>
cylc release <top level>
cylc trigger <first task in flow>

But that didn’t work for me either as the previous held state exists for the removed tasks when they are eventually re-inserted in the pool.

The workflow (in its current state) I’m using to hack around with this is:

#!jinja2
{% set REWIND_BLOCK = '012' %}
{% set REWIND_MEMBER = '002' %}
{% set REWIND_TRIGGER = 'a' %}
[task parameters]
    block = 0, 12, 24, 36, 48, 60
    member = 1..3
    [[templates]]
        block = _%(block)03d
        member = _%(member)03d

[scheduling]
    initial cycle point = 20200101T0000Z
    final cycle point = 20200101T0000Z
    [[graph]]
        P1D = """
            wait<block-1, member> => wait<block, member>
            wait<block, member> => a<block, member>
                => b<block, member>
                => c<block, member> & d<block, member>
            c<block, member> => e<block, member>
            d<block, member> & e<block, member>
                => archive<block, member>
                => finalise<member>
                => cleanup

            # Force trigger of rewind for member 2
            {{REWIND_TRIGGER}}_{{REWIND_BLOCK}}_{{REWIND_MEMBER}} => kill_and_hold_{{REWIND_MEMBER}}
                => clean_remote_work_dirs_{{REWIND_MEMBER}}
                => rewind_{{REWIND_MEMBER}}
        """

[runtime]
    [[root]]
        script = sleep $((RANDOM%3+2))
        pre-script = set -x

    [[kill_and_hold<member>]]
        script = """
if [[ $CYLC_TASK_FLOW_NUMBERS != 1 ]]; then
# Just make this abort if not the first flow to stop an infinite loop forming
return 1
fi
cylc hold "$CYLC_WORKFLOW_NAME" "//20200101T0000Z/RUN_TASKS_{{REWIND_MEMBER}}"
# Hardcoded list of blocks to kill
for t in HHH_024 HHH_036 HHH_048 HHH_060 finalise; do
    tasks+=("//20200101T0000Z/${t}_{{REWIND_MEMBER}}")
done
# Kill/hold a couple of times in case there is some race/edge condition
cylc kill "$CYLC_WORKFLOW_NAME" "${tasks[@]}"
sleep 1
cylc hold "$CYLC_WORKFLOW_NAME" "//20200101T0000Z/RUN_TASKS_{{REWIND_MEMBER}}"
sleep 1
cylc kill "$CYLC_WORKFLOW_NAME" "${tasks[@]}"
sleep 1
cylc hold "$CYLC_WORKFLOW_NAME" "//20200101T0000Z/RUN_TASKS_{{REWIND_MEMBER}}"

# Remove all flow one tasks for the tasks we have tried to kill
cylc remove "$CYLC_WORKFLOW_NAME" "${tasks[@]}"

"""

    [[clean_remote_work_dirs<member>]]
        # remove old CYLC_TASK_WORK_DIR to ensure rose-bunch reruns tasks

    [[rewind<member>]]
        script = """
cylc trigger --flow=new "$CYLC_WORKFLOW_NAME" "//20200101T0000Z/wait_{{REWIND_BLOCK}}_{{REWIND_MEMBER}}"
cylc release "$CYLC_WORKFLOW_NAME" "//20200101T0000Z/RUN_TASKS_{{REWIND_MEMBER}}"
"""


    [[RUN_TASKS<member>]]

    [[HHH<block, member>]]
        inherit = RUN_TASKS<member>

{% for task in ('wait', 'a', 'b', 'c', 'd', 'e', 'archive') %}
    {% for member in ('001', '002', '003') %}
    [[{{ task }}<block>_{{member}}]]
        inherit = HHH<block>_{{member}}
    {% endfor %}
{% endfor %}


    [[finalise<member>]]
        inherit = RUN_TASKS<member>

    [[cleanup]]

I don’t understand exactly what you’re trying to do, but take a look at this simple example:

[scheduler]
    allow implicit tasks = True
[scheduling]
    [[graph]]
        R1 = """
            a => b & c
            c => d & e
            e => f & g
        """
[runtime]
    [[root]]
        script = "sleep 5"
    [[c]]
        script = """
            cylc hold ${CYLC_WORKFLOW_ID}//1/e
        """

tom-1

Run this, and wait for the task e to end up waiting and held, with c and d finished.

Now, start a new flow at a:

$ cylc trigger --flow=new tom//1/a

The new flow (2) will propagate through the graph, re-running a, b, c, and d. When it tries to spawn e it will find that e already exists in the task pool (waiting and held, from flow 1) and will merge with it.

Now release e to finish the workflow (e, f, and g):

$ cylc release tom//1/e

If you look at the scheduler log, you’ll see that e, f, and g continue with flow numbers {1,2}, reflecting what happened leading up to that point.

$  cylc log tom | grep '=> running'
2023-08-16T14:47:53+12:00 INFO - [1/a submitted job:01 flows:1] => running
2023-08-16T14:48:00+12:00 INFO - [1/b submitted job:01 flows:1] => running
2023-08-16T14:48:00+12:00 INFO - [1/c submitted job:01 flows:1] => running
2023-08-16T14:48:04+12:00 INFO - [1/d submitted job:01 flows:1] => running
2023-08-16T14:49:39+12:00 INFO - [1/a submitted job:02 flows:2] => running
2023-08-16T14:49:47+12:00 INFO - [1/c submitted job:02 flows:2] => running
2023-08-16T14:49:48+12:00 INFO - [1/b submitted job:02 flows:2] => running
2023-08-16T14:49:50+12:00 INFO - [1/d submitted job:02 flows:2] => running
2023-08-16T14:50:35+12:00 INFO - [1/e submitted job:01 flows:1,2] => running
2023-08-16T14:50:42+12:00 INFO - [1/f submitted job:01 flows:1,2] => running
2023-08-16T14:50:42+12:00 INFO - [1/g submitted job:01 flows:1,2] => running

A few comments:

  • I wonder if you’re complicating things by trying to “remove” tasks in the original flow when maybe you just need to hold them and wait for the new flow to catch up and merge before carrying on as normal?
  • The timing of hold and release operations is important, and hold only works on future tasks if you target specific task IDs (globbing only applies to the current task pool).
  • In more complex workflows, watch out for off-flow prerequisites - i.e. dependence on tasks outside of your manually triggered new flow. You’ll have to use cylc set-outputs to satisfy those manually (note this is about to get easier, wait for 8.3)

Also:

  • carefully choose points in the graph at which to hold and start flows, it is much easier at bottlenecks in the graph
  • if your new flow needs to kick off with multiple tasks, you can use cylc trigger more than once, but after the first time you’ll need to target the new flow number explicitly, e.g. --flow=2 (to avoid starting flow 3).

Note also, we intend to carefully document how to handle use cases like this when the aforementioned 8.3 developments (to do with manual interventions of all kinds) are done. So you’re a little bit ahead of the game here!

Remove was just added to try that approach as an experiment.

Yes, this is part of my problem. I can’t release a whole family because then all tasks for flow1 keep going along. But, if I don’t release, then other tasks can’t run when they are ready to be triggered. I could wait until flow2 has caught up to flow1 everywhere before releasing, but I’m unsure how to do that.

What are you unsure about? If all flow 1 tasks are held, flow 2 will continue until it merges with the held tasks, and nothing will be running. At that point, release them all.

Are you attempting to reproduce an intervention that you were able to perform with Cylc 7, or just experimenting with the new flow capabilities of Cylc 8?

The former seems unlikely, since “rewinding” a workflow without restarting the scheduler at a checkpoint (or at a cycle point) was near impossible if you had to go beyond the current task pool.

Yes. This. In Cylc7, the task pool was, for all intents and purposes, the current cycle as there was no n=0 limit. I don’t care about future cycles. We held the appropriate parent family. Noting in our case, nothing is was removed from the task pool until the end of a cycle. If tasks were being removed from the pool, more complexity would be needed by inserting tasks, but that is not a hard command.

How does one check if there are running tasks under a specific family cleanly? We do not want to pause the whole workflow as that just causes a buildup of tasks and extra load when they all start running at once when released. The checking for everything being paused, conceptually for me, would mean looping in the script for an indeterminate period of time (5 minutes? 30 minutes? More? - it depends on how long the tasks take, but should it time out after a while in case there is a problem and fail?).

OK, apologies, I just went back and read your original post more carefully.

the task pool was, for all intents and purposes, the current cycle …

In general that is not true. However I accept that in your case - presumably a clock-limited real time workflow that is not running behind - it was near enough to true that you could deal with it.

If there is any spread over cycles, as is often the case in a research context, the Cylc 7 task pool is not so simple, and pragmatically any significant rewind (even within one cycle) meant warm-starting the scheduler at an earlier cycle point.

We held the appropriate parent family. Noting in our case, nothing is was removed from the task pool until the end of a cycle. If tasks were being removed from the pool, more complexity would be needed by inserting tasks, but that is not a hard command.

Roger that. Hmmm, lemme have a think…

I can only imagine. Thankfully for me, I largely deal with realtime.

I’m happy to park this discussion for now. My guess is the hold tasks based on the flow will do what I’m hoping for because then I can just hold flow1, and (I’m guessing) flow2 will just clobber flow1’s hold when it gets to that point?

@TomC - is it the case that holding all family members individually (as opposed to using the family name, which as discussed above only matches in the current task pool) would be sufficient, if there’s a way to figure out automatically what the member names are?

Another simple example:

[scheduling]
    [[graph]]
        R1 = """
            pre => FAM:succeed-all => post
            a1 => a3
            a2 => a4
        """
[runtime]
   [[pre, post]]
   [[FAM]]
   [[a1, a2, a3, a4, a5]]
      inherit = FAM
      script = "sleep 20"

The members of FAM (a1, …, a5) have some internal dependencies so they don’t all run at the same time, which seems to be what you’ve described above.

Knowing what the member names are, I can run this workflow, and successfully hold the family as follows either before pre is finished, or while the first members are running:

for t in a1 a2 a3 a4 a5
    cylc hold tom//1/$t
end

(That syntax is fish - my interactive shell - but you get the idea).

Then the question is, how to figure out what the member names are without hard-wiring them.

The cylc list command can print information about the family inheritance hierarchy, perhaps not in the best format for parsing, but it would do:

$  cylc list --tree -b tom
root
 ├─FAM
 │ ├─a1
 │ ├─a2
 │ ├─a3
 │ ├─a4
 │ └─a5
 ├─bar
 └─foo
$ cylc list --mro tom
a1   a1 FAM root
a2   a2 FAM root
a3   a3 FAM root
a4   a4 FAM root
a5   a5 FAM root
bar  bar root
foo  foo root

The --mro option prints the “method resolution order” (linearized multiple inheritance hierarchy as nicked from the Python language by Cylc) and clearly shows that a1 (etc.) is a member of FAM.

Note that you don’t have to distinguish between running and waiting (or yet to be spawned) members, just target them all.

We might have a better solution in future, but for the moment does that help?

Yes, that might be sufficient to do what I’m trying to do. But, as this isn’t something I need to figure out right now, I think my best option is to wait for Cylc 8.3.0 and explore the new functionality it will be providing rather than hack something in which can hopefully be much simplified with new commands.

1 Like

@TomC - after talking with your colleague Xiao last week, I came back to take a closer look at this. (I probably should have done so before, sorry, but it’s not exactly a minimal example that can be understood at a glance :grin:)

Good news, this really is much easier in Cylc 8. In this sort of scenario, knowing what you had to do in Cylc 7 is probably an impediment!

At 8.3.0 you will be able to rewind and replay your sub-graph with just four commands:

# GLOB pattern to match run_002 tasks but not in block 000:
ID_PT=${CYLC_WORKFLOW_ID}//${CYLC_TASK_CYCLE_POINT}
GLOB="${ID_PT}/HHH_0[1-9]?_002"

# 1. hold to prevent matching queued tasks from submitting
cylc hold "$GLOB"

# 2. kill (and hold) matching active tasks
cylc kill "$GLOB"

#   (wait a little for async job kills to happen, then...) 
# 3. remove matching tasks (now failed or waiting) in the active window
cylc remove "$GLOB"

# 4. trigger the new "rewind" flow:
cylc trigger --flow=new ${ID_PT}/wait_012_002

That’s all. The new flow 2 merges with waiting finalise_002 task from flow 1 (spawned by block 000, which we don’t want to rewind), and things proceed as normal from there.

I’ve test-run this multiple times on my dev branch, with your example workflow config above - all good :tada:

It’s a bit trickier, but not much, with the current release. I can explain how to do that if you need it, but hopefully 8.3.0 will be along soon enough.