Skip to content

Commit 193235e

Browse files
committed
fix: compose SSE reload-abort with brotli compression
SSE streams with brotli compression were not being cancelled on hot reload because the two concerns were in mutually exclusive branches. Restructure stream building as composable layers: reload-abort first, then optional brotli compression on top.
1 parent 0de10fc commit 193235e

File tree

3 files changed

+109
-16
lines changed

3 files changed

+109
-16
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ Or from a file:
133133
$ http-nu :3001 ./serve.nu
134134
```
135135

136-
Try the [live examples](https://http-nu.cross.stream/examples/) or run them locally with the [examples hub](examples/README.md):
136+
Try the [live examples](https://http-nu.cross.stream/examples/) or run them
137+
locally with the [examples hub](examples/README.md):
137138

138139
```bash
139140
$ http-nu --datastar :3001 examples/serve.nu

src/handler.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use std::net::SocketAddr;
2+
use std::pin::Pin;
23
use std::sync::Arc;
34
use std::time::Instant;
45

56
use arc_swap::ArcSwap;
6-
use futures_util::StreamExt;
7+
use futures_util::{Stream, StreamExt};
78
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full, StreamBody};
89
use hyper::body::{Bytes, Frame};
910
use tokio_stream::wrappers::ReceiverStream;
@@ -475,30 +476,31 @@ async fn build_normal_response(
475476
}
476477
}
477478
ResponseTransport::Stream(rx) => {
478-
if use_brotli {
479-
compression::compress_stream(rx)
480-
} else if is_sse {
479+
// Layer 1: base byte stream, with reload-abort for SSE
480+
let byte_stream: Pin<Box<dyn Stream<Item = Vec<u8>> + Send + Sync>> = if is_sse {
481481
// SSE streams abort on reload (error triggers client retry)
482-
let stream = futures_util::stream::try_unfold(
482+
Box::pin(futures_util::stream::unfold(
483483
(ReceiverStream::new(rx), reload_token),
484484
|(mut data_rx, token)| async move {
485485
tokio::select! {
486486
biased;
487-
_ = token.cancelled() => {
488-
Err(std::io::Error::other("reload").into())
489-
}
487+
_ = token.cancelled() => None,
490488
item = StreamExt::next(&mut data_rx) => {
491-
match item {
492-
Some(data) => Ok(Some((Frame::data(Bytes::from(data)), (data_rx, token)))),
493-
None => Ok(None),
494-
}
489+
item.map(|data| (data, (data_rx, token)))
495490
}
496491
}
497492
},
498-
);
499-
BodyExt::boxed(StreamBody::new(stream))
493+
))
494+
} else {
495+
Box::pin(ReceiverStream::new(rx))
496+
};
497+
498+
// Layer 2: optionally compress, then frame
499+
if use_brotli {
500+
let brotli = compression::BrotliStream::new(byte_stream);
501+
BodyExt::boxed(StreamBody::new(brotli))
500502
} else {
501-
let stream = ReceiverStream::new(rx).map(|data| Ok(Frame::data(Bytes::from(data))));
503+
let stream = byte_stream.map(|data| Ok(Frame::data(Bytes::from(data))));
502504
BodyExt::boxed(StreamBody::new(stream))
503505
}
504506
}

tests/server_test.rs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2109,6 +2109,96 @@ async fn test_sse_cancelled_on_hot_reload() {
21092109
child.kill().await.ok();
21102110
}
21112111

2112+
#[tokio::test]
2113+
async fn test_sse_cancelled_on_hot_reload_with_brotli() {
2114+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
2115+
2116+
// Spawn server with an SSE endpoint that streams indefinitely
2117+
let (mut child, mut stdin, addr_rx) = TestServerWithStdin::spawn("127.0.0.1:0", false);
2118+
2119+
// Send initial SSE script
2120+
let sse_script = r#"{|req|
2121+
1..100 | each {|i|
2122+
sleep 100ms
2123+
{data: $"event-($i)"}
2124+
} | to sse
2125+
}"#;
2126+
stdin.write_all(sse_script.as_bytes()).await.unwrap();
2127+
stdin.write_all(b"\0").await.unwrap();
2128+
stdin.flush().await.unwrap();
2129+
2130+
let address = tokio::time::timeout(std::time::Duration::from_secs(5), addr_rx)
2131+
.await
2132+
.expect("Server didn't start")
2133+
.expect("Channel closed");
2134+
2135+
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2136+
2137+
// Start curl with brotli compression
2138+
let mut sse_child = tokio::process::Command::new("curl")
2139+
.arg("-sN")
2140+
.arg("-H")
2141+
.arg("Accept-Encoding: br")
2142+
.arg(&address)
2143+
.stdout(std::process::Stdio::piped())
2144+
.spawn()
2145+
.expect("Failed to start curl");
2146+
2147+
let stdout = sse_child.stdout.take().expect("Failed to get stdout");
2148+
let mut reader = stdout;
2149+
2150+
// Read some data to confirm SSE is streaming
2151+
let mut buf = vec![0u8; 256];
2152+
let n = tokio::time::timeout(std::time::Duration::from_secs(2), reader.read(&mut buf))
2153+
.await
2154+
.expect("Timeout reading initial SSE data")
2155+
.expect("Read error");
2156+
assert!(n > 0, "Should have received SSE data");
2157+
2158+
// Trigger hot reload
2159+
let new_script = r#"{|req| "reloaded"}"#;
2160+
stdin.write_all(new_script.as_bytes()).await.unwrap();
2161+
stdin.write_all(b"\0").await.unwrap();
2162+
stdin.flush().await.unwrap();
2163+
2164+
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2165+
2166+
// After reload, the compressed SSE stream should end.
2167+
// Drain remaining data (brotli FINISH frame) and check for EOF.
2168+
let stream_ended = tokio::time::timeout(std::time::Duration::from_secs(2), async {
2169+
loop {
2170+
match reader.read(&mut buf).await {
2171+
Ok(0) => return true, // EOF - stream ended
2172+
Ok(_) => continue, // Drain remaining data (e.g. brotli FINISH frame)
2173+
Err(_) => return true, // Read error
2174+
}
2175+
}
2176+
})
2177+
.await;
2178+
2179+
assert!(
2180+
matches!(stream_ended, Ok(true)),
2181+
"Compressed SSE stream should stop after reload"
2182+
);
2183+
2184+
sse_child.kill().await.ok();
2185+
2186+
// Verify the new handler works
2187+
let output = std::process::Command::new("curl")
2188+
.arg("-s")
2189+
.arg(&address)
2190+
.output()
2191+
.expect("curl failed");
2192+
2193+
assert_eq!(
2194+
String::from_utf8_lossy(&output.stdout).trim(),
2195+
"reloaded",
2196+
"New handler should be active after reload"
2197+
);
2198+
2199+
child.kill().await.ok();
2200+
}
2201+
21122202
/// Tests that --topic with -w loads a handler from the store, serves a placeholder
21132203
/// when the topic is empty, reloads when the topic is appended, and reloads again
21142204
/// when the topic is updated.

0 commit comments

Comments
 (0)