
xtriggers: re-implement as async functions #3497

@oliver-sanders


Supersedes the same idea from #2917

The Problem:

At the moment XTriggers are run by the subprocess pool, which has several consequences:

  • Each XTrigger call involves a subprocess call to a script.
  • Python is invoked for each xtrigger call.
  • XTriggers are re-imported for each xtrigger call.
  • Missing XTriggers can cause runtime errors.

Bodging the XTrigger loading mechanism for #3465 seems nasty.

Close #2917

The Proposal:

Convert XTriggers to asynchronous functions and call them directly from the main loop, piggy-backing on the main loop plugin functionality introduced in #3492.

This would turn XTriggers into a blocking stage of the main loop (where they would effectively be running in an unlimited thread pool), something like this:

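import asyncio

async def main_loop(self):
    # ...
    # run all XTriggers concurrently, waiting until every one has returned
    await asyncio.gather(*[
        trigger(args)
        for trigger in xtriggers
    ])
    # ...
    # sleep until the next main loop iteration (interval is a placeholder)
    await asyncio.sleep(interval)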
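A minimal runnable sketch of the gather stage, assuming each XTrigger is a plain `async def` function returning a `(satisfied, results)` pair. The trigger names, arguments and return convention are hypothetical stand-ins, not the Cylc API. Because `asyncio.gather` runs the coroutines concurrently, one pass of the stage should take roughly as long as the slowest trigger rather than the sum of all of them.

```python
import asyncio
import time


async def wait_for_file(delay):
    # Hypothetical xtrigger: pretend to poll for a file using non-blocking IO.
    await asyncio.sleep(delay)
    return True, {"file": "ready"}


async def wait_for_data(delay):
    # Hypothetical xtrigger: pretend to query a remote service.
    await asyncio.sleep(delay)
    return False, {}


async def run_xtriggers(xtriggers):
    """Run every (function, args) pair concurrently; results keep input order."""
    return await asyncio.gather(*[
        trigger(*args)
        for trigger, args in xtriggers
    ])


def main():
    xtriggers = [(wait_for_file, (0.2,)), (wait_for_data, (0.2,))]
    start = time.monotonic()
    results = asyncio.run(run_xtriggers(xtriggers))
    elapsed = time.monotonic() - start
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = main()
    # Both triggers wait 0.2s but run concurrently, so elapsed is ~0.2s, not ~0.4s.
    print(results, f"{elapsed:.2f}s")
```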

Consequences:

  • This should be an efficiency boost as we are calling the XTrigger directly rather than going through SubProcPool, Subprocess, Python and importlib.
  • XTriggers would not be re-imported whilst the suite is running; if you want to change XTrigger code you would need to restart your suite.
  • XTriggers would get loaded on suite start, so missing XTriggers could not cause runtime errors.
  • XTriggers gain the logging, timing and standardisation offered by main loop plugins.
  • The responsibility for performing non-blocking IO lies with the XTrigger function writer.
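To illustrate why the non-blocking IO responsibility matters: a blocking call inside an async XTrigger would stall the whole main loop, whereas offloading it to a worker thread keeps the loop responsive. This is a sketch under assumptions; `blocking_check` is a hypothetical stand-in for any synchronous library call, and `asyncio.to_thread` requires Python 3.9+ (`loop.run_in_executor` is the older equivalent).

```python
import asyncio
import time


def blocking_check(delay):
    # Hypothetical stand-in for a blocking call (e.g. a synchronous HTTP request).
    time.sleep(delay)
    return True


async def bad_xtrigger(delay):
    # Calls the blocking function directly: the event loop stalls meanwhile.
    return blocking_check(delay)


async def good_xtrigger(delay):
    # Offloads the blocking call to a worker thread so the loop keeps running.
    return await asyncio.to_thread(blocking_check, delay)


async def measure(xtrigger, n=3, delay=0.1):
    """Time n concurrent calls of the given xtrigger."""
    start = time.monotonic()
    await asyncio.gather(*[xtrigger(delay) for _ in range(n)])
    return time.monotonic() - start


if __name__ == "__main__":
    # The blocking version runs the calls one after another (roughly 0.3s);
    # the threaded version overlaps them (roughly 0.1s).
    print(f"blocking:     {asyncio.run(measure(bad_xtrigger)):.2f}s")
    print(f"non-blocking: {asyncio.run(measure(good_xtrigger)):.2f}s")
```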

Bonus Marks:

If we move to asynchronous XTriggers, we gain the ability to have long-running XTrigger functions with minimal overheads.

This means we can very easily and efficiently implement a push interface for XTriggers (in addition to the pre-existing pull interface):

import json

async def push(*args):
    socket = ...
    msg = await socket.recv()  # could take hours, doesn’t matter
    return json.loads(msg)
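A runnable sketch of the push idea, assuming long-running XTriggers would be started as background tasks rather than inside the blocking gather stage (an assumption; the proposal does not say which). An `asyncio.Queue` stands in for the socket, and `sender` is a hypothetical external system; the point is that the main loop keeps iterating while the XTrigger sits waiting.

```python
import asyncio
import json


async def push(inbox):
    # Hypothetical push xtrigger: `inbox` (an asyncio.Queue) stands in for a
    # socket; awaiting it costs almost nothing while no message has arrived.
    msg = await inbox.get()
    return json.loads(msg)


async def sender(inbox, delay):
    # Simulates an external system pushing a message some time later.
    await asyncio.sleep(delay)
    await inbox.put(json.dumps({"time": "20200101T0000Z", "status": "ready"}))


async def main():
    inbox = asyncio.Queue()
    # Run the xtrigger as a background task so the main loop is not held up.
    waiting = asyncio.create_task(push(inbox))
    sending = asyncio.create_task(sender(inbox, 0.1))
    iterations = 0
    while not waiting.done():
        # Other main-loop work carries on while the xtrigger waits.
        iterations += 1
        await asyncio.sleep(0.01)
    await sending
    return waiting.result(), iterations


if __name__ == "__main__":
    print(asyncio.run(main()))
```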

Super Bonus Marks:

Another long-lived interface which would also be fairly straightforward to implement is ‘yield’, i.e. a single long-lived asynchronous function which yields cycle points and data as and when they become available:

import json

async def yields(*args):
    socket = ...
    while True:
        # listen for messages
        msg = await socket.recv()
        data = json.loads(msg)
        # yield XTrigger values to Cylc Flow when they arrive
        yield (
            data['time'],  # cycle point
            data
        )

A nicer solution for Kafka / message brokers?
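On the scheduler side, a yield-style XTrigger could be consumed with `async for`, marking each cycle point as satisfied as its data arrives. This is a minimal sketch assuming an `asyncio.Queue` in place of a real socket or Kafka consumer; the `None` sentinel and the `satisfied` dict are hypothetical devices to make the demo terminate and are not part of any Cylc API.

```python
import asyncio
import json


async def yields(inbox):
    # Hypothetical long-lived xtrigger: `inbox` stands in for a socket.
    while True:
        msg = await inbox.get()
        if msg is None:
            # Sentinel used only to end this demo; a real trigger may run forever.
            return
        data = json.loads(msg)
        # Yield (cycle point, data) pairs as and when they arrive.
        yield data["time"], data


async def main():
    inbox = asyncio.Queue()
    for point in ["20200101T00Z", "20200101T06Z"]:
        inbox.put_nowait(json.dumps({"time": point, "obs": "ok"}))
    inbox.put_nowait(None)

    satisfied = {}
    # Scheduler side: record each cycle point's xtrigger as satisfied on arrival.
    async for cycle_point, data in yields(inbox):
        satisfied[cycle_point] = data
    return satisfied


if __name__ == "__main__":
    print(asyncio.run(main()))
```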

Hyper bonus marks:

Once you’ve made it as far as a yield interface, you have effectively achieved the cycle driver interface I’ve been harking on about for the last two years. Use coroutines to provide a cycling interface rather than bodging external triggers into a cycling regime which doesn’t fit; they can become a first-class cycling object in their own right:

async def drives(*args):
    while True:
        # event() and time() are placeholders for the external event source
        data = await event()
        # kicks off a new cycle point at time()
        # this provides a solution to observation type workflows where data doesn’t arrive on regular sequences
        yield (time(), data)

This is a little more involved, requiring a major abstraction of the cycling interface and classes, and is beyond the scope of Cylc 8, but it is worth keeping in mind so as to keep doors open.
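As a rough illustration of the cycle driver idea, the sketch below lets a driver spawn one cycle point per external event, so cycle points can be irregularly spaced. It assumes an `asyncio.Queue` of hypothetical event dicts in place of `event()`, and a plain list in place of a real cycling abstraction. Because breaking out of an `async for` does not close an async generator, the long-lived driver is shut down explicitly with `aclose()`.

```python
import asyncio
from datetime import datetime, timedelta, timezone


async def drives(events):
    # Hypothetical cycle driver: `events` stands in for the external event
    # source; every event it yields starts a new cycle point.
    while True:
        data = await events.get()
        yield data["time"], data


async def main():
    events = asyncio.Queue()
    start = datetime(2020, 1, 1, tzinfo=timezone.utc)
    # Irregularly spaced observations at 0h, 1h and 5h after start.
    for hours in (0, 1, 5):
        events.put_nowait({"time": start + timedelta(hours=hours)})

    cycle_points = []
    driver = drives(events)
    async for point, _data in driver:
        # Scheduler side: spawn a new cycle point for each event.
        cycle_points.append(point)
        if len(cycle_points) == 3:
            break
    # Shut the long-lived driver down cleanly.
    await driver.aclose()
    return cycle_points


if __name__ == "__main__":
    for point in asyncio.run(main()):
        print(point.isoformat())
```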

Questions:

  • Is a push interface a good idea? The overhead is just one thread per active XTrigger.
  • Is a yield interface a good idea? It would be really nice for some applications and an awesome USP for Cylc.
  • Is there any reason we would want XTriggers to be re-imported whilst the suite is running?
  • Are we happy with an async implementation?

The backbone of the work has already been done with main loop plugins (XTriggers are just a special case), but are there any hitches, @cylc/core?

Pull requests welcome!


Labels: efficiency (for notable efficiency improvements)
