Description
Supersedes the same idea from #2917
The Problem:
At the moment XTriggers are run by the subprocess pool, which has several consequences:
- Each XTrigger call involves a subprocess call to a script.
- Python is invoked for each xtrigger call.
- XTriggers are re-imported for each xtrigger call.
- Missing XTriggers [can] cause runtime errors
Bodging the XTrigger loading mechanism for #3465 seems nasty.
Close #2917
The Proposal:
Convert XTriggers to asynchronous functions and call them directly from the main loop, piggy-backing on the main loop plugin functionality introduced in #3492.
This will turn XTriggers into a blocking stage of the main loop (where they would effectively be running in an unlimited thread pool), something like this:
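```python
import asyncio


async def main_loop(self):
    # ...
    # run every XTrigger concurrently; this stage blocks until all return
    await asyncio.gather(*[
        trigger(args)
        for trigger in xtriggers
    ])
    # ...
    # asyncio.sleep() needs a delay; interval is a placeholder
    await asyncio.sleep(interval)
```

Consequences: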
- This should be an efficiency boost, as we are calling the XTrigger directly rather than going through SubProcPool, Subprocess, Python and importlib.
- XTriggers would not be re-imported whilst the suite is running; if you want to change XTrigger code you would need to restart your suite.
- XTriggers would be loaded on suite start, so missing XTriggers could not cause runtime errors.
- XTriggers gain the logging, timing and standardisation offered by main loop plugins.
- The responsibility for performing non-blocking IO lies with the XTrigger function writer.
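The following is a minimal, runnable sketch of one main loop iteration under this proposal. The XTrigger functions (`wall_clock`, `file_exists`), the `run_xtriggers` helper and the `(function, args)` pairing are hypothetical illustrations, not Cylc's API. It also shows one way an XTrigger writer could meet the non-blocking IO responsibility: hand a blocking call to a worker thread with `asyncio.to_thread` (Python 3.9+).

```python
import asyncio
import os
import time


async def wall_clock(trigger_time):
    # Hypothetical XTrigger: satisfied once the wall clock passes trigger_time
    return time.time() >= trigger_time


async def file_exists(path):
    # Hypothetical XTrigger doing blocking IO: os.path.exists blocks, so it
    # runs in a worker thread to keep the event loop free (Python 3.9+)
    return await asyncio.to_thread(os.path.exists, path)


async def run_xtriggers(xtriggers):
    # Call every (function, args) pair concurrently and wait for all of
    # them; results come back in the same order as the input list
    return await asyncio.gather(*[
        trigger(*args)
        for trigger, args in xtriggers
    ])


async def main_loop(xtriggers, iterations=3, interval=0.01):
    # Cut-down main loop: run the XTrigger stage, then sleep.
    # Returns the XTrigger results from the final iteration.
    results = []
    for _ in range(iterations):
        results = await run_xtriggers(xtriggers)
        await asyncio.sleep(interval)
    return results


if __name__ == "__main__":
    xtriggers = [
        (wall_clock, (0,)),                  # epoch 0 has passed -> True
        (file_exists, ("/no/such/file",)),   # -> False
    ]
    print(asyncio.run(main_loop(xtriggers)))  # [True, False]
```

A slow XTrigger only delays the stage by its own duration, because the others run concurrently. An XTrigger that blocks without offloading would stall the whole main loop, hence the responsibility placed on the function writer.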
Bonus Marks:
If we move to asynchronous XTriggers, we gain the ability to have long-running XTrigger functions with minimal overhead.
This means we can very easily and efficiently implement a push interface for XTriggers (in addition to the pre-existing pull interface):
```python
import json


async def push(*args):
    socket = ...  # some asynchronous socket; setup not specified here
    msg = await socket.recv()  # could take hours, doesn't matter
    return json.loads(msg)
```

Super Bonus Marks:
Another long-lived interface, which would also be fairly straightforward to implement, is ‘yield’, i.e. a single long-lived asynchronous function which yields cycle points and data as and when they become available:
```python
import json


async def yields(*args):
    socket = ...
    while True:
        # listen for messages
        msg = await socket.recv()
        data = json.loads(msg)
        # yield XTrigger values to Cylc Flow when they arrive
        yield (
            data['time'],  # cycle point
            data
        )
```

A nicer solution for Kafka / message brokers?
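To try the push and yield shapes without a real message broker, the sketch below swaps the socket for an `asyncio.Queue`. This is an assumption purely for illustration, since the issue leaves the socket unspecified. `push` awaits a single message and returns it. `yields` is an async generator that keeps producing `(cycle_point, data)` pairs for as long as messages arrive, which is roughly the shape a Kafka-style consumer would take. The `'time'` key and JSON payloads follow the snippets above. The `None` sentinel that ends the stream and the `queue` parameters are demo-only additions.

```python
import asyncio
import json


async def push(queue):
    # Wait (possibly for hours) for one message, then return its decoded
    # payload as the XTrigger result
    msg = await queue.get()
    return json.loads(msg)


async def yields(queue):
    # Long-lived async generator: yield (cycle point, data) whenever a
    # message arrives; a None message ends the stream (demo-only sentinel)
    while True:
        msg = await queue.get()
        if msg is None:
            return
        data = json.loads(msg)
        yield data['time'], data


async def demo():
    queue = asyncio.Queue()  # stand-in for the unspecified socket

    await queue.put(json.dumps({'time': '20200101T00', 'obs': 1}))
    first = await push(queue)

    for point in ('20200101T06', '20200101T18'):
        await queue.put(json.dumps({'time': point}))
    await queue.put(None)
    points = [cycle_point async for cycle_point, _ in yields(queue)]
    return first, points


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

While either coroutine is waiting on `queue.get()`, it is simply suspended in the event loop, so a long wait costs little beyond the pending coroutine itself.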
Hyper Bonus Marks:
Once you’ve made it as far as a yield interface, you have effectively achieved the cycle driver interface I’ve been harking on about for the last two years. Use coroutines to provide a cycling interface rather than bodging external triggers into a cycling regime which doesn’t fit; they can become a first-class cycling object in their own right:
```python
async def drives(*args):
    # event() and time() are placeholders for a data source and its timestamp
    while True:
        data = await event()
        # kicks off a new cycle point at time()
        # this provides a solution to observation type workflows where
        # data doesn't arrive on regular sequences
        yield (time(), data)
```

This is a little more involved, requiring a major abstraction of the cycling interface and classes. It is beyond the scope of Cylc 8 but worth keeping in mind so as to keep doors open.
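The consumer side of a cycle driver can be sketched in the same way. Below, a hypothetical `drives` generator replays irregularly spaced observation times, which are hard-coded here in place of a real `event()` source. A hypothetical `run_driver` starts a new cycle point for each event it receives rather than stepping through a regular sequence. None of these names reflect Cylc's cycling classes. The sketch only shows that an async generator is enough to express "start a cycle when data turns up".

```python
import asyncio


async def drives(observations):
    # Stand-in for a real event source: each observation "arrives" after
    # yielding control to the event loop, then is yielded as (point, data)
    for obs_time, data in observations:
        await asyncio.sleep(0)  # pretend to wait for the next event
        yield obs_time, data


async def start_cycle(point, data):
    # Placeholder for spawning the tasks of a new cycle point
    return f"cycle {point}: {data}"


async def run_driver(driver, start_cycle):
    # Hypothetical scheduler hook: start one cycle per yielded event,
    # letting cycles run concurrently while the driver keeps listening
    tasks = []
    async for point, data in driver:
        tasks.append(asyncio.create_task(start_cycle(point, data)))
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    # Irregularly spaced observation times, as in observation workflows
    obs = [("2020-01-01T00:07Z", "a"), ("2020-01-01T03:41Z", "b")]
    for line in asyncio.run(run_driver(drives(obs), start_cycle)):
        print(line)
```

Cycle points here come from the data rather than from a recurrence, which is the essence of the driver idea. A real driver would never finish iterating, so a scheduler would consume it indefinitely instead of gathering at the end.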
Questions:
- Push interface a good idea? The overhead is just one thread per active XTrigger.
- Yield interface a good idea? Really nice for some applications. Awesome USP for Cylc.
- Is there any reason we would want XTriggers to be re-imported whilst the suite is running?
- Are we happy with an async implementation?
The backbone of the work has already been done with main loop plugins (XTriggers are just a special case), but are there any hitches, @cylc/core?
Pull requests welcome!