
Concurrent use of Context in stream causes a freeze #89

@FeJe

What went wrong

When the Restate context is used by several futures that one stream polls concurrently (buffered(n) with n > 1), the stream never completes.

  • With buffered(1): everything works correctly.
  • With buffered(3): the first 3 items are processed, then the stream freezes indefinitely.
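
For readers less familiar with buffered(n), the following is a minimal std-only model of the scheduling it implies: up to n futures are kept in flight, all of them are polled from the same task, and results are emitted in input order. It is a sketch, not the futures crate's implementation, and it does not involve Restate at all. Step, NoopWake and run_buffered are hypothetical names made up for illustration. It only shows that with n = 3 three futures are interleaved on one task, which is the situation in which the freeze is observed.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for one unit of work:
/// returns Pending for `polls_left` polls, then yields `value`.
struct Step {
    value: i32,
    polls_left: u32,
}

impl Future for Step {
    type Output = i32;
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<i32> {
        if self.polls_left == 0 {
            Poll::Ready(self.value)
        } else {
            self.polls_left -= 1;
            Poll::Pending
        }
    }
}

/// Waker that does nothing; the loop below simply re-polls.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Model of `buffered(limit)`: returns the outputs in input order and the
/// largest number of futures that were in flight at the same time.
fn run_buffered(limit: usize, items: Vec<i32>) -> (Vec<i32>, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut pending = items.into_iter();
    let mut in_flight: VecDeque<(Pin<Box<Step>>, Option<i32>)> = VecDeque::new();
    let mut out = Vec::new();
    let mut max_in_flight = 0;

    loop {
        // Top up the window to at most `limit` futures.
        while in_flight.len() < limit {
            match pending.next() {
                Some(value) => {
                    let fut = Box::pin(Step { value, polls_left: 2 });
                    in_flight.push_back((fut, None));
                }
                None => break,
            }
        }
        if in_flight.is_empty() {
            break;
        }
        max_in_flight = max_in_flight.max(in_flight.len());

        // Poll every unfinished future in the window from this one task.
        for (fut, done) in in_flight.iter_mut() {
            if done.is_none() {
                if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
                    *done = Some(v);
                }
            }
        }

        // Emit finished results from the front only, preserving input order.
        while matches!(in_flight.front(), Some((_, Some(_)))) {
            if let Some((_, Some(v))) = in_flight.pop_front() {
                out.push(v);
            }
        }
    }
    (out, max_in_flight)
}

fn main() {
    for limit in [1, 3] {
        let (out, max) = run_buffered(limit, (1..10).collect());
        println!("limit={limit} max_in_flight={max} out={out:?}");
    }
}
```

In this model both limits run to completion, so the interleaving alone does not explain the hang; the difference in the report only appears once the interleaved futures are ctx.run calls on a shared Context.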

Minimal example

Tested with:

  • restate-sdk 0.7 and 0.8
  • restatedev/restate:1.6.1
  • on Windows

Cargo.toml

[package]
name = "minimal_example"
version = "0.1.0"
edition = "2024"

[dependencies]
futures = "0.3"
restate-sdk = "0.7"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-subscriber = "0.3"

src/main.rs

use futures::{StreamExt, stream};
use restate_sdk::prelude::*;
use std::{convert::Infallible, time::Duration};
use tokio::time::sleep;
use tracing::info;

#[restate_sdk::service]
trait MinimalExample {
    async fn process_working() -> Result<(), Infallible>;
    async fn process_not_working() -> Result<(), Infallible>;
}

struct MinimalExampleImpl;

impl MinimalExample for MinimalExampleImpl {
    async fn process_working(&self, ctx: Context<'_>) -> Result<(), Infallible> {
        let _res = stream::iter(1..10)
            .map(|i| context_run(&ctx, i))
            .buffered(1) // one ctx.run future in flight at a time: completes
            .collect::<Vec<_>>()
            .await;
        Ok(())
    }

    async fn process_not_working(&self, ctx: Context<'_>) -> Result<(), Infallible> {
        let _res = stream::iter(1..10)
            .map(|i| context_run(&ctx, i))
            .buffered(3) // three ctx.run futures in flight: freezes after the first 3 items
            .collect::<Vec<_>>()
            .await;
        Ok(())
    }
}

async fn context_run(ctx: &Context<'_>, param: i32) -> Result<i32, TerminalError> {
    ctx.run(|| async move {
        info!("{param}");
        sleep(Duration::from_millis(700)).await;
        Ok(param)
    })
    .name(format!("context_operation-{param}"))
    .await
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();
    HttpServer::new(Endpoint::builder().bind(MinimalExampleImpl.serve()).build())
        .listen_and_serve("0.0.0.0:9080".parse().unwrap())
        .await;
}
