Make async functions mappable #493
base: master
Changes from all commits
02c5e25
1683bfc
aa122bd
5c24950
39bcbd8
2506a92
34509c7
ea57a59
@@ -22,6 +22,7 @@ Stream
 filter
 flatten
 map
+map_async
 partition
 rate_limit
 scatter
@@ -718,6 +718,86 @@ def update(self, x, who=None, metadata=None):
         return self._emit(result, metadata=metadata)
 
 
+@Stream.register_api()
+class map_async(Stream):
+    """ Apply an async function to every element in the stream, preserving
+    order even when evaluating multiple inputs in parallel.
+
+    Parameters
+    ----------
+    func: async callable
+    *args :
+        The arguments to pass to the function.
+    parallelism:
+        The maximum number of tasks evaluating func in parallel; defaults to 1.
+    **kwargs:
+        Keyword arguments to pass to func.
+
+    Examples
+    --------
+    >>> async def mult(x, factor=1):
+    ...     return factor * x
+    >>> async def run():
+    ...     source = Stream(asynchronous=True)
+    ...     source.map_async(mult, factor=2).sink(print)
+    ...     for i in range(5):
+    ...         await source.emit(i)
+    >>> asyncio.run(run())
+    0
+    2
+    4
+    6
+    8
+    """
+    def __init__(self, upstream, func, *args, parallelism=1, **kwargs):
+        self.func = func
+        stream_name = kwargs.pop('stream_name', None)
+        self.kwargs = kwargs
+        self.args = args
+        self.work_queue = asyncio.Queue(maxsize=parallelism)
+
+        Stream.__init__(self, upstream, stream_name=stream_name, ensure_io_loop=True)
+        self.work_task = self._create_task(self.work_callback())
+
+    def update(self, x, who=None, metadata=None):
+        return self._create_task(self._insert_job(x, metadata))
+
+    def _create_task(self, coro):
+        # Accept either a coroutine or an already-scheduled Future.
+        if gen.is_future(coro):
+            return coro
+        return self.loop.asyncio_loop.create_task(coro)
+
+    async def work_callback(self):
+        # Await queued tasks in submission order so results stay ordered.
+        while True:
+            try:
+                task, metadata = await self.work_queue.get()
+                self.work_queue.task_done()
+                result = await task
+            except Exception as e:
+                logger.exception(e)
+                raise
+            else:
+                results = self._emit(result, metadata=metadata)
+                if results:
+                    await asyncio.gather(*results)
+                self._release_refs(metadata)
+
+    async def _wait_for_work_slot(self):
+        while self.work_queue.full():

Member:
I was worried this would end up a busy loop eating the CPU - but if the queue is full, there must be coroutines waiting, so the sleep below will always yield the loop to something else, right? I think, then, that this is fine.

Contributor (Author):
Yes, sleep always yields, so any other tasks on the loop have priority for the next schedule slot, not just the ones held in the work queue here, but we are guaranteed to have at least one of those since the queue is full and we have the work callback. If the work items are all long-running and blocked (say, on IO) and the queue is full, then back pressure will propagate upstream via the Task enclosing _insert_job.

+            await asyncio.sleep(0)
+
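
To see why the spin above cannot starve the loop: asyncio.sleep(0) unconditionally suspends the current task, so every other runnable task gets a turn before the while condition is checked again. A standalone sketch of that guarantee (not part of the PR; the names are illustrative):

    import asyncio

    async def spinner(flag):
        # Analogous to `while self.work_queue.full():` above.
        while not flag.done():
            await asyncio.sleep(0)  # always yields; the spin never monopolizes the loop
        print("slot freed")

    async def worker(flag):
        await asyncio.sleep(0.01)   # pretend to do some work
        flag.set_result(None)       # analogous to a queue slot opening up

    async def main():
        flag = asyncio.get_running_loop().create_future()
        await asyncio.gather(spinner(flag), worker(flag))

    asyncio.run(main())
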
+    async def _insert_job(self, x, metadata):
+        try:
+            await self._wait_for_work_slot()
+            coro = self.func(x, *self.args, **self.kwargs)
+            task = self._create_task(coro)
+            await self.work_queue.put((task, metadata))

Member:
Is a race possible with the get/put pair on the queue here?

Contributor (Author):
Not within the semantics of the traditional interpreter. In free-threaded mode, maybe? The asyncio Queue is not thread-safe, but within an event loop (which must run entirely within a single thread) the get/put pair will not yield the event loop until the internal state of the Queue is consistent and they have achieved the requested action. If they could not complete the action in the current state, they block themselves on a Future that can only complete once the complementary action resolves entirely, and once that Future comes back with a result, they do not yield the loop until they are done modifying the internal deque.

+            self._retain_refs(metadata)
+        except Exception as e:
+            logger.exception(e)
+            raise
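
The single-threaded guarantee described above is easy to exercise: with a bounded asyncio.Queue shared between tasks on one loop, put() and get() only suspend at their awaits, so items are never lost or reordered. A standalone sketch (not part of the PR):

    import asyncio

    async def producer(q, n):
        for i in range(n):
            await q.put(i)              # suspends while the queue is full

    async def consumer(q, n, out):
        for _ in range(n):
            out.append(await q.get())   # suspends while the queue is empty
            q.task_done()

    async def main():
        q = asyncio.Queue(maxsize=2)    # bounded, like work_queue above
        out = []
        await asyncio.gather(producer(q, 10), consumer(q, 10, out))
        assert out == list(range(10))   # FIFO order survives every suspension

    asyncio.run(main())
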
 
 
 @Stream.register_api()
 class starmap(Stream):
     """ Apply a function to every element in the stream, splayed out
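
For reviewers trying the branch locally, a hedged usage sketch of the API under review: with parallelism=4 several invocations of the async function run concurrently, yet results are emitted in input order, because work_callback awaits the queued tasks in submission order. The trailing sleep is illustrative, giving in-flight work time to drain before the loop closes.

    import asyncio
    import random
    from streamz import Stream

    async def work(x):
        await asyncio.sleep(random.uniform(0, 0.05))  # finishes in arbitrary order
        return x * 10

    async def main():
        source = Stream(asynchronous=True)
        source.map_async(work, parallelism=4).sink(print)
        for i in range(8):
            await source.emit(i)
        await asyncio.sleep(0.2)  # let in-flight tasks drain before the loop closes

    asyncio.run(main())  # prints 0, 10, 20, ..., 70 in input order
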
@@ -8,6 +8,7 @@
 from dask.dataframe.utils import assert_eq
 import numpy as np
 import pandas as pd
+from flaky import flaky
 from tornado import gen
 
 from streamz import Stream
@@ -570,6 +571,7 @@ def test_cumulative_aggregations(op, getter, stream):
     assert_eq(pd.concat(L), expected)
 
 
+@flaky(max_runs=3, min_passes=1)

Contributor (Author):
I noticed this one leaving an upstream around on occasion. It might be related to GC changes in 3.13+.

 @gen_test()
 def test_gc():
     sdf = sd.Random(freq='5ms', interval='100ms')

Member:
This is the only way to exit the loop. There should probably be a stop() method, no?

Contributor (Author):
timed_window, timed_window_unique, delay, buffer, and latest all use the same while True: ... construct for their work callback. Fun fact: the event loop itself only holds a weak reference to any task, so when the enclosing node is GCed, the underlying task can be swept away as long as it is not currently running. Once the queue starves, it will get stuck waiting on an item that will never come and never be scheduled back in.
https://docs.python.org/3.14/library/asyncio-task.html#asyncio.create_task
I'm not actually sure that raising the exception is correct, as it will kill the worker task and clog the stream. map raises the exception from update, which should blow up the entire stream directly, right?
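
The mitigation the linked docs recommend, and which this PR follows by stashing the worker on self.work_task, is to hold a strong reference to any background task yourself. A minimal sketch of that pattern (illustrative names, not from the PR):

    import asyncio

    background_tasks = set()

    def spawn(coro):
        task = asyncio.get_running_loop().create_task(coro)
        background_tasks.add(task)                        # strong ref: survives GC
        task.add_done_callback(background_tasks.discard)  # drop it once finished
        return task

    async def main():
        spawn(asyncio.sleep(0.01))
        await asyncio.gather(*background_tasks)

    asyncio.run(main())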