1 change: 1 addition & 0 deletions docs/source/api.rst
@@ -22,6 +22,7 @@ Stream
     filter
     flatten
     map
+    map_async
     partition
     rate_limit
     scatter
25 changes: 23 additions & 2 deletions docs/source/async.rst
@@ -72,8 +72,8 @@ This would also work with async-await syntax in Python 3
 
 .. code-block:: python
 
+    import asyncio
     from streamz import Stream
-    from tornado.ioloop import IOLoop
 
     async def f():
         source = Stream(asynchronous=True)  # tell the stream we're working asynchronously
@@ -82,7 +82,28 @@ This would also work with async-await syntax in Python 3
         for x in range(10):
            await source.emit(x)
 
-    IOLoop().run_sync(f)
+    asyncio.run(f())
 
+When working asynchronously, we can also map asynchronous functions.
+
+.. code-block:: python
+
+    async def increment_async(x):
+        """ A "long-running" increment function
+
+        Simulates a function that does real asyncio work.
+        """
+        await asyncio.sleep(0.1)
+        return x + 1
+
+    async def f_inc():
+        source = Stream(asynchronous=True)  # tell the stream we're working asynchronously
+        source.map_async(increment_async).rate_limit(0.500).sink(write)
+
+        for x in range(10):
+            await source.emit(x)
+
+    asyncio.run(f_inc())
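(Illustrative aside, not part of the diff: map_async also takes a parallelism argument, defined in streamz/core.py below. A sketch of the same pipeline evaluating several increments concurrently while still emitting results in order, reusing increment_async and write from above:)

    async def f_par():
        source = Stream(asynchronous=True)
        # up to 4 increment_async calls in flight; results keep emit order
        source.map_async(increment_async, parallelism=4).sink(write)

        for x in range(10):
            await source.emit(x)

    asyncio.run(f_par())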


Event Loop on a Separate Thread
80 changes: 80 additions & 0 deletions streamz/core.py
@@ -718,6 +718,86 @@ def update(self, x, who=None, metadata=None):
         return self._emit(result, metadata=metadata)
 
 
+@Stream.register_api()
+class map_async(Stream):
+    """ Apply an async function to every element in the stream, preserving order
+    even when evaluating multiple inputs in parallel.
+
+    Parameters
+    ----------
+    func: async callable
+    *args :
+        The arguments to pass to the function.
+    parallelism:
+        The maximum number of parallel Tasks for evaluating func, default value is 1
+    **kwargs:
+        Keyword arguments to pass to func
+
+    Examples
+    --------
+    >>> async def mult(x, factor=1):
+    ...     return factor*x
+    >>> async def run():
+    ...     source = Stream(asynchronous=True)
+    ...     source.map_async(mult, factor=2).sink(print)
+    ...     for i in range(5):
+    ...         await source.emit(i)
+    >>> asyncio.run(run())
+    0
+    2
+    4
+    6
+    8
+    """
+    def __init__(self, upstream, func, *args, parallelism=1, **kwargs):
+        self.func = func
+        stream_name = kwargs.pop('stream_name', None)
+        self.kwargs = kwargs
+        self.args = args
+        self.work_queue = asyncio.Queue(maxsize=parallelism)
+
+        Stream.__init__(self, upstream, stream_name=stream_name, ensure_io_loop=True)
+        self.work_task = self._create_task(self.work_callback())
+
+    def update(self, x, who=None, metadata=None):
+        return self._create_task(self._insert_job(x, metadata))
+
+    def _create_task(self, coro):
+        if gen.is_future(coro):
+            return coro
+        return self.loop.asyncio_loop.create_task(coro)
+
+    async def work_callback(self):
+        while True:
+            try:
+                task, metadata = await self.work_queue.get()
+                self.work_queue.task_done()
+                result = await task
+            except Exception as e:
+                logger.exception(e)
+                raise
Member: This is the only way to exit the loop. There should probably be a stop() method, no?

Contributor Author: timed_window, timed_window_unique, delay, buffer, and latest all use the same while True: ... construct for their work callbacks.

Fun fact: the event loop itself only holds a weak reference to any task, so when the enclosing node is GCed, the underlying task can be swept away as long as it is not currently running. Once the queue starves, it gets stuck waiting on an item that will never come and is never scheduled back in.

https://docs.python.org/3.14/library/asyncio-task.html#asyncio.create_task

I'm not actually sure that raising the exception is correct, as it will kill the worker task and clog the stream. map raises the exception from update, which should blow up the entire stream directly, right?
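A minimal sketch of such a stop() (hypothetical, not part of this PR), which cancels the worker task explicitly instead of relying on GC of the node:

    def stop(self):
        # Hypothetical method on map_async: cancel the worker explicitly
        # instead of relying on the loop's weak reference and node GC.
        if self.work_task is not None:
            self.work_task.cancel()  # CancelledError unwinds work_callback
            self.work_task = None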

+            else:
+                results = self._emit(result, metadata=metadata)
+                if results:
+                    await asyncio.gather(*results)
+                self._release_refs(metadata)
+
+    async def _wait_for_work_slot(self):
+        while self.work_queue.full():
Member: I was worried this would end up a busy loop eating the CPU, but if the queue is full, there must be coroutines waiting, so the sleep below will always yield the loop to something else, right? I think, then, that this is fine.

Contributor Author: Yes, await asyncio.sleep(0) is the defined way to yield the loop:
https://docs.python.org/3.14/library/asyncio-task.html#asyncio.sleep

Sleep always yields, so any other tasks on the loop have priority for the next schedule slot, not just the ones held in the work queue here, but we are guaranteed to have at least one of those since the queue is full and we have the work callback. If the work items are all long-running and blocked (say, on IO) and the queue is full, then back pressure propagates upstream via the Task enclosing _insert_job, which cannot progress until this loop exits. Eventually the only task on the loop that can make progress is technically this one, but it always immediately yields the loop, so as soon as any other task can make progress, that task takes the loop.
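A standalone sketch of that yielding behavior (illustrative only; the names are made up):

    import asyncio

    async def spinner(stop):
        # Polls in a tight loop, but sleep(0) yields the loop on every
        # iteration, so the worker below always gets a turn.
        while not stop.is_set():
            await asyncio.sleep(0)

    async def worker(stop):
        await asyncio.sleep(0.1)  # stands in for real IO-bound work
        stop.set()

    async def main():
        stop = asyncio.Event()
        await asyncio.gather(spinner(stop), worker(stop))

    asyncio.run(main())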

+            await asyncio.sleep(0)
+
+    async def _insert_job(self, x, metadata):
+        try:
+            await self._wait_for_work_slot()
+            coro = self.func(x, *self.args, **self.kwargs)
+            task = self._create_task(coro)
+            await self.work_queue.put((task, metadata))
Member: Is a race possible with the await here?

Contributor Author: Not within the semantics of the traditional interpreter. In free-threaded mode, maybe? The asyncio Queue is not thread safe, but within an event loop (which must run entirely within a single thread) the get/put pair will not yield the event loop until the internal state of the Queue is consistent and they have achieved the requested action. If they cannot complete the action in the current state, they block themselves on a Future that can only complete once the complementary action resolves entirely, and once that Future comes back with a result, they do not yield the loop until they are done modifying the internal deque.
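A small sketch of that suspend-and-resume behavior within one loop (illustrative only):

    import asyncio

    async def main():
        q = asyncio.Queue(maxsize=1)
        await q.put(1)                          # fills the queue
        putter = asyncio.create_task(q.put(2))  # suspends: queue is full
        await asyncio.sleep(0)                  # let the putter run and block
        assert not putter.done()
        assert await q.get() == 1               # frees a slot; putter resumes
        await putter
        assert await q.get() == 2

    asyncio.run(main())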

+            self._retain_refs(metadata)
+        except Exception as e:
+            logger.exception(e)
+            raise
+
+
 @Stream.register_api()
 class starmap(Stream):
     """ Apply a function to every element in the stream, splayed out
2 changes: 2 additions & 0 deletions streamz/dataframe/tests/test_dataframes.py
@@ -8,6 +8,7 @@
 from dask.dataframe.utils import assert_eq
 import numpy as np
 import pandas as pd
+from flaky import flaky
 from tornado import gen
 
 from streamz import Stream
@@ -570,6 +571,7 @@ def test_cumulative_aggregations(op, getter, stream):
     assert_eq(pd.concat(L), expected)
 
 
+@flaky(max_runs=3, min_passes=1)
Contributor Author: I noticed this one leaving an upstream around on occasion. It might be related to GC changes in 3.13+.
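An illustrative mitigation (not part of this PR) is to force a collection inside the test so node cleanup does not depend on GC timing:

    import gc

    gc.collect()  # make collection deterministic before asserting teardown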

 @gen_test()
 def test_gc():
     sdf = sd.Random(freq='5ms', interval='100ms')
50 changes: 50 additions & 0 deletions streamz/tests/test_core.py
@@ -126,6 +126,56 @@ def add(x=0, y=0):
     assert L[0] == 11
 
 
+@gen_test()
+def test_map_async_tornado():
+    @gen.coroutine
+    def add_tor(x=0, y=0):
+        return x + y
+
+    async def add_native(x=0, y=0):
+        await asyncio.sleep(0.1)
+        return x + y
+
+    source = Stream(asynchronous=True)
+    L = source.map_async(add_tor, y=1).map_async(add_native, parallelism=2, y=2).buffer(1).sink_to_list()
+
+    start = time()
+    yield source.emit(0)
+    yield source.emit(1)
+    yield source.emit(2)
+
+    def fail_func():
+        assert L == [3, 4, 5]
+
+    yield await_for(lambda: L == [3, 4, 5], 1, fail_func=fail_func)
+    assert (time() - start) == pytest.approx(0.1, abs=4e-3)
+
+
+@pytest.mark.asyncio
+async def test_map_async():
+    @gen.coroutine
+    def add_tor(x=0, y=0):
+        return x + y
+
+    async def add_native(x=0, y=0):
+        await asyncio.sleep(0.1)
+        return x + y
+
+    source = Stream(asynchronous=True)
+    L = source.map_async(add_tor, y=1).map_async(add_native, parallelism=2, y=2).sink_to_list()
+
+    start = time()
+    await source.emit(0)
+    await source.emit(1)
+    await source.emit(2)
+
+    def fail_func():
+        assert L == [3, 4, 5]
+
+    await await_for(lambda: L == [3, 4, 5], 1, fail_func=fail_func)
+    assert (time() - start) == pytest.approx(0.1, abs=4e-3)
+
+
 def test_map_args():
     source = Stream()
     L = source.map(operator.add, 10).sink_to_list()