I’m revisiting sequential tasks in the context of setting up some cron-like suites to handle a few tasks. Specifically, I have a data processing application that runs at a particular cycling frequency, but when things get backed up it may run for longer than the cycling interval (e.g. it cycles every PT5M but may take up to PT15M to run). Because each instance processes whatever data is present, the intervening cycles don’t need to run, so I expire them. This leads to a suite that looks like:
```
[cylc]
    UTC mode = True
[scheduling]
    initial cycle point = now
    [[special tasks]]
        sequential = foo
        clock-trigger = foo(PT0M)
        clock-expire = foo(PT1M)
    [[dependencies]]
        [[[PT1M]]]
            graph = """
                foo
            """
[runtime]
    [[foo]]
        script = """
            sleep 150
        """
```
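As a rough sanity check, the intended behaviour (only one run in flight at a time, and any cycle that cannot start before its expiry time is skipped) can be modelled outside Cylc. This is a simplified sketch, not how Cylc schedules tasks internally; the `simulate` function and its strict "must start before due + expiry" rule are assumptions made for illustration.

```python
from typing import List, Optional


def simulate(interval: int, expire_after: int,
             durations: List[int]) -> List[Optional[int]]:
    """Toy model of "one run at a time, skip cycles that start too late".

    Cycle i is due at i * interval. It starts as soon as it is due and the
    previous run has finished, unless that moment is at or after
    due + expire_after, in which case it expires (a later cycle will
    process its data). Returns each cycle's start time, or None if expired.
    """
    starts: List[Optional[int]] = []
    busy_until = 0  # time at which the currently running instance finishes
    for i, duration in enumerate(durations):
        due = i * interval
        start = max(due, busy_until)
        if start < due + expire_after:
            starts.append(start)
            busy_until = start + duration
        else:
            starts.append(None)  # expired; busy_until is unchanged
    return starts


if __name__ == "__main__":
    # The contrived suite, in seconds: PT1M cycles, 150 s runs, PT1M expiry.
    print(simulate(60, 60, [150] * 6))
    # The real case, in minutes, assuming the expiry window equals PT5M.
    print(simulate(5, 5, [15, 5, 5, 5, 5, 5]))
```

With these inputs the model prints `[0, None, 150, None, None, 300]` and `[0, None, None, 15, 20, 25]`: cycles are skipped while a long run is in progress, and normal cycling resumes afterwards.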
On its face, this suite seems to do exactly what I need, but when executed it doesn’t actually work. Task `foo` at the initial cycle point runs properly and the following instances expire properly, but once one instance has expired, every instance after it ends up expired too. This is because the `sequential` setting translates into the dependency `foo[-PT1M]:succeeded => foo`, and an expired instance never succeeds. It appears that the “sequential” logic does not handle intervening task expirations. This led me to the next “I only want one of these running at once” solution: an internal queue of size one.
```
[cylc]
    UTC mode = True
[scheduling]
    initial cycle point = now
    [[queues]]
        [[[limit_1]]]
            members = foo
            limit = 1
    [[special tasks]]
        clock-trigger = foo(PT0M)
        clock-expire = foo(PT1M)
    [[dependencies]]
        [[[PT1M]]]
            graph = """
                foo
            """
[runtime]
    [[foo]]
        script = """
            sleep 150
        """
```
This “works”, except that in the real case (not this contrived example) I don’t have just one of these tasks, and because a queue’s limit applies to all of its members together, I have to define a separate single-member queue with a limit of 1 for each task.
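Writing those queues out by hand is tedious, but the section is regular enough to generate. A minimal sketch in Python, where the `limit_one_queues` helper and the `<task>_queue` naming are hypothetical, and the indentation assumes the output is pasted under `[scheduling]`:

```python
from typing import Iterable


def limit_one_queues(task_names: Iterable[str]) -> str:
    """Render a suite.rc [[queues]] section with one limit-1 queue per task.

    The "<task>_queue" naming is a hypothetical convention; indentation
    assumes the section sits under [scheduling].
    """
    lines = ["    [[queues]]"]
    for name in task_names:
        lines += [
            f"        [[[{name}_queue]]]",
            f"            members = {name}",
            "            limit = 1",
        ]
    return "\n".join(lines)


if __name__ == "__main__":
    print(limit_one_queues(["foo", "bar"]))
```

The same loop could also be written directly in the suite definition with Cylc’s Jinja2 support, which avoids a separate generation step, but either way it only hides the boilerplate rather than removing the one-queue-per-task requirement.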
Is there a better solution for this? Ideally the “sequential” configuration would handle this, but when I started working through it I couldn’t see a quick way to make that work efficiently. Perhaps someone here has better ideas.
The other option I considered is an xtrigger that checks the suite to see whether an instance of that task is already running:
```
[cylc]
    UTC mode = True
[scheduling]
    initial cycle point = now
    [[xtriggers]]
        sequential = sequential(name=%(name)s)
    [[special tasks]]
        clock-trigger = foo(PT0M)
        clock-expire = foo(PT1M)
    [[dependencies]]
        [[[PT1M]]]
            graph = """
                @sequential => foo
            """
[runtime]
    [[foo]]
        script = """
            sleep 150
        """
```
where the “sequential” xtrigger function polls the current suite for all instances of task `%(name)s` and returns `True` if none of them is in an active state (e.g. submitted, running, retrying).
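A rough sketch of what that function could look like, with loud assumptions: it reads task states from an SQLite database containing a `task_states(name, cycle, status)` table (assumed here; the real schema of the suite database should be checked for the Cylc version in use), and it takes a hypothetical extra `db_path` argument that the xtrigger declaration would need to supply. It returns a `(satisfied, results)` tuple, which is the form Cylc xtrigger functions use, rather than a bare `True`.

```python
import sqlite3
from contextlib import closing
from typing import Dict, Tuple

# States in which an instance counts as still in flight (from the post).
ACTIVE_STATES = ("submitted", "running", "retrying")


def sequential(name: str, db_path: str) -> Tuple[bool, Dict[str, str]]:
    """Satisfied only if no instance of task `name` is currently active.

    ASSUMPTION: db_path points at an SQLite file with a table
    task_states(name, cycle, status); the real Cylc schema may differ.
    """
    placeholders = ", ".join("?" for _ in ACTIVE_STATES)
    query = (
        "SELECT cycle FROM task_states "
        f"WHERE name = ? AND status IN ({placeholders})"
    )
    with closing(sqlite3.connect(db_path)) as conn:
        active = [row[0] for row in conn.execute(query, (name, *ACTIVE_STATES))]
    # (satisfied, results): no extra results are passed on to the task.
    return (not active, {})
```

Note that this only stops two instances overlapping; combined with `clock-expire`, any instance still blocked by the check when its expiry time arrives simply expires, which is the behaviour the sequential setup was meant to give.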