websocket-server 是 EasyFK 框架中的 WebSocket 服务组件,基于 Netty 构建高性能 WebSocket 服务器,提供实时消息推送能力。支持多策略消息推送、频道订阅管理、JWT 认证、HMAC-SHA256 接口鉴权,以及基于 Disruptor 的高性能异步消息发送。
| Netty | WebSocket 服务器核心,处理连接、编解码、消息收发 |
|---|---|
| Caffeine | 高性能本地缓存,管理用户会话数据与活跃时间 |
| JWT | 用户身份认证与令牌验证 |
| HMAC-SHA256 / MD5 | 接口签名鉴权 |
| Spring Boot AutoConfiguration | 自动配置与 Bean 管理 |
| Fastjson2 | JSON 序列化/反序列化 |
dependencies {
api project(':component-cache:cache-caffeine')
api project(':component-websocket:websocket-api')
api('com.mcst:queue-disruptor')
api('com.mcst:easyfk-authority')
api('com.mcst:web-base')
api('io.netty:netty-all')
}父项目 component-websocket 统一依赖了 com.mcst:easyfk-core。
websocket-server/
├── build.gradle
└── src/main/
├── java/com/mcst/easyfk/websocket/
│ ├── config/
│ │ └── ServerConfig.java # Spring Boot 自动配置类
│ ├── enums/
│ │ └── EventType.java # 事件类型枚举
│ ├── handler/
│ │ ├── UserEventHandler.java # 用户事件处理器接口(业务方实现)
│ │ └── WebSocketHandler.java # WebSocket 核心处理器
│ ├── interceptor/
│ │ ├── DefaultAuthInterceptor.java # 默认认证拦截器
│ │ ├── HandshakeResult.java # 握手结果对象
│ │ └── WebSocketInterceptor.java # 拦截器接口
│ ├── manager/
│ │ ├── impl/
│ │ │ └── SocketManagerImpl.java # Socket 管理器实现
│ │ ├── ISocketManager.java # Socket 管理器接口
│ │ ├── LocalSubManager.java # 本地订阅管理器
│ │ └── WsUserDataManager.java # 用户会话数据管理器
│ ├── properties/
│ │ └── WebSocketProperties.java # 配置属性类
│ ├── sender/
│ │ ├── SessionSendDispatcher.java # 消息发送分发器(Disruptor 分片)
│ │ ├── SessionSendEvent.java # 发送事件对象
│ │ ├── SessionSendWorker.java # 实际发送工作器
│ │ ├── WebSocketMessageSender.java # 消息发送器(核心)
│ │ └── WsSendProcessor.java # Disruptor 消费处理器
│ ├── server/
│ │ ├── PushMessageServer.java # 消息推送服务入口
│ │ └── WebSocketServer.java # Netty WebSocket 服务器
│ ├── strategy/
│ │ ├── impl/
│ │ │ ├── ChannelCategoryPushStrategy.java # 频道+类别推送策略
│ │ │ ├── ChannelPushStrategy.java # 频道推送策略
│ │ │ ├── NotificationPushStrategy.java # 通知推送策略
│ │ │ └── UserPushStrategy.java # 用户推送策略
│ │ ├── MessagePushStrategy.java # 推送策略接口
│ │ └── MessagePushStrategySelector.java # 策略选择器
│ ├── util/
│ │ └── SecuritySignUtil.java # 安全签名工具类
│ └── vo/
│ ├── PushMessage.java # 推送消息对象
│ └── ResponseMessage.java # 响应消息对象
└── resources/META-INF/spring/
└── org.springframework.boot.autoconfigure.AutoConfiguration.imports文件: server/WebSocketServer.java
Netty WebSocket 服务器的启动与关闭管理。使用 @PostConstruct 在 Spring 容器启动后自动以守护线程启动 Netty 服务。
Pipeline 组成:
| `IdleStateHandler` | 空闲超时检测,超时时长由 `sessionTimeoutSeconds` 配置 |
|---|---|
| `ChunkedWriteHandler` | 分块写入处理器 |
| `HttpObjectAggregator` | HTTP 消息聚合器,最大内容长度由 `maxContentLength` 配置 |
| `WebSocketServerCompressionHandler` | WebSocket 消息压缩 |
| `WebSocketHandler` | 业务处理器(Sharable,全局单例) |
关键配置项:
| `SO_BACKLOG` | 1024 | 连接等待队列大小 |
|---|---|---|
| `SO_KEEPALIVE` | true | TCP 保活 |
| `TCP_NODELAY` | true | 禁用 Nagle 算法,减少延迟 |
| `WRITE_BUFFER_WATER_MARK` | 8KB / 16KB | 写缓冲区水位线 |
文件: handler/WebSocketHandler.java
@ChannelHandler.Sharable 标注,全局共享单例。负责处理 WebSocket 连接的完整生命周期。
客户端发起 HTTP 升级请求
↓
handleHttpRequest()
↓
执行拦截器链 beforeHandshake()(按 order 排序,正序执行)
↓ 全部通过
从拦截器属性获取 userSessionId
↓
socketManager.connectUser() 建立连接
↓
更新会话活跃时间
↓
WebSocket 协议握手
↓
发送欢迎消息
↓
执行拦截器链 afterHandshake()(逆序执行)客户端发送的 TextWebSocketFrame 被解析为 WsUserOptParam,根据 eventType 分类处理:
| `pong` | 更新会话活跃时间(心跳响应) |
|---|---|
| `login` | 验证 JWT Token,绑定用户信息到会话 |
| 其他 | 如开启鉴权则验证签名,然后交给 `UserEventHandler` 处理 |
消息处理通过独立的线程池 messageHandleExecutor 异步执行,不阻塞 Netty 的 I/O 线程。
public interface WebSocketInterceptor {
HandshakeResult beforeHandshake(ChannelHandlerContext ctx, FullHttpRequest request,
Map<String, Object> attributes);
default void afterHandshake(...) {}
default int getOrder() { return 0; }
}执行优先级: order = -100(最先执行)
处理流程:
1. 获取或生成 clientId(优先请求头 Client-ID > 查询参数 > 基于 User-Agent + IP 自动生成)
2. 提取 clientType(请求头 Client-Type,默认 APP)
3. 如开启接口鉴权,使用 HMAC-SHA256 验证连接签名
4. 提取 JWT Token 并验证(如 mustLogin=true 则 Token 必填)
5. 生成会话标识:MD5(clientType + "-" + clientId)
6. 将用户数据写入 WsUserDataManager
支持的请求参数提取方式(同时支持请求头与查询参数):
| Token | `Access-Token` | `Access-Token` |
|---|---|---|
| 客户端类型 | `Client-Type` | `Client-Type` |
| 随机数 | `Request-Nonce` | `Request-Nonce` |
| 签名 | `Reset-Sign` | `Reset-Sign` |
管理在线用户的 Channel 映射关系:
userSessionId ←→ Channel(双向映射)核心方法:
使用 Caffeine 缓存管理用户会话数据,支持以下两种索引:
| `WS_USER_DATA_CACHE` | userSessionId | WsUserData | 10万 | 访问后12小时 |
|---|
支持一个用户多个会话(多端登录/多标签页场景)。
管理用户的频道+类别订阅关系,使用 ConcurrentHashMap + CopyOnWriteArraySet 实现线程安全:
| `subscriptions` | channel:category | Set<userSessionId> | 正向索引 |
|---|---|---|---|
| `channelCache` | userSessionId | Channel | Channel 缓存(跳过 SocketManager 二次查找) |
支持功能:
PushMessageServer
↓ pushMessage(PushWsMessageParam)
MessagePushStrategySelector
↓ select() 选择策略
MessagePushStrategy(4种实现)
↓ push()
WebSocketMessageSender
├─ sendMessageToUser() → SessionSendDispatcher → Disruptor → SessionSendWorker
├─ sendMessageToUserByUserId() → SessionSendDispatcher → Disruptor → SessionSendWorker
├─ sendToSubscribers() → batchBroadcast()(直接批量写入,跳过 Disruptor)
├─ sendToChannelSubscribers() → batchBroadcast()
└─ broadcastToAll() → batchBroadcast()| `NotificationPushStrategy` | 0(最高) | channel = "notification" | 全员广播 |
|---|---|---|---|
| `UserPushStrategy` | 2 | 有 userSessionId 或 userId | 指定用户推送 |
| `ChannelPushStrategy` | 4 | 有 channel,无 userSessionId/userId/category | 频道全订阅者推送 |
用于单用户/少量用户的消息发送,保证消息可靠性:
三种性能优化方案:
1. 零拷贝: 使用 retainedDuplicate() 共享底层 ByteBuf,避免数据复制
2. Channel 直接获取: 从 LocalSubManager 直接获取 Channel 列表,跳过二次查找
3. 批量写入: batchBroadcast() 先批量 write(),再统一 flush(),减少系统调用
背压控制: 发送前检查 channel.isWritable(),不可写时跳过,防止消息积压。
文件: util/SecuritySignUtil.java
支持两种签名方式:
签名数据 = ClientType=xxx&ClientId=xxx&Nonce=xxx&Param=xxx&Secret=xxx
签名结果 = MD5(签名数据)签名数据 = ClientType=xxx&ClientId=xxx&Nonce=xxx&Param=xxx
签名结果 = HMAC-SHA256(签名数据, secret)Param 排序规则: 如果 Param 是 JSON 格式,则按照 key 的 ASCII 码排序后再参与签名。
配置前缀:easyfk.config.websocket
| `path` | `/ws` | WebSocket 访问路径 |
|---|---|---|
| `mustLogin` | `false` | 是否要求登录才能连接 |
| `securitySecret` | `easyfk@ws#2026!` | 接口鉴权密钥(生产环境务必修改) |
| `enableSecurity` | `false` | 是否开启接口鉴权 |
| `allowCrossOrigin` | `true` | 是否支持跨域 |
| `allowedOrigins` | `["*"]` | 允许的跨域来源 |
| `sessionTimeoutSeconds` | `1800` | 会话超时时间(秒,默认30分钟) |
| `heartbeatIntervalSeconds` | `30` | 心跳检测间隔(秒) |
| `maxSubscriptionsPerUser` | `100` | 单用户最大订阅数 |
| `maxFrameSize` | `65536` | WebSocket 帧最大大小(字节,64KB) |
| `maxContentLength` | `65536` | HTTP 最大内容长度(字节,64KB) |
| `bossThreads` | `1` | Boss 线程数(接受连接) |
|---|
| `sslEnabled` | `false` | 是否启用 SSL |
|---|---|---|
| `sslKeyPath` | - | SSL 私钥路径 |
| `allowedMethods` | `["GET","POST","PUT","DELETE","OPTIONS"]` | 允许的 HTTP 方法 |
|---|---|---|
| `exposedHeaders` | `[]` | 暴露的响应头 |
| `allowCredentials` | `true` | 是否允许携带凭证 |
| `maxAge` | `3600` | 预检请求缓存时间(秒) |
| `sendShardCount` | `32` | Disruptor 发送队列分片数 |
|---|---|---|
| `sendQueueBufferSize` | `131072` | 每个分片的 RingBuffer 大小(必须是2的幂) |
| `sendQueueTryTimeoutMs` | `50` | 入队最大等待时间(毫秒) |
| `sendQueueThreadPrefix` | `ws-send-` | 发送线程名前缀 |
| `sendQueueWaitStrategy` | `YIELDING` | Disruptor 等待策略(BLOCKING/YIELDING/BUSY_SPIN) |
| `broadcastParallelThreshold` | `100` | 广播并行阈值 |
| `heartbeatPoolCoreSize` | `max(16, CPU×2)` | 心跳线程池核心线程数 |
| `heartbeatPoolMaxSize` | `max(32, CPU×4)` | 心跳线程池最大线程数 |
| `heartbeatPoolQueueSize` | `50000` | 心跳线程池队列容量 |
| `messagePoolCoreSize` | `max(32, CPU×4)` | 消息处理线程池核心线程数 |
| `messagePoolMaxSize` | `max(64, CPU×8)` | 消息处理线程池最大线程数 |
| `messagePoolQueueSize` | `20000` | 消息处理线程池队列容量 |
easyfk:
config:
websocket:
path: /ws
port: 8081
mustLogin: false
enableSecurity: true
securitySecret: "your-production-secret-key"
sessionTimeoutSeconds: 1800
heartbeatIntervalSeconds: 30
performance:
sendShardCount: 64
sendQueueWorkersPerShard: 4
sendQueueBufferSize: 262144
sendQueueWaitStrategy: YIELDING业务方通过实现 UserEventHandler 接口来处理客户端发送的业务事件:
@Component
public class MyUserEventHandler implements UserEventHandler {
@Override
public BaseResult<?> handleUserEvent(WsUserOptParam param) {
String eventType = param.getEventType();
Map<String, Object> eventData = param.getEventData();
switch (eventType) {
case "subscribe":
// 处理订阅逻辑
return BRBuilder.successResult();
case "unsubscribe":
// 处理取消订阅逻辑
return BRBuilder.successResult();
default:
// 处理其他自定义事件
return BRBuilder.successResult();
}
}
@Override
public void clearUserSubscriptions(String clientId) {
// 用户断开连接时清理业务层订阅数据
}
}业务方可以实现 WebSocketInterceptor 添加自定义拦截逻辑:
@Component
public class IpWhitelistInterceptor implements WebSocketInterceptor {
@Override
public HandshakeResult beforeHandshake(ChannelHandlerContext ctx,
FullHttpRequest request,
Map<String, Object> attributes) {
String ip = getClientIp(ctx);
if (!isAllowed(ip)) {
return HandshakeResult.forbidden("IP_BLOCKED", "IP not in whitelist");
}
return HandshakeResult.success();
}
@Override
public int getOrder() {
return -200; // 在默认认证拦截器之前执行
}
}通过 PushMessageServer 推送消息:
@Resource
private PushMessageServer pushMessageServer;
// 推送给指定用户
PushWsMessageParam param = new PushWsMessageParam()
.setUserSessionId("user-session-id")
.setChannel("order")
.setCategory("fill")
.setData(Map.of("orderId", "12345", "status", "filled"));
pushMessageServer.pushMessage(param);
// 推送给频道+类别订阅者
PushWsMessageParam param2 = new PushWsMessageParam()
.setChannel("quote")
.setCategory("BTCUSDT")
.setData(Map.of("price", "50000.00", "volume", "123.45"));
pushMessageServer.pushMessage(param2);
// 发送系统通知
PushWsMessageParam param3 = new PushWsMessageParam()
.setChannel("notification")
.setData(Map.of("title", "系统维护通知", "content", "系统将于凌晨2点维护", "level", "IMPORTANT"));
pushMessageServer.pushMessage(param3);websocket-api 模块定义了 IPushWsMsgApi 接口:
public interface IPushWsMsgApi {
BaseResult<?> pushMessage(PushWsMessageParam param);
}业务方可通过 RPC 或 HTTP 调用此接口实现跨服务推送。
ServerConfig 通过 Spring Boot @AutoConfiguration 自动注册以下 Bean:
| `PushMessageServer` | 服务 | 消息推送服务入口 |
|---|---|---|
| `WebSocketHandler` | 处理器 | WebSocket 核心处理器 |
| `DefaultAuthInterceptor` | 拦截器 | 默认认证拦截器 |
| `SocketManagerImpl` | 管理器 | 在线用户管理 |
| `LocalSubManager` | 管理器 | 订阅管理 |
| `SessionSendWorker` | 发送器 | 消息发送工作器 |
| `SessionSendDispatcher` | 分发器 | Disruptor 分片分发器 |
| `WebSocketMessageSender` | 发送器 | 消息发送核心 |
| `NotificationPushStrategy` | 策略 | 通知推送策略 |
| `UserPushStrategy` | 策略 | 用户推送策略 |
| `ChannelCategoryPushStrategy` | 策略 | 频道+类别推送策略 |
| `ChannelPushStrategy` | 策略 | 频道推送策略 |
| `MessagePushStrategySelector` | 选择器 | 策略选择器 |
所有 Bean 均使用 @ConditionalOnMissingBean,业务方可通过自定义同类型 Bean 进行覆盖。
┌─────────────────────────────────────────────────────────────────┐
│ Netty Thread Model │
│ BossGroup (1 thread) → 接受连接 │
│ WorkerGroup (N threads) → I/O 读写、编解码 │
├─────────────────────────────────────────────────────────────────┤
│ 业务线程池 │
│ messageHandleExecutor → 消息业务处理(WebSocketHandler) │
│ heartbeatSendExecutor → 心跳 Ping 发送 │
│ heartbeatExecutor → 心跳定时检测调度(单线程) │
├─────────────────────────────────────────────────────────────────┤
│ Disruptor 发送队列 │
│ shardQueues[0..N-1] → 单用户消息异步发送 │
│ 每个分片有独立的 Worker 线程 │
├─────────────────────────────────────────────────────────────────┤
│ 直接广播 │
│ WebSocketMessageSender → 订阅推送/全员广播(跳过 Disruptor) │
│ 批量 write + 统一 flush │
├─────────────────────────────────────────────────────────────────┤
│ 监控线程 │
│ ws-send-monitor → 每10秒输出发送统计 │
└─────────────────────────────────────────────────────────────────┘| `channel` | String | 频道(如 kline, quote, order, notification, chat) |
|---|---|---|
| `groupId` | String | 群组ID |
| `userSessionId` | String | 用户会话ID(多个逗号分隔) |
| `userId` | String | 用户ID(多个逗号分隔) |
| `data` | Map<String, Object> | 推送数据 |
| `eventType` | String | 事件类型 |
|---|---|---|
| `token` | String | JWT Token |
| `nonce` | String | 随机数(签名验证) |
| `sign` | String | 签名(接口鉴权) |
| `NOTIFICATION` | notification | 通知频道 |
|---|---|---|
| `HEARTBEAT` | heartbeat | 心跳频道 |
| `BIZ` | biz | 业务频道 |
| `NORMAL` | normal | 普通通知 |
|---|---|---|
| `URGENT` | urgent | 紧急通知 |
| `LOGIN` | login | 登录 |
|---|---|---|
| `UNSUBSCRIBE` | unsubscribe | 取消订阅 |
| `PING` | ping | 心跳检测(服务端发送) |
| `PONG` | pong | 心跳响应(客户端发送) |
| `ONLINE` | online | 检查在线状态 |
easyfk-websocket — 实时双向通信,构建高效消息推送能力。