Skip to content

Commit c716844

Browse files
authored
feat: websocket crate (#28)
1 parent 6addb68 commit c716844

File tree

15 files changed

+1256
-7
lines changed

15 files changed

+1256
-7
lines changed

.github/workflows/ci.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ jobs:
2929
- name: "Clippy"
3030
cmd: clippy
3131
args: --workspace --all-features --all-targets -- -D warnings
32-
rust: stable
32+
rust: nightly
3333
- name: "Formatting"
3434
cmd: fmt
3535
args: --all -- --check
36-
rust: stable
36+
rust: nightly
3737
- name: "Tests"
3838
cmd: nextest
3939
args: run --workspace --all-features --retries 3

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ full = [
2323
"geoip",
2424
"metrics",
2525
"rate_limit",
26+
"websocket"
2627
]
2728
alloc = ["dep:alloc"]
2829
analytics = ["dep:analytics"]
@@ -35,6 +36,7 @@ metrics_exporter_prometheus = ["metrics/exporter_prometheus"]
3536
alloc_metrics = ["alloc/metrics"]
3637
profiler = ["alloc/profiler"]
3738
rate_limit = ["dep:rate_limit"]
39+
websocket = ["dep:websocket"]
3840

3941
[workspace.dependencies]
4042
aws-sdk-s3 = "1.21.0"
@@ -47,6 +49,7 @@ future = { path = "./crates/future", optional = true }
4749
geoip = { path = "./crates/geoip", optional = true }
4850
metrics = { package = "wc_metrics", path = "./crates/metrics", optional = true }
4951
rate_limit = { path = "./crates/rate_limit", optional = true }
52+
websocket = { path = "./crates/websocket", optional = true }
5053

5154
[dev-dependencies]
5255
anyhow = "1"

clippy.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
await-holding-invalid-types = [
2-
"tracing::trace::Entered",
3-
"tracing::trace::EnteredSpan",
2+
"tracing::span::Entered",
3+
"tracing::span::EnteredSpan",
44
]
5+
absolute-paths-max-segments = 4

crates/websocket/Cargo.toml

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
[package]
2+
name = "websocket"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[features]
7+
default = []
8+
full = ["axum", "tungstenite", "warp", "json"]
9+
axum = ["dep:axum"]
10+
tungstenite = ["dep:tungstenite", "dep:tokio-tungstenite"]
11+
warp = ["dep:warp"]
12+
json = ["dep:serde", "dep:serde_json"]
13+
14+
[dependencies]
15+
axum = { optional = true, version = "0.8", features = ["ws"] }
16+
tokio-tungstenite = { optional = true, version = "0.28" }
17+
tungstenite = { optional = true, version = "0.28" }
18+
warp = { optional = true, version = "0.4", features = ["websocket"] }
19+
serde = { optional = true, version = "1.0" }
20+
serde_json = { optional = true, version = "1.0" }
21+
futures-concurrency = "7.6"
22+
futures-timer = "3.0"
23+
futures-util = "0.3"
24+
pin-project = "1.1"
25+
tap = "1.0"
26+
thiserror = "2.0"
27+
tokio = { version = "1.48", default-features = false, features = ["rt", "time"] }
28+
tokio-stream = "0.1"
29+
tokio-util = "0.7"
30+
enum-as-inner = "0.6"
31+
bytes = "1.11"
32+
derive_more = { version = "2.1", features = ["into", "from"] }
33+
34+
[dev-dependencies]
35+
tokio = { version = "1.48", features = ["macros"] }
36+
serde = { version = "1.0", features = ["derive"] }
37+
tokio-tungstenite = "0.28"
38+
websocket = { path = ".", features = ["tungstenite", "json"] }

crates/websocket/src/backend.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#[cfg(feature = "axum")]
2+
mod axum;
3+
#[cfg(feature = "tungstenite")]
4+
mod tungstenite;
5+
#[cfg(feature = "warp")]
6+
mod warp;
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
use {
2+
crate::{Backend, CloseFrame, Message},
3+
axum::{
4+
Error as NativeError,
5+
body::Bytes,
6+
extract::ws::{
7+
CloseFrame as NativeCloseFrame,
8+
Message as NativeMessage,
9+
Utf8Bytes,
10+
WebSocket as NativeWebSocket,
11+
},
12+
},
13+
};
14+
15+
impl Backend for NativeWebSocket {
16+
type Error = NativeError;
17+
type Message = NativeMessage;
18+
type Transport = Self;
19+
20+
fn into_transport(self) -> Self::Transport {
21+
self
22+
}
23+
24+
fn encode_message(msg: Message) -> Self::Message {
25+
match msg {
26+
Message::Binary(data) => NativeMessage::binary(data),
27+
Message::Text(data) => NativeMessage::text(data),
28+
Message::Ping(data) => NativeMessage::Ping(data),
29+
Message::Pong(data) => NativeMessage::Pong(data),
30+
Message::Close(msg) => {
31+
let frame = msg.map(|frame| NativeCloseFrame {
32+
code: frame.code,
33+
reason: frame.reason.into(),
34+
});
35+
36+
NativeMessage::Close(frame)
37+
}
38+
}
39+
}
40+
41+
fn decode_message(msg: Self::Message) -> Message {
42+
match msg {
43+
NativeMessage::Binary(data) => Message::Binary(data),
44+
NativeMessage::Text(data) => Message::Text(bytes_to_string(data)),
45+
NativeMessage::Ping(data) => Message::Ping(data),
46+
NativeMessage::Pong(data) => Message::Pong(data),
47+
NativeMessage::Close(frame) => {
48+
let frame = frame.map(|frame| CloseFrame {
49+
code: frame.code,
50+
reason: bytes_to_string(frame.reason),
51+
});
52+
53+
Message::Close(frame)
54+
}
55+
}
56+
}
57+
}
58+
59+
fn bytes_to_string(data: Utf8Bytes) -> String {
60+
String::from_utf8(Bytes::from(data).into()).unwrap_or_default()
61+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use {
2+
crate::{Backend, CloseFrame, Message},
3+
bytes::Bytes,
4+
tokio::io::{AsyncRead, AsyncWrite},
5+
tokio_tungstenite::WebSocketStream,
6+
tungstenite::{
7+
Error as NativeError,
8+
Message as NativeMessage,
9+
Utf8Bytes,
10+
protocol::CloseFrame as NativeCloseFrame,
11+
},
12+
};
13+
14+
impl<T> Backend for WebSocketStream<T>
15+
where
16+
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
17+
{
18+
type Error = NativeError;
19+
type Message = NativeMessage;
20+
type Transport = Self;
21+
22+
fn into_transport(self) -> Self::Transport {
23+
self
24+
}
25+
26+
fn encode_message(msg: Message) -> Self::Message {
27+
match msg {
28+
Message::Binary(data) => NativeMessage::binary(data),
29+
Message::Text(data) => NativeMessage::text(data),
30+
Message::Ping(data) => NativeMessage::Ping(data),
31+
Message::Pong(data) => NativeMessage::Pong(data),
32+
Message::Close(msg) => {
33+
let frame = msg.map(|frame| NativeCloseFrame {
34+
code: frame.code.into(),
35+
reason: frame.reason.into(),
36+
});
37+
38+
NativeMessage::Close(frame)
39+
}
40+
}
41+
}
42+
43+
fn decode_message(msg: Self::Message) -> Message {
44+
match msg {
45+
NativeMessage::Binary(data) => Message::Binary(data),
46+
NativeMessage::Text(data) => Message::Text(bytes_to_string(data)),
47+
NativeMessage::Ping(data) => Message::Ping(data),
48+
NativeMessage::Pong(data) => Message::Pong(data),
49+
NativeMessage::Close(frame) => {
50+
let frame = frame.map(|frame| CloseFrame {
51+
code: frame.code.into(),
52+
reason: bytes_to_string(frame.reason),
53+
});
54+
55+
Message::Close(frame)
56+
}
57+
NativeMessage::Frame(_) => Message::Close(None),
58+
}
59+
}
60+
}
61+
62+
fn bytes_to_string(data: Utf8Bytes) -> String {
63+
String::from_utf8(Bytes::from(data).into()).unwrap_or_default()
64+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use {
2+
crate::{Backend, CloseFrame, Message},
3+
warp::{
4+
Error as NativeError,
5+
ws::{Message as NativeMessage, WebSocket as NativeWebSocket},
6+
},
7+
};
8+
9+
impl Backend for NativeWebSocket {
10+
type Error = NativeError;
11+
type Message = NativeMessage;
12+
type Transport = Self;
13+
14+
fn into_transport(self) -> Self::Transport {
15+
self
16+
}
17+
18+
fn encode_message(msg: Message) -> Self::Message {
19+
match msg {
20+
Message::Binary(data) => NativeMessage::binary(data),
21+
Message::Text(data) => NativeMessage::text(data),
22+
Message::Ping(data) => NativeMessage::ping(data),
23+
Message::Pong(data) => NativeMessage::pong(data),
24+
Message::Close(msg) => match msg {
25+
Some(msg) => NativeMessage::close_with(msg.code, msg.reason),
26+
None => NativeMessage::close(),
27+
},
28+
}
29+
}
30+
31+
fn decode_message(msg: Self::Message) -> Message {
32+
if msg.is_binary() {
33+
Message::Binary(msg.into_bytes())
34+
} else if msg.is_text() {
35+
Message::Text(String::from_utf8(msg.into_bytes().into()).unwrap_or_default())
36+
} else if msg.is_ping() {
37+
Message::Ping(msg.into_bytes())
38+
} else if msg.is_pong() {
39+
Message::Pong(msg.into_bytes())
40+
} else {
41+
let msg = msg.close_frame().map(|(code, msg)| CloseFrame {
42+
code,
43+
reason: msg.to_owned(),
44+
});
45+
46+
Message::Close(msg)
47+
}
48+
}
49+
}

0 commit comments

Comments
 (0)