基于 ASP.NET Core 的高性能 WebSocket 服务端实现。
- 高性能: 使用
System.Threading.Channels实现发送队列,支持高并发发送,非阻塞。 - 低分配: 使用
ArrayPool<byte>和PooledMemoryStream优化内存使用,减少 GC 压力。 - 心跳机制: 使用
PeriodicTimer高效检测死链,可配置超时断开。 - 会话管理: 提供
IWebSocketSessionManager支持会话跟踪和广播功能。 - 全局唯一 ID: 使用
Ulid生成全局唯一的会话标识符。 - 会话元数据: 通过
IWebSocketSession.Items存储会话级别的自定义数据。 - 易用性: 提供
WebSocketHandler基类,只需关注业务逻辑。 - 集成: 无缝集成 ASP.NET Core 中间件管道。
- 现代化 API: 使用
TimeSpan配置时间参数,符合 .NET 设计规范。
继承 WebSocketHandler 并重写相关方法:
public class ChatHandler : WebSocketHandler
{
private readonly IWebSocketSessionManager _sessionManager;
public ChatHandler(IWebSocketSessionManager sessionManager)
{
_sessionManager = sessionManager;
}
public override async Task OnConnectedAsync(IWebSocketSession session)
{
Console.WriteLine($"Client connected: {session.Id}");
// 存储会话级别的数据
session.Items["ConnectedAt"] = DateTime.UtcNow;
session.Items["Username"] = "Anonymous";
await session.SendTextAsync("Welcome to the chat!");
// 广播给所有其他用户
await _sessionManager.BroadcastTextAsync($"User {session.Id} joined the chat!");
}
public override async Task OnDisconnectedAsync(IWebSocketSession session)
{
Console.WriteLine($"Client disconnected: {session.Id}");
await _sessionManager.BroadcastTextAsync($"User {session.Id} left the chat!");
}
public override async Task OnMessageAsync(IWebSocketSession session, WebSocketMessage message)
{
if (message.MessageType == WebSocketMessageType.Text)
{
var text = Encoding.UTF8.GetString(message.Data.Span);
Console.WriteLine($"Received from {session.Id}: {text}");
// 广播消息给所有连接的客户端
await _sessionManager.BroadcastTextAsync($"[{session.Id}]: {text}");
}
}
public override async Task OnErrorAsync(IWebSocketSession session, Exception exception)
{
Console.WriteLine($"Error on {session.Id}: {exception.Message}");
}
}在 Program.cs 中注册 Handler 和 SessionManager:
// 注册会话管理器(可选,但推荐用于广播和会话跟踪)
builder.Services.AddWebSocketSessionManager();
// 注册 Handler(通常为单例)
builder.Services.AddSingleton<ChatHandler>();在 Program.cs 中配置中间件管道:
var app = builder.Build();
app.UseWebSockets(); // 必须先启用 WebSockets
app.MapWebSocketHandler<ChatHandler>("/ws", new WebSocketSessionOptions
{
ReceiveBufferSize = 16384, // 接收缓冲区大小
SendQueueCapacity = 1000, // 发送队列容量
HeartbeatEnabled = true, // 启用心跳
HeartbeatInterval = TimeSpan.FromSeconds(30), // 心跳间隔
HeartbeatTimeout = TimeSpan.FromSeconds(10), // 心跳超时
HeartbeatMessageType = WebSocketMessageType.Binary, // 心跳消息类型
CloseTimeout = TimeSpan.FromSeconds(5) // 关闭超时
});
app.Run();| 组件 | 说明 |
|---|---|
IWebSocketSession |
代表一个客户端连接,提供 SendAsync、SendTextAsync、SendBinaryAsync 等方法,以及 Items 字典存储会话数据。 |
IWebSocketSessionManager |
会话管理器接口,提供 GetAllSessions()、GetSession(id)、BroadcastAsync() 等方法。 |
WebSocketSessionManager |
会话管理器默认实现,线程安全,支持会话跟踪和广播。 |
WebSocketHandler |
业务逻辑基类,处理连接、断开、消息、错误事件。 |
WebSocketSessionOptions |
会话配置选项,包括缓冲区大小、心跳设置等。 |
public interface IWebSocketSessionManager
{
// 获取活动会话数量
int Count { get; }
// 获取所有活动会话
IReadOnlyCollection<IWebSocketSession> GetAllSessions();
// 根据 ID 获取会话
IWebSocketSession? GetSession(string id);
// 向所有会话广播消息
Task BroadcastAsync(ReadOnlyMemory<byte> message, WebSocketMessageType messageType = WebSocketMessageType.Text, CancellationToken cancellationToken = default);
// 向所有会话广播文本消息
Task BroadcastTextAsync(string text, CancellationToken cancellationToken = default);
}- 零分配接收: 接收缓冲区从
ArrayPool<byte>租借,避免频繁分配。 - 池化内存流: 使用
PooledMemoryStream组装大消息,减少内存碎片。 - 零拷贝消息传递: 使用
ToArraySegment()直接引用内部缓冲区,避免额外复制。 - 高效心跳: 使用
PeriodicTimer替代Task.Delay,更高效且取消更及时。 - 内联优化: 关键路径使用
[MethodImpl(MethodImplOptions.AggressiveInlining)]优化。 - 全局唯一 ID: 使用
Ulid生成时间有序的全局唯一会话标识符。
| 属性 | 类型 | 默认值 | 说明 |
|---|---|---|---|
SendQueueCapacity |
int |
1000 | 发送队列容量 |
ReceiveBufferSize |
int |
16384 | 接收缓冲区大小(字节) |
HeartbeatEnabled |
bool |
true |
是否启用心跳 |
HeartbeatInterval |
TimeSpan |
30 秒 | 心跳间隔 |
HeartbeatTimeout |
TimeSpan |
10 秒 | 心跳超时 |
HeartbeatMessageType |
WebSocketMessageType |
Binary |
心跳消息类型(Binary 兼容大多数客户端) |
HeartbeatMessageFactory |
Func<ReadOnlyMemory<byte>>? |
"ping" | 心跳消息工厂,设为 null 禁用发送(仅超时检测) |
CloseTimeout |
TimeSpan |
5 秒 | 关闭超时 |