What went wrong
When the Restate context is used inside a concurrent stream (futures' buffered(n) with n > 1), the stream never completes.
- With buffered(1): everything works correctly.
- With buffered(3): the first 3 items are processed, and then the stream freezes indefinitely.
Minimal example
Tested with:
- restate-sdk 0.7 and 0.8
- restatedev/restate:1.6.1
- on Windows
Cargo.toml
[package]
name = "minimal_example"
version = "0.1.0"
edition = "2024"
[dependencies]
futures = "0.3"
restate-sdk = "0.7"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-subscriber = "0.3"
src/main.rs
use futures::{StreamExt, stream};
use restate_sdk::prelude::*;
use std::{convert::Infallible, time::Duration};
use tokio::time::sleep;
use tracing::info;

#[restate_sdk::service]
trait MinimalExample {
    async fn process_working() -> Result<(), Infallible>;
    async fn process_not_working() -> Result<(), Infallible>;
}

struct MinimalExampleImpl;

impl MinimalExample for MinimalExampleImpl {
    async fn process_working(&self, ctx: Context<'_>) -> Result<(), Infallible> {
        let _res = stream::iter(1..10)
            .map(|i| context_run(&ctx, i))
            .buffered(1)
            .collect::<Vec<_>>()
            .await;
        Ok(())
    }

    async fn process_not_working(&self, ctx: Context<'_>) -> Result<(), Infallible> {
        let _res = stream::iter(1..10)
            .map(|i| context_run(&ctx, i))
            .buffered(3)
            .collect::<Vec<_>>()
            .await;
        Ok(())
    }
}

async fn context_run(ctx: &Context<'_>, param: i32) -> Result<i32, TerminalError> {
    ctx.run(|| async move {
        info!("{param}");
        sleep(Duration::from_millis(700)).await;
        Ok(param)
    })
    .name(format!("context_operation-{param}"))
    .await
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();
    HttpServer::new(Endpoint::builder().bind(MinimalExampleImpl.serve()).build())
        .listen_and_serve("0.0.0.0:9080".parse().unwrap())
        .await;
}