diff --git a/Cargo.toml b/Cargo.toml index 139af78..7cd61ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ package.rust-version = "1.83" [features] async-std-runtime = ["async-std"] +smol-runtime = ["smol", "task-local", "async-executor"] attributes = ["pyo3-async-runtimes-macros"] testing = ["clap", "inventory"] tokio-runtime = ["tokio"] @@ -32,16 +33,21 @@ unstable-streams = [ "futures-util/sink", "futures-channel/sink", ] -default = [] +default = ["smol-runtime"] [package.metadata.docs.rs] -features = ["attributes", "testing", "async-std-runtime", "tokio-runtime"] +features = ["attributes", "testing", "async-std-runtime", "tokio-runtime", "smol-runtime"] [[example]] name = "async_std" path = "examples/async_std.rs" required-features = ["attributes", "async-std-runtime"] +[[example]] +name = "smol" +path = "examples/smol.rs" +required-features = ["attributes", "smol-runtime"] + [[example]] name = "tokio" path = "examples/tokio.rs" @@ -70,6 +76,18 @@ path = "pytests/test_async_std_run_forever.rs" harness = false required-features = ["async-std-runtime", "testing"] +[[test]] +name = "test_smol_asyncio" +path = "pytests/test_smol_asyncio.rs" +harness = false +required-features = ["smol-runtime", "testing", "attributes"] + +[[test]] +name = "test_smol_run_forever" +path = "pytests/test_smol_run_forever.rs" +harness = false +required-features = ["async-std-runtime", "testing"] + [[test]] name = "test_tokio_current_thread_asyncio" path = "pytests/test_tokio_current_thread_asyncio.rs" @@ -100,6 +118,12 @@ path = "pytests/test_async_std_uvloop.rs" harness = false required-features = ["async-std-runtime", "testing"] +[[test]] +name = "test_smol_uvloop" +path = "pytests/test_smol_uvloop.rs" +harness = false +required-features = ["smol-runtime", "testing"] + [[test]] name = "test_tokio_current_thread_uvloop" path = "pytests/test_tokio_current_thread_uvloop.rs" @@ -121,6 +145,7 @@ required-features = ["async-std-runtime", "testing"] [dependencies] async-channel = { version = "2.3", optional = true } +async-executor = { version = "1.14.0", optional = true } clap = { version = "4.5", optional = true } futures-channel = "0.3" futures-util = "0.3" @@ -129,6 +154,7 @@ once_cell = "1.14" pin-project-lite = "0.2" pyo3 = "0.28" pyo3-async-runtimes-macros = { path = "pyo3-async-runtimes-macros", version = "=0.28.0", optional = true } +task-local = { version = "0.1.0", optional = true } [dev-dependencies] pyo3 = { version = "0.28", features = ["macros"] } @@ -138,6 +164,10 @@ version = "1.12" features = ["unstable"] optional = true +[dependencies.smol] +version = "2.0.2" +optional = true + [dependencies.tokio] version = "1.13" features = ["rt", "rt-multi-thread", "time"] diff --git a/examples/smol.rs b/examples/smol.rs new file mode 100644 index 0000000..8636256 --- /dev/null +++ b/examples/smol.rs @@ -0,0 +1,17 @@ +use pyo3::prelude::*; + +#[pyo3_async_runtimes::smol::main] +async fn main() -> PyResult<()> { + let fut = Python::attach(|py| { + let asyncio = py.import("asyncio")?; + + // convert asyncio.sleep into a Rust Future + pyo3_async_runtimes::smol::into_future(asyncio.call_method1("sleep", (1,))?) + })?; + + println!("sleeping for 1s"); + fut.await?; + println!("done"); + + Ok(()) +} diff --git a/pyo3-async-runtimes-macros/src/lib.rs b/pyo3-async-runtimes-macros/src/lib.rs index da7f3df..7506636 100644 --- a/pyo3-async-runtimes-macros/src/lib.rs +++ b/pyo3-async-runtimes-macros/src/lib.rs @@ -64,6 +64,62 @@ pub fn async_std_main(_attr: TokenStream, item: TokenStream) -> TokenStream { result.into() } +/// Enables an async main function that uses the async-std runtime. +/// +/// # Examples +/// +/// ```ignore +/// #[pyo3_async_runtimes::smol::main] +/// async fn main() -> PyResult<()> { +/// Ok(()) +/// } +/// ``` +#[cfg(not(test))] // NOTE: exporting main breaks tests, we should file an issue. +#[proc_macro_attribute] +pub fn smol_main(_attr: TokenStream, item: TokenStream) -> TokenStream { + let input = syn::parse_macro_input!(item as syn::ItemFn); + + let ret = &input.sig.output; + let inputs = &input.sig.inputs; + let name = &input.sig.ident; + let body = &input.block; + let attrs = &input.attrs; + let vis = &input.vis; + + if name != "main" { + return TokenStream::from(quote_spanned! { name.span() => + compile_error!("only the main function can be tagged with #[async_std::main]"), + }); + } + + if input.sig.asyncness.is_none() { + return TokenStream::from(quote_spanned! { input.span() => + compile_error!("the async keyword is missing from the function declaration"), + }); + } + + let result = quote! { + #vis fn main() { + #(#attrs)* + async fn main(#inputs) #ret { + #body + } + + pyo3::Python::initialize(); + + pyo3::Python::attach(|py| { + pyo3_async_runtimes::smol::run(py, main()) + .map_err(|e| { + e.print_and_set_sys_last_vars(py); + }) + .unwrap(); + }); + } + }; + + result.into() +} + /// Enables an async main function that uses the tokio runtime. /// /// # Arguments @@ -198,6 +254,103 @@ pub fn async_std_test(_attr: TokenStream, item: TokenStream) -> TokenStream { result.into() } +/// Registers an `async-std` test with the `pyo3-asyncio` test harness. +/// +/// This attribute is meant to mirror the `#[test]` attribute and allow you to mark a function for +/// testing within an integration test. Like the `#[smol::test]` attribute, it will accept +/// `async` test functions, but it will also accept blocking functions as well. +/// +/// # Examples +/// ```ignore +/// use std::{time::Duration, thread}; +/// +/// use pyo3::prelude::*; +/// +/// // async test function +/// #[pyo3_async_runtimes::smol::test] +/// async fn test_async_sleep() -> PyResult<()> { +/// smol::Timer::after(Duration::from_secs(1)).await; +/// Ok(()) +/// } +/// +/// // blocking test function +/// #[pyo3_async_runtimes::smol::test] +/// fn test_blocking_sleep() -> PyResult<()> { +/// thread::sleep(Duration::from_secs(1)); +/// Ok(()) +/// } +/// +/// // blocking test functions can optionally accept an event_loop parameter +/// #[pyo3_async_runtimes::smol::test] +/// fn test_blocking_sleep_with_event_loop(event_loop: Py) -> PyResult<()> { +/// thread::sleep(Duration::from_secs(1)); +/// Ok(()) +/// } +/// ``` +#[cfg(not(test))] // NOTE: exporting main breaks tests, we should file an issue. +#[proc_macro_attribute] +pub fn smol_test(_attr: TokenStream, item: TokenStream) -> TokenStream { + let input = syn::parse_macro_input!(item as syn::ItemFn); + + let sig = &input.sig; + let name = &input.sig.ident; + let body = &input.block; + let vis = &input.vis; + + let fn_impl = if input.sig.asyncness.is_none() { + // Optionally pass an event_loop parameter to blocking tasks + let task = if sig.inputs.is_empty() { + quote! { + Box::pin(pyo3_async_runtimes::smol::re_exports::spawn_blocking(move || { + #name() + })) + } + } else { + quote! { + let event_loop = pyo3::Python::attach(|py| { + pyo3_async_runtimes::smol::get_current_loop(py).unwrap().into() + }); + Box::pin(pyo3_async_runtimes::smol::re_exports::spawn_blocking(move || { + #name(event_loop) + })) + } + }; + + quote! { + #vis fn #name() -> std::pin::Pin> + Send>> { + #sig { + #body + } + + #task + } + } + } else { + quote! { + #vis fn #name() -> std::pin::Pin> + Send>> { + #sig { + #body + } + + Box::pin(#name()) + } + } + }; + + let result = quote! { + #fn_impl + + pyo3_async_runtimes::inventory::submit! { + pyo3_async_runtimes::testing::Test { + name: concat!(std::module_path!(), "::", stringify!(#name)), + test_fn: &#name + } + } + }; + + result.into() +} + /// Registers a `tokio` test with the `pyo3-asyncio` test harness. /// /// This attribute is meant to mirror the `#[test]` attribute and allow you to mark a function for diff --git a/pytests/common copy/mod.rs b/pytests/common copy/mod.rs new file mode 100644 index 0000000..9989037 --- /dev/null +++ b/pytests/common copy/mod.rs @@ -0,0 +1,65 @@ +use std::ffi::CString; +use std::{thread, time::Duration}; + +use pyo3::prelude::*; +use pyo3_async_runtimes::TaskLocals; + +pub(super) const TEST_MOD: &str = r#" +import asyncio + +async def py_sleep(duration): + await asyncio.sleep(duration) + +async def sleep_for_1s(sleep_for): + await sleep_for(1) +"#; + +pub(super) async fn test_into_future(event_loop: Py) -> PyResult<()> { + let fut = Python::attach(|py| { + let test_mod = PyModule::from_code( + py, + &CString::new(TEST_MOD).unwrap(), + &CString::new("test_rust_coroutine/test_mod.py").unwrap(), + &CString::new("test_mod").unwrap(), + )?; + + pyo3_async_runtimes::into_future_with_locals( + &TaskLocals::new(event_loop.into_bound(py)), + test_mod.call_method1("py_sleep", (1,))?, + ) + })?; + + fut.await?; + + Ok(()) +} + +pub(super) fn test_blocking_sleep() -> PyResult<()> { + thread::sleep(Duration::from_secs(1)); + Ok(()) +} + +pub(super) async fn test_other_awaitables(event_loop: Py) -> PyResult<()> { + let fut = Python::attach(|py| { + let functools = py.import("functools")?; + let time = py.import("time")?; + + // spawn a blocking sleep in the threadpool executor - returns a task, not a coroutine + let task = event_loop.bind(py).call_method1( + "run_in_executor", + ( + py.None(), + functools.call_method1("partial", (time.getattr("sleep")?, 1))?, + ), + )?; + + pyo3_async_runtimes::into_future_with_locals( + &TaskLocals::new(event_loop.into_bound(py)), + task, + ) + })?; + + fut.await?; + + Ok(()) +} diff --git a/pytests/test_smol_asyncio.rs b/pytests/test_smol_asyncio.rs new file mode 100644 index 0000000..fdcce4e --- /dev/null +++ b/pytests/test_smol_asyncio.rs @@ -0,0 +1,419 @@ +mod common; + +use std::ffi::CString; +use std::{ + rc::Rc, + sync::{Arc, Mutex}, + time::Duration, +}; + +use pyo3::{ + prelude::*, + types::{IntoPyDict, PyType}, + wrap_pyfunction, wrap_pymodule, +}; +use pyo3_async_runtimes::TaskLocals; + +#[cfg(feature = "unstable-streams")] +use futures_util::stream::{StreamExt, TryStreamExt}; + +#[pyfunction] +fn sleep<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult> { + let secs = secs.extract()?; + + pyo3_async_runtimes::smol::future_into_py(py, async move { + smol::Timer::after(Duration::from_secs(secs)).await; + Ok(()) + }) +} + +#[pyo3_async_runtimes::smol::test] +async fn test_future_into_py() -> PyResult<()> { + let fut = Python::attach(|py| { + let sleeper_mod = PyModule::new(py, "rust_sleeper")?; + + sleeper_mod.add_wrapped(wrap_pyfunction!(sleep))?; + + let test_mod = PyModule::from_code( + py, + &CString::new(common::TEST_MOD).unwrap(), + &CString::new("test_future_into_py_mod.py").unwrap(), + &CString::new("test_future_into_py_mod").unwrap(), + )?; + + pyo3_async_runtimes::smol::into_future( + test_mod.call_method1("sleep_for_1s", (sleeper_mod.getattr("sleep")?,))?, + ) + })?; + + fut.await?; + + Ok(()) +} + +#[pyo3_async_runtimes::smol::test] +async fn test_async_sleep() -> PyResult<()> { + let asyncio = Python::attach(|py| py.import("asyncio").map(Py::::from))?; + + smol::Timer::after(Duration::from_secs(1)).await; + + Python::attach(|py| { + pyo3_async_runtimes::smol::into_future(asyncio.bind(py).call_method1("sleep", (1.0,))?) + })? + .await?; + + Ok(()) +} + +#[pyo3_async_runtimes::smol::test] +fn test_blocking_sleep() -> PyResult<()> { + common::test_blocking_sleep() +} + +#[pyo3_async_runtimes::smol::test] +async fn test_into_future() -> PyResult<()> { + common::test_into_future(Python::attach(|py| { + pyo3_async_runtimes::smol::get_current_loop(py) + .unwrap() + .into() + })) + .await +} + +#[pyo3_async_runtimes::smol::test] +async fn test_other_awaitables() -> PyResult<()> { + common::test_other_awaitables(Python::attach(|py| { + pyo3_async_runtimes::smol::get_current_loop(py) + .unwrap() + .into() + })) + .await +} + +#[pyo3_async_runtimes::smol::test] +async fn test_panic() -> PyResult<()> { + let fut = Python::attach(|py| -> PyResult<_> { + pyo3_async_runtimes::smol::into_future( + pyo3_async_runtimes::smol::future_into_py::<_, ()>(py, async { + panic!("this panic was intentional!") + })?, + ) + })?; + + match fut.await { + Ok(_) => panic!("coroutine should panic"), + Err(e) => Python::attach(|py| { + if e.is_instance_of::(py) { + Ok(()) + } else { + panic!("expected RustPanic err") + } + }), + } +} + +#[pyo3_async_runtimes::smol::test] +async fn test_local_future_into_py() -> PyResult<()> { + Python::attach(|py| { + let non_send_secs = Rc::new(1); + + #[allow(deprecated)] + let py_future = pyo3_async_runtimes::smol::local_future_into_py(py, async move { + smol::Timer::after(Duration::from_secs(*non_send_secs)).await; + Ok(()) + })?; + + pyo3_async_runtimes::smol::into_future(py_future) + })? + .await?; + + Ok(()) +} + +#[pyo3_async_runtimes::smol::test] +async fn test_cancel() -> PyResult<()> { + let completed = Arc::new(Mutex::new(false)); + + let py_future = Python::attach(|py| -> PyResult> { + let completed = Arc::clone(&completed); + Ok( + pyo3_async_runtimes::smol::future_into_py(py, async move { + smol::Timer::after(Duration::from_secs(1)).await; + *completed.lock().unwrap() = true; + + Ok(()) + })? + .into(), + ) + })?; + + if let Err(e) = Python::attach(|py| -> PyResult<_> { + py_future.bind(py).call_method0("cancel")?; + pyo3_async_runtimes::smol::into_future(py_future.into_bound(py)) + })? + .await + { + Python::attach(|py| -> PyResult<()> { + assert!(e.value(py).is_instance( + py.import("asyncio")? + .getattr("CancelledError")? + .cast::() + .unwrap() + )?); + Ok(()) + })?; + } else { + panic!("expected CancelledError"); + } + + smol::Timer::after(Duration::from_secs(1)).await; + if *completed.lock().unwrap() { + panic!("future still completed") + } + + Ok(()) +} + +#[cfg(feature = "unstable-streams")] +const ASYNC_STD_TEST_MOD: &str = r#" +import asyncio + +async def gen(): + for i in range(10): + await asyncio.sleep(0.1) + yield i +"#; + +#[cfg(feature = "unstable-streams")] +#[pyo3_async_runtimes::smol::test] +async fn test_async_gen_v1() -> PyResult<()> { + let stream = Python::attach(|py| { + let test_mod = PyModule::from_code( + py, + &CString::new(ASYNC_STD_TEST_MOD).unwrap(), + &CString::new("test_rust_coroutine/async_std_test_mod.py").unwrap(), + &CString::new("async_std_test_mod").unwrap(), + )?; + + pyo3_async_runtimes::smol::into_stream_v1(test_mod.call_method0("gen")?) + })?; + + let vals = stream + .map(|item| Python::attach(|py| -> PyResult { item?.bind(py).extract() })) + .try_collect::>() + .await?; + + assert_eq!((0..10).collect::>(), vals); + + Ok(()) +} + +#[pyo3_async_runtimes::smol::test] +fn test_local_cancel(event_loop: Py) -> PyResult<()> { + let locals = Python::attach(|py| -> PyResult { + TaskLocals::new(event_loop.into_bound(py)).copy_context(py) + })?; + smol::block_on(pyo3_async_runtimes::smol::scope_local(locals, async { + let completed = Arc::new(Mutex::new(false)); + + let py_future = Python::attach(|py| -> PyResult> { + let completed = Arc::clone(&completed); + Ok( + pyo3_async_runtimes::smol::future_into_py(py, async move { + smol::Timer::after(Duration::from_secs(1)).await; + *completed.lock().unwrap() = true; + + Ok(()) + })? + .into(), + ) + })?; + + if let Err(e) = Python::attach(|py| -> PyResult<_> { + py_future.bind(py).call_method0("cancel")?; + pyo3_async_runtimes::smol::into_future(py_future.into_bound(py)) + })? + .await + { + Python::attach(|py| -> PyResult<()> { + assert!(e.value(py).is_instance( + py.import("asyncio")? + .getattr("CancelledError")? + .cast::() + .unwrap() + )?); + Ok(()) + })?; + } else { + panic!("expected CancelledError"); + } + + smol::Timer::after(Duration::from_secs(1)).await; + if *completed.lock().unwrap() { + panic!("future still completed") + } + + Ok(()) + })) +} + +/// This module is implemented in Rust. +#[pymodule] +fn test_mod(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { + #![allow(deprecated)] + #[pyfunction(name = "sleep")] + fn sleep_(py: Python) -> PyResult> { + pyo3_async_runtimes::smol::future_into_py(py, async move { + smol::Timer::after(Duration::from_millis(500)).await; + Ok(()) + }) + } + + m.add_function(wrap_pyfunction!(sleep_, m)?)?; + + Ok(()) +} + +const MULTI_ASYNCIO_CODE: &str = r#" +async def main(): + return await test_mod.sleep() + +asyncio.new_event_loop().run_until_complete(main()) +"#; + +#[pyo3_async_runtimes::smol::test] +fn test_multiple_asyncio_run() -> PyResult<()> { + Python::attach(|py| { + pyo3_async_runtimes::smol::run(py, async move { + smol::Timer::after(Duration::from_millis(500)).await; + Ok(()) + })?; + pyo3_async_runtimes::smol::run(py, async move { + smol::Timer::after(Duration::from_millis(500)).await; + Ok(()) + })?; + + let d = [ + ("asyncio", py.import("asyncio")?.into()), + ("test_mod", wrap_pymodule!(test_mod)(py)), + ] + .into_py_dict(py)?; + + py.run(&CString::new(MULTI_ASYNCIO_CODE).unwrap(), Some(&d), None)?; + py.run(&CString::new(MULTI_ASYNCIO_CODE).unwrap(), Some(&d), None)?; + Ok(()) + }) +} + +#[pymodule] +fn cvars_mod(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { + #![allow(deprecated)] + #[pyfunction] + pub(crate) fn async_callback(py: Python, callback: Py) -> PyResult> { + pyo3_async_runtimes::smol::future_into_py(py, async move { + Python::attach(|py| { + pyo3_async_runtimes::smol::into_future(callback.bind(py).call0()?) + })? + .await?; + + Ok(()) + }) + } + + m.add_function(wrap_pyfunction!(async_callback, m)?)?; + + Ok(()) +} + +#[cfg(feature = "unstable-streams")] +#[pyo3_async_runtimes::smol::test] +async fn test_async_gen_v2() -> PyResult<()> { + let stream = Python::attach(|py| { + let test_mod = PyModule::from_code( + py, + &CString::new(ASYNC_STD_TEST_MOD).unwrap(), + &CString::new("test_rust_coroutine/async_std_test_mod.py").unwrap(), + &CString::new("async_std_test_mod").unwrap(), + )?; + + pyo3_async_runtimes::smol::into_stream_v2(test_mod.call_method0("gen")?) + })?; + + let vals = stream + .map(|item| Python::attach(|py| -> PyResult { item.bind(py).extract() })) + .try_collect::>() + .await?; + + assert_eq!((0..10).collect::>(), vals); + + Ok(()) +} + +#[cfg(feature = "unstable-streams")] +const ASYNC_STD_TEST_MOD_FASTGEN: &str = r#" + +async def gen(): + for i in range(1000): + yield i +"#; + +#[cfg(feature = "unstable-streams")] +#[pyo3_async_runtimes::smol::test] +async fn test_async_gen_full_buffer() -> PyResult<()> { + let stream = Python::attach(|py| { + let test_mod = PyModule::from_code( + py, + &CString::new(ASYNC_STD_TEST_MOD_FASTGEN).unwrap(), + &CString::new("test_rust_coroutine/async_std_test_mod.py").unwrap(), + &CString::new("async_std_test_mod").unwrap(), + )?; + + pyo3_async_runtimes::smol::into_stream_v2(test_mod.call_method0("gen")?) + })?; + + let vals = stream + .map(|item| Python::attach(|py| -> PyResult { item.bind(py).extract() })) + .try_collect::>() + .await?; + + assert_eq!((0..1000).collect::>(), vals); + + Ok(()) +} + +const CONTEXTVARS_CODE: &str = r#" +cx = contextvars.ContextVar("cx") + +async def contextvars_test(): + assert cx.get() == "foobar" + +async def main(): + cx.set("foobar") + await cvars_mod.async_callback(contextvars_test) + +asyncio.run(main()) +"#; + +#[pyo3_async_runtimes::smol::test] +fn test_contextvars() -> PyResult<()> { + Python::attach(|py| { + let d = [ + ("asyncio", py.import("asyncio")?.into()), + ("contextvars", py.import("contextvars")?.into()), + ("cvars_mod", wrap_pymodule!(cvars_mod)(py)), + ] + .into_py_dict(py)?; + + py.run(&CString::new(CONTEXTVARS_CODE).unwrap(), Some(&d), None)?; + py.run(&CString::new(CONTEXTVARS_CODE).unwrap(), Some(&d), None)?; + Ok(()) + }) +} + +fn main() -> pyo3::PyResult<()> { + Python::initialize(); + + Python::attach(|py| { + pyo3_async_runtimes::smol::run(py, pyo3_async_runtimes::testing::main()) + }) +} diff --git a/pytests/test_smol_run_forever.rs b/pytests/test_smol_run_forever.rs new file mode 100644 index 0000000..5625871 --- /dev/null +++ b/pytests/test_smol_run_forever.rs @@ -0,0 +1,48 @@ +use std::time::Duration; + +use pyo3::prelude::*; + +fn dump_err(py: Python, e: PyErr) { + // We can't display Python exceptions via std::fmt::Display, + // so print the error here manually. + e.print_and_set_sys_last_vars(py); +} + +fn main() { + Python::initialize(); + + Python::attach(|py| { + let asyncio = py.import("asyncio")?; + + let event_loop = asyncio.call_method0("new_event_loop")?; + asyncio.call_method1("set_event_loop", (&event_loop,))?; + + let event_loop_hdl: Py = event_loop.clone().into(); + + smol::spawn(async move { + smol::Timer::after(Duration::from_secs(1)).await; + + Python::attach(|py| { + event_loop_hdl + .bind(py) + .call_method1( + "call_soon_threadsafe", + (event_loop_hdl + .bind(py) + .getattr("stop") + .map_err(|e| dump_err(py, e)) + .unwrap(),), + ) + .map_err(|e| dump_err(py, e)) + .unwrap(); + }) + }); + + event_loop.call_method0("run_forever")?; + + println!("test test_async_std_run_forever ... ok"); + Ok(()) + }) + .map_err(|e| Python::attach(|py| dump_err(py, e))) + .unwrap() +} diff --git a/pytests/test_smol_uvloop.rs b/pytests/test_smol_uvloop.rs new file mode 100644 index 0000000..ce5ebd9 --- /dev/null +++ b/pytests/test_smol_uvloop.rs @@ -0,0 +1,43 @@ +#[cfg(not(target_os = "windows"))] +fn main() -> pyo3::PyResult<()> { + use pyo3::{prelude::*, types::PyType}; + Python::initialize(); + + Python::attach(|py| { + // uvloop not supported on the free-threaded build yet + // https://github.com/MagicStack/uvloop/issues/642 + let sysconfig = py.import("sysconfig")?; + let is_freethreaded = sysconfig.call_method1("get_config_var", ("Py_GIL_DISABLED",))?; + if is_freethreaded.is_truthy()? { + return Ok(()); + } + + // uvloop not yet supported on 3.14 + if py.version_info() >= (3, 14) { + return Ok(()); + } + + let uvloop = py.import("uvloop")?; + uvloop.call_method0("install")?; + + // store a reference for the assertion + let uvloop: Py = uvloop.into(); + + pyo3_async_runtimes::smol::run(py, async move { + // verify that we are on a uvloop.Loop + Python::attach(|py| -> PyResult<()> { + assert!(pyo3_async_runtimes::smol::get_current_loop(py)? + .is_instance(uvloop.bind(py).getattr("Loop")?.cast::().unwrap())?); + Ok(()) + })?; + + smol::Timer::after(std::time::Duration::from_secs(1)).await; + + println!("test test_smol_uvloop ... ok"); + Ok(()) + }) + }) +} + +#[cfg(target_os = "windows")] +fn main() {} diff --git a/src/async_std.rs b/src/async_std.rs index 9824eab..bdc3df8 100644 --- a/src/async_std.rs +++ b/src/async_std.rs @@ -584,9 +584,9 @@ pub fn into_future( /// ``` #[cfg(feature = "unstable-streams")] pub fn into_stream_v1( - gen: Bound<'_, PyAny>, + r#gen: Bound<'_, PyAny>, ) -> PyResult>> + 'static> { - generic::into_stream_v1::(gen) + generic::into_stream_v1::(r#gen) } /// unstable-streams Convert an async generator into a stream @@ -646,9 +646,9 @@ pub fn into_stream_v1( #[cfg(feature = "unstable-streams")] pub fn into_stream_with_locals_v1( locals: TaskLocals, - gen: Bound<'_, PyAny>, + r#gen: Bound<'_, PyAny>, ) -> PyResult>> + 'static> { - generic::into_stream_with_locals_v1::(locals, gen) + generic::into_stream_with_locals_v1::(locals, r#gen) } /// unstable-streams Convert an async generator into a stream @@ -708,9 +708,9 @@ pub fn into_stream_with_locals_v1( #[cfg(feature = "unstable-streams")] pub fn into_stream_with_locals_v2( locals: TaskLocals, - gen: Bound<'_, PyAny>, + r#gen: Bound<'_, PyAny>, ) -> PyResult> + 'static> { - generic::into_stream_with_locals_v2::(locals, gen) + generic::into_stream_with_locals_v2::(locals, r#gen) } /// unstable-streams Convert an async generator into a stream @@ -765,7 +765,7 @@ pub fn into_stream_with_locals_v2( /// ``` #[cfg(feature = "unstable-streams")] pub fn into_stream_v2( - gen: Bound<'_, PyAny>, + r#gen: Bound<'_, PyAny>, ) -> PyResult> + 'static> { - generic::into_stream_v2::(gen) + generic::into_stream_v2::(r#gen) } diff --git a/src/lib.rs b/src/lib.rs index 4ba5961..d34c867 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -351,12 +351,15 @@ pub use inventory; #[cfg(feature = "testing")] pub mod testing; -#[cfg(feature = "async-std")] +#[cfg(feature = "async-std-runtime")] pub mod async_std; #[cfg(feature = "tokio-runtime")] pub mod tokio; +#[cfg(feature = "smol-runtime")] +pub mod smol; + /// Errors and exceptions related to PyO3 Asyncio pub mod err; @@ -389,6 +392,7 @@ pub mod doc_test { #[cfg(all( feature = "async-std-runtime", feature = "tokio-runtime", + feature = "smol-runtime", feature = "attributes" ))] doctest!("../README.md", readme_md); diff --git a/src/smol.rs b/src/smol.rs new file mode 100644 index 0000000..5280b2f --- /dev/null +++ b/src/smol.rs @@ -0,0 +1,778 @@ +//! async-std-runtime PyO3 Asyncio functions specific to the async-std runtime +//! +//! Items marked with +//! unstable-streams +//! +//! are only available when the `unstable-streams` Cargo feature is enabled: +//! +//! ```toml +//! [dependencies.pyo3-async-runtimes] +//! version = "0.24" +//! features = ["unstable-streams"] +//! ``` + +use futures_util::future::FutureExt; +use pyo3::prelude::*; +use std::{ + any::Any, + cell::RefCell, + future::Future, + panic::{self, AssertUnwindSafe}, + pin::Pin, +}; +use task_local::task_local; + +use crate::{ + generic::{self, ContextExt, JoinError, LocalContextExt, Runtime, SpawnLocalExt}, + TaskLocals, +}; + +/// attributes +/// re-exports for macros +#[cfg(feature = "attributes")] +pub mod re_exports { + /// re-export spawn_blocking for use in `#[test]` macro without external dependency + pub use smol::unblock as spawn_blocking; +} + +/// attributes Provides the boilerplate for the `async-std` runtime and runs an async fn as main +#[cfg(feature = "attributes")] +pub use pyo3_async_runtimes_macros::smol_main as main; + +/// attributes +/// testing +/// Registers an `async-std` test with the `pyo3-asyncio` test harness +#[cfg(all(feature = "attributes", feature = "testing"))] +pub use pyo3_async_runtimes_macros::smol_test as test; + +struct SmolJoinErr(Box); + +impl JoinError for SmolJoinErr { + fn is_panic(&self) -> bool { + true + } + fn into_panic(self) -> Box { + self.0 + } +} + +task_local! { + static TASK_LOCALS: RefCell>; +} + +struct SmolRuntime; + +impl Runtime for SmolRuntime { + type JoinError = SmolJoinErr; + type JoinHandle = smol::Task>; + + fn spawn(fut: F) -> Self::JoinHandle + where + F: Future + Send + 'static, + { + smol::spawn(async move { + AssertUnwindSafe(fut) + .catch_unwind() + .await + .map_err(SmolJoinErr) + }) + } + + fn spawn_blocking(f: F) -> Self::JoinHandle + where + F: FnOnce() + Send + 'static, + { + smol::unblock(move || { + panic::catch_unwind(AssertUnwindSafe(f)).map_err(|e| SmolJoinErr(Box::new(e))) + }) + } +} + +impl ContextExt for SmolRuntime { + fn scope(locals: TaskLocals, fut: F) -> Pin + Send>> + where + F: Future + Send + 'static, + { + let old = TASK_LOCALS.with(|c| c.replace(Some(locals))); + Box::pin(async move { + let result = fut.await; + TASK_LOCALS.with(|c| c.replace(old)); + result + }) + } + + fn get_task_locals() -> Option { + TASK_LOCALS + .try_with(|c| c.borrow().as_ref().map(|locals| locals.clone())) + .unwrap_or_default() + } +} + +impl SpawnLocalExt for SmolRuntime { + fn spawn_local(fut: F) -> Self::JoinHandle + where + F: Future + 'static, + { + let executor = async_executor::LocalExecutor::new(); + executor.spawn(async move { + fut.await; + Ok(()) + }) + } +} + +impl LocalContextExt for SmolRuntime { + fn scope_local(locals: TaskLocals, fut: F) -> Pin>> + where + F: Future + 'static, + { + let old = TASK_LOCALS.with(|c| c.replace(Some(locals))); + Box::pin(async move { + let result = fut.await; + TASK_LOCALS.with(|c| c.replace(old)); + result + }) + } +} + +/// Set the task local event loop for the given future +pub async fn scope(locals: TaskLocals, fut: F) -> R +where + F: Future + Send + 'static, +{ + SmolRuntime::scope(locals, fut).await +} + +/// Set the task local event loop for the given !Send future +pub async fn scope_local(locals: TaskLocals, fut: F) -> R +where + F: Future + 'static, +{ + SmolRuntime::scope_local(locals, fut).await +} + +/// Get the current event loop from either Python or Rust async task local context +/// +/// This function first checks if the runtime has a task-local reference to the Python event loop. +/// If not, it calls [`get_running_loop`](`crate::get_running_loop`) to get the event loop +/// associated with the current OS thread. +pub fn get_current_loop(py: Python) -> PyResult> { + generic::get_current_loop::(py) +} + +/// Either copy the task locals from the current task OR get the current running loop and +/// contextvars from Python. +pub fn get_current_locals(py: Python) -> PyResult { + generic::get_current_locals::(py) +} + +/// Run the event loop until the given Future completes +/// +/// The event loop runs until the given future is complete. +/// +/// After this function returns, the event loop can be resumed with [`run_until_complete`] +/// +/// # Arguments +/// * `event_loop` - The Python event loop that should run the future +/// * `fut` - The future to drive to completion +/// +/// # Examples +/// +/// ``` +/// # use std::time::Duration; +/// # +/// # use pyo3::prelude::*; +/// # +/// # Python::initialize(); +/// # +/// # Python::attach(|py| -> PyResult<()> { +/// # let event_loop = py.import("asyncio")?.call_method0("new_event_loop")?; +/// pyo3_async_runtimes::async_std::run_until_complete(event_loop, async move { +/// async_std::task::sleep(Duration::from_secs(1)).await; +/// Ok(()) +/// })?; +/// # Ok(()) +/// # }).unwrap(); +/// ``` +pub fn run_until_complete(event_loop: Bound, fut: F) -> PyResult +where + F: Future> + Send + 'static, + T: Send + Sync + 'static, +{ + generic::run_until_complete::(&event_loop, fut) +} + +/// Run the event loop until the given Future completes +/// +/// # Arguments +/// * `py` - The current PyO3 GIL guard +/// * `fut` - The future to drive to completion +/// +/// # Examples +/// +/// ```no_run +/// # use std::time::Duration; +/// # +/// # use pyo3::prelude::*; +/// # +/// fn main() { +/// // call this or use pyo3 0.14 "auto-initialize" feature +/// Python::initialize(); +/// +/// Python::attach(|py| { +/// pyo3_async_runtimes::async_std::run(py, async move { +/// async_std::task::sleep(Duration::from_secs(1)).await; +/// Ok(()) +/// }) +/// .map_err(|e| { +/// e.print_and_set_sys_last_vars(py); +/// }) +/// .unwrap(); +/// }) +/// } +/// ``` +pub fn run(py: Python, fut: F) -> PyResult +where + F: Future> + Send + 'static, + T: Send + Sync + 'static, +{ + generic::run::(py, fut) +} + +/// Convert a Rust Future into a Python awaitable +/// +/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`, +/// the Rust future will be cancelled as well (new behaviour in `v0.15`). +/// +/// Python `contextvars` are preserved when calling async Python functions within the Rust future +/// via [`into_future`] (new behaviour in `v0.15`). +/// +/// > Although `contextvars` are preserved for async Python functions, synchronous functions will +/// > unfortunately fail to resolve them when called within the Rust future. This is because the +/// > function is being called from a Rust thread, not inside an actual Python coroutine context. +/// > +/// > As a workaround, you can get the `contextvars` from the current task locals using +/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your +/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the +/// > synchronous function, and restore the previous context when it returns or raises an exception. +/// +/// # Arguments +/// * `py` - PyO3 GIL guard +/// * `locals` - The task locals for the given future +/// * `fut` - The Rust future to be converted +/// +/// # Examples +/// +/// ``` +/// use std::time::Duration; +/// +/// use pyo3::prelude::*; +/// +/// /// Awaitable sleep function +/// #[pyfunction] +/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult> { +/// let secs = secs.extract()?; +/// pyo3_async_runtimes::async_std::future_into_py_with_locals( +/// py, +/// pyo3_async_runtimes::async_std::get_current_locals(py)?, +/// async move { +/// async_std::task::sleep(Duration::from_secs(secs)).await; +/// Python::attach(|py| Ok(py.None())) +/// } +/// ) +/// } +/// ``` +pub fn future_into_py_with_locals( + py: Python, + locals: TaskLocals, + fut: F, +) -> PyResult> +where + F: Future> + Send + 'static, + T: for<'py> IntoPyObject<'py> + Send + 'static, +{ + generic::future_into_py_with_locals::(py, locals, fut) +} + +/// Convert a Rust Future into a Python awaitable +/// +/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`, +/// the Rust future will be cancelled as well (new behaviour in `v0.15`). +/// +/// Python `contextvars` are preserved when calling async Python functions within the Rust future +/// via [`into_future`] (new behaviour in `v0.15`). +/// +/// > Although `contextvars` are preserved for async Python functions, synchronous functions will +/// > unfortunately fail to resolve them when called within the Rust future. This is because the +/// > function is being called from a Rust thread, not inside an actual Python coroutine context. +/// > +/// > As a workaround, you can get the `contextvars` from the current task locals using +/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your +/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the +/// > synchronous function, and restore the previous context when it returns or raises an exception. +/// +/// # Arguments +/// * `py` - The current PyO3 GIL guard +/// * `fut` - The Rust future to be converted +/// +/// # Examples +/// +/// ``` +/// use std::time::Duration; +/// +/// use pyo3::prelude::*; +/// +/// /// Awaitable sleep function +/// #[pyfunction] +/// fn sleep_for<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult> { +/// let secs = secs.extract()?; +/// pyo3_async_runtimes::async_std::future_into_py(py, async move { +/// async_std::task::sleep(Duration::from_secs(secs)).await; +/// Ok(()) +/// }) +/// } +/// ``` +pub fn future_into_py(py: Python, fut: F) -> PyResult> +where + F: Future> + Send + 'static, + T: for<'py> IntoPyObject<'py> + Send + 'static, +{ + generic::future_into_py::(py, fut) +} + +/// Convert a `!Send` Rust Future into a Python awaitable +/// +/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`, +/// the Rust future will be cancelled as well (new behaviour in `v0.15`). +/// +/// Python `contextvars` are preserved when calling async Python functions within the Rust future +/// via [`into_future`] (new behaviour in `v0.15`). +/// +/// > Although `contextvars` are preserved for async Python functions, synchronous functions will +/// > unfortunately fail to resolve them when called within the Rust future. This is because the +/// > function is being called from a Rust thread, not inside an actual Python coroutine context. +/// > +/// > As a workaround, you can get the `contextvars` from the current task locals using +/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your +/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the +/// > synchronous function, and restore the previous context when it returns or raises an exception. +/// +/// # Arguments +/// * `py` - PyO3 GIL guard +/// * `locals` - The task locals for the given future +/// * `fut` - The Rust future to be converted +/// +/// # Examples +/// +/// ``` +/// use std::{rc::Rc, time::Duration}; +/// +/// use pyo3::prelude::*; +/// +/// /// Awaitable non-send sleep function +/// #[pyfunction] +/// fn sleep_for(py: Python, secs: u64) -> PyResult> { +/// // Rc is non-send so it cannot be passed into pyo3_async_runtimes::async_std::future_into_py +/// let secs = Rc::new(secs); +/// Ok(pyo3_async_runtimes::async_std::local_future_into_py_with_locals( +/// py, +/// pyo3_async_runtimes::async_std::get_current_locals(py)?, +/// async move { +/// async_std::task::sleep(Duration::from_secs(*secs)).await; +/// Python::attach(|py| Ok(py.None())) +/// } +/// )?.into()) +/// } +/// +/// # #[cfg(all(feature = "async-std-runtime", feature = "attributes"))] +/// #[pyo3_async_runtimes::async_std::main] +/// async fn main() -> PyResult<()> { +/// Python::attach(|py| { +/// let py_future = sleep_for(py, 1)?; +/// pyo3_async_runtimes::async_std::into_future(py_future) +/// })? +/// .await?; +/// +/// Ok(()) +/// } +/// # #[cfg(not(all(feature = "async-std-runtime", feature = "attributes")))] +/// # fn main() {} +/// ``` +#[deprecated( + since = "0.18.0", + note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)" +)] +#[allow(deprecated)] +pub fn local_future_into_py_with_locals( + py: Python, + locals: TaskLocals, + fut: F, +) -> PyResult> +where + F: Future> + 'static, + T: for<'py> IntoPyObject<'py>, +{ + generic::local_future_into_py_with_locals::(py, locals, fut) +} + +/// Convert a `!Send` Rust Future into a Python awaitable +/// +/// If the `asyncio.Future` returned by this conversion is cancelled via `asyncio.Future.cancel`, +/// the Rust future will be cancelled as well (new behaviour in `v0.15`). +/// +/// Python `contextvars` are preserved when calling async Python functions within the Rust future +/// via [`into_future`] (new behaviour in `v0.15`). +/// +/// > Although `contextvars` are preserved for async Python functions, synchronous functions will +/// > unfortunately fail to resolve them when called within the Rust future. This is because the +/// > function is being called from a Rust thread, not inside an actual Python coroutine context. +/// > +/// > As a workaround, you can get the `contextvars` from the current task locals using +/// > [`get_current_locals`] and [`TaskLocals::context`](`crate::TaskLocals::context`), then wrap your +/// > synchronous function in a call to `contextvars.Context.run`. This will set the context, call the +/// > synchronous function, and restore the previous context when it returns or raises an exception. +/// +/// # Arguments +/// * `py` - The current PyO3 GIL guard +/// * `fut` - The Rust future to be converted +/// +/// # Examples +/// +/// ``` +/// use std::{rc::Rc, time::Duration}; +/// +/// use pyo3::prelude::*; +/// +/// /// Awaitable non-send sleep function +/// #[pyfunction] +/// fn sleep_for(py: Python, secs: u64) -> PyResult> { +/// // Rc is non-send so it cannot be passed into pyo3_async_runtimes::async_std::future_into_py +/// let secs = Rc::new(secs); +/// pyo3_async_runtimes::async_std::local_future_into_py(py, async move { +/// async_std::task::sleep(Duration::from_secs(*secs)).await; +/// Ok(()) +/// }) +/// } +/// +/// # #[cfg(all(feature = "async-std-runtime", feature = "attributes"))] +/// #[pyo3_async_runtimes::async_std::main] +/// async fn main() -> PyResult<()> { +/// Python::attach(|py| { +/// let py_future = sleep_for(py, 1)?; +/// pyo3_async_runtimes::async_std::into_future(py_future) +/// })? +/// .await?; +/// +/// Ok(()) +/// } +/// # #[cfg(not(all(feature = "async-std-runtime", feature = "attributes")))] +/// # fn main() {} +/// ``` +#[deprecated( + since = "0.18.0", + note = "Questionable whether these conversions have real-world utility (see https://github.com/awestlake87/pyo3-asyncio/issues/59#issuecomment-1008038497 and let me know if you disagree!)" +)] +#[allow(deprecated)] +pub fn local_future_into_py(py: Python, fut: F) -> PyResult> +where + F: Future> + 'static, + T: for<'py> IntoPyObject<'py>, +{ + generic::local_future_into_py::(py, fut) +} + +/// Convert a Python `awaitable` into a Rust Future +/// +/// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A +/// completion handler sends the result of this Task through a +/// `futures_channel::oneshot::Sender>>` and the future returned by this function +/// simply awaits the result through the `futures_channel::oneshot::Receiver>>`. +/// +/// # Arguments +/// * `awaitable` - The Python `awaitable` to be converted +/// +/// # Examples +/// +/// ``` +/// use std::time::Duration; +/// use std::ffi::CString; +/// +/// use pyo3::prelude::*; +/// +/// const PYTHON_CODE: &'static str = r#" +/// import asyncio +/// +/// async def py_sleep(duration): +/// await asyncio.sleep(duration) +/// "#; +/// +/// async fn py_sleep(seconds: f32) -> PyResult<()> { +/// let test_mod = Python::attach(|py| -> PyResult> { +/// Ok( +/// PyModule::from_code( +/// py, +/// &CString::new(PYTHON_CODE).unwrap(), +/// &CString::new("test_into_future/test_mod.py").unwrap(), +/// &CString::new("test_mod").unwrap() +/// )? +/// .into() +/// ) +/// })?; +/// +/// Python::attach(|py| { +/// pyo3_async_runtimes::async_std::into_future( +/// test_mod +/// .call_method1(py, "py_sleep", (seconds,))? +/// .into_bound(py), +/// ) +/// })? +/// .await?; +/// Ok(()) +/// } +/// ``` +pub fn into_future( + awaitable: Bound, +) -> PyResult>> + Send> { + generic::into_future::(awaitable) +} + +/// unstable-streams Convert an async generator into a stream +/// +/// **This API is marked as unstable** and is only available when the +/// `unstable-streams` crate feature is enabled. This comes with no +/// stability guarantees, and could be changed or removed at any time. +/// +/// # Arguments +/// * `gen` - The Python async generator to be converted +/// +/// # Examples +/// ``` +/// use pyo3::prelude::*; +/// use futures_util::stream::{StreamExt, TryStreamExt}; +/// use std::ffi::CString; +/// +/// const TEST_MOD: &str = r#" +/// import asyncio +/// +/// async def gen(): +/// for i in range(10): +/// await asyncio.sleep(0.1) +/// yield i +/// "#; +/// +/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))] +/// # #[pyo3_async_runtimes::async_std::main] +/// # async fn main() -> PyResult<()> { +/// let stream = Python::attach(|py| { +/// let test_mod = PyModule::from_code( +/// py, +/// &CString::new(TEST_MOD).unwrap(), +/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(), +/// &CString::new("test_mod").unwrap(), +/// )?; +/// +/// pyo3_async_runtimes::async_std::into_stream_v1(test_mod.call_method0("gen")?) +/// })?; +/// +/// let vals = stream +/// .map(|item| Python::attach(|py| -> PyResult { Ok(item?.bind(py).extract()?) })) +/// .try_collect::>() +/// .await?; +/// +/// assert_eq!((0..10).collect::>(), vals); +/// +/// Ok(()) +/// # } +/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))] +/// # fn main() {} +/// ``` +#[cfg(feature = "unstable-streams")] +pub fn into_stream_v1( + r#gen: Bound<'_, PyAny>, +) -> PyResult>> + 'static> { + generic::into_stream_v1::(r#gen) +} + +/// unstable-streams Convert an async generator into a stream +/// +/// **This API is marked as unstable** and is only available when the +/// `unstable-streams` crate feature is enabled. This comes with no +/// stability guarantees, and could be changed or removed at any time. +/// +/// # Arguments +/// * `locals` - The current task locals +/// * `gen` - The Python async generator to be converted +/// +/// # Examples +/// ``` +/// use pyo3::prelude::*; +/// use futures_util::stream::{StreamExt, TryStreamExt}; +/// use std::ffi::CString; +/// +/// const TEST_MOD: &str = r#" +/// import asyncio +/// +/// async def gen(): +/// for i in range(10): +/// await asyncio.sleep(0.1) +/// yield i +/// "#; +/// +/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))] +/// # #[pyo3_async_runtimes::async_std::main] +/// # async fn main() -> PyResult<()> { +/// let stream = Python::attach(|py| { +/// let test_mod = PyModule::from_code( +/// py, +/// &CString::new(TEST_MOD).unwrap(), +/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(), +/// &CString::new("test_mod").unwrap(), +/// )?; +/// +/// pyo3_async_runtimes::async_std::into_stream_with_locals_v1( +/// pyo3_async_runtimes::async_std::get_current_locals(py)?, +/// test_mod.call_method0("gen")? +/// ) +/// })?; +/// +/// let vals = stream +/// .map(|item| Python::attach(|py| -> PyResult { Ok(item?.bind(py).extract()?) })) +/// .try_collect::>() +/// .await?; +/// +/// assert_eq!((0..10).collect::>(), vals); +/// +/// Ok(()) +/// # } +/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))] +/// # fn main() {} +/// ``` +#[cfg(feature = "unstable-streams")] +pub fn into_stream_with_locals_v1( + locals: TaskLocals, + r#gen: Bound<'_, PyAny>, +) -> PyResult>> + 'static> { + generic::into_stream_with_locals_v1::(locals, r#gen) +} + +/// unstable-streams Convert an async generator into a stream +/// +/// **This API is marked as unstable** and is only available when the +/// `unstable-streams` crate feature is enabled. This comes with no +/// stability guarantees, and could be changed or removed at any time. +/// +/// # Arguments +/// * `locals` - The current task locals +/// * `gen` - The Python async generator to be converted +/// +/// # Examples +/// ``` +/// use pyo3::prelude::*; +/// use futures_util::stream::{StreamExt, TryStreamExt}; +/// use std::ffi::CString; +/// +/// const TEST_MOD: &str = r#" +/// import asyncio +/// +/// async def gen(): +/// for i in range(10): +/// await asyncio.sleep(0.1) +/// yield i +/// "#; +/// +/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))] +/// # #[pyo3_async_runtimes::async_std::main] +/// # async fn main() -> PyResult<()> { +/// let stream = Python::attach(|py| { +/// let test_mod = PyModule::from_code( +/// py, +/// &CString::new(TEST_MOD).unwrap(), +/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(), +/// &CString::new("test_mod").unwrap(), +/// )?; +/// +/// pyo3_async_runtimes::async_std::into_stream_with_locals_v2( +/// pyo3_async_runtimes::async_std::get_current_locals(py)?, +/// test_mod.call_method0("gen")? +/// ) +/// })?; +/// +/// let vals = stream +/// .map(|item| Python::attach(|py| -> PyResult { Ok(item.bind(py).extract()?) })) +/// .try_collect::>() +/// .await?; +/// +/// assert_eq!((0..10).collect::>(), vals); +/// +/// Ok(()) +/// # } +/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))] +/// # fn main() {} +/// ``` +#[cfg(feature = "unstable-streams")] +pub fn into_stream_with_locals_v2( + locals: TaskLocals, + r#gen: Bound<'_, PyAny>, +) -> PyResult> + 'static> { + generic::into_stream_with_locals_v2::(locals, r#gen) +} + +/// unstable-streams Convert an async generator into a stream +/// +/// **This API is marked as unstable** and is only available when the +/// `unstable-streams` crate feature is enabled. This comes with no +/// stability guarantees, and could be changed or removed at any time. +/// +/// # Arguments +/// * `gen` - The Python async generator to be converted +/// +/// # Examples +/// ``` +/// use pyo3::prelude::*; +/// use futures_util::stream::{StreamExt, TryStreamExt}; +/// use std::ffi::CString; +/// +/// const TEST_MOD: &str = r#" +/// import asyncio +/// +/// async def gen(): +/// for i in range(10): +/// await asyncio.sleep(0.1) +/// yield i +/// "#; +/// +/// # #[cfg(all(feature = "unstable-streams", feature = "attributes"))] +/// # #[pyo3_async_runtimes::async_std::main] +/// # async fn main() -> PyResult<()> { +/// let stream = Python::attach(|py| { +/// let test_mod = PyModule::from_code( +/// py, +/// &CString::new(TEST_MOD).unwrap(), +/// &CString::new("test_rust_coroutine/test_mod.py").unwrap(), +/// &CString::new("test_mod").unwrap(), +/// )?; +/// +/// pyo3_async_runtimes::async_std::into_stream_v2(test_mod.call_method0("gen")?) +/// })?; +/// +/// let vals = stream +/// .map(|item| Python::attach(|py| -> PyResult { Ok(item.bind(py).extract()?) })) +/// .try_collect::>() +/// .await?; +/// +/// assert_eq!((0..10).collect::>(), vals); +/// +/// Ok(()) +/// # } +/// # #[cfg(not(all(feature = "unstable-streams", feature = "attributes")))] +/// # fn main() {} +/// ``` +#[cfg(feature = "unstable-streams")] +pub fn into_stream_v2( + r#gen: Bound<'_, PyAny>, +) -> PyResult> + 'static> { + generic::into_stream_v2::(r#gen) +}