mq-kafka 是 EasyFK 框架中基于 Apache Kafka 的消息队列组件。该模块基于 Spring Kafka,提供统一的消息发送 API(同步/异步)、双序列化通道(JSON 字符串 + Protobuf 二进制)、完整的生产者/消费者自动配置、Kafka Streams 流处理支持,以及丰富的 Streams 工具类和 Serde 工厂,适用于高吞吐量消息传递、事件驱动架构、实时流处理等场景。
<dependency>
<groupId>com.mcst</groupId>
<artifactId>mq-kafka</artifactId>
</dependency>dependencies {
implementation 'com.mcst:mq-kafka'
}> 版本号由框架统一 BOM 管理,无需手动指定。
该模块会自动传递引入以下依赖:
模块通过 easyfk.config.mq.kafka 前缀控制开关:
| `enable-kafka` | Boolean | `false` | 启用 Kafka 消息队列(**必须设为 `true`**) |
|---|
easyfk:
config:
mq:
kafka:
enable-kafka: true
spring:
kafka:
bootstrap-servers: 192.168.1.100:9092,192.168.1.101:9092
consumer:
group-id: my-app-group
auto-offset-reset: earliest
enable-auto-commit: true
max-poll-records: 500
session-timeout-ms: 30000
heartbeat-interval-ms: 10000
concurrency: 3
ack-mode: batch
isolation-level: read_committed
producer:
retries: 3
batch-size: 16384
linger-ms: 5
buffer-memory: 33554432
acks: all
enable-idempotence: true
compression-type: snappy| `producer.retries` | `3` | 发送失败重试次数 |
|---|---|---|
| `producer.linger-ms` | `5` | 批量等待时间(毫秒) |
| `producer.buffer-memory` | `33554432` | 缓冲区大小(32MB) |
| `producer.acks` | `all` | 确认级别(`0`/`1`/`all`) |
| `producer.enable-idempotence` | `true` | 启用幂等性 |
| `producer.compression-type` | `snappy` | 压缩类型(`none`/`gzip`/`snappy`/`lz4`/`zstd`) |
| `producer.request-timeout-ms` | `30000` | 请求超时时间 |
| `producer.delivery-timeout-ms` | `120000` | 传输超时时间 |
| `producer.max-in-flight-requests-per-connection` | `5` | 每个连接最大在途请求数 |
| `consumer.group-id` | `default-group` | 消费者组 ID |
|---|---|---|
| `consumer.enable-auto-commit` | `true` | 是否自动提交偏移量 |
| `consumer.max-poll-records` | `500` | 单次拉取最大记录数 |
| `consumer.session-timeout-ms` | `30000` | 会话超时时间 |
| `consumer.heartbeat-interval-ms` | `10000` | 心跳间隔 |
| `consumer.concurrency` | `3` | 并发消费者数量 |
| `consumer.ack-mode` | `auto` | ACK 模式 |
| `consumer.isolation-level` | `read_committed` | 事务隔离级别 |
| `consumer.fetch-min-bytes` | `1024` | 最小拉取字节数 |
| `consumer.fetch-max-wait-ms` | `500` | 最大拉取等待时间 |
| `auto` / `batch` | 自动批量确认(默认) |
|---|---|
| `manual` | 手动确认 |
| `manual_immediate` | 手动立即确认 |
| `count` | 按计数确认 |
| `time` | 按时间确认 |
| `count_time` | 按计数和时间确认 |
easyfk:
config:
mq:
kafka:
enable-kafka: true
enable-streams: true
streams:
application-id: my-streams-app
state-dir: /tmp/kafka-streams
processing-guarantee: at_least_once
commit-interval-ms: 30000
num-stream-threads: 2
default-deserialization-exception-handler: logAndContinue
default-production-exception-handler: fail
retry-backoff-ms: 100
reconnect-backoff-ms: 50| `application-id` | `streams-app` | Streams 应用标识 |
|---|---|---|
| `processing-guarantee` | `at_least_once` | 处理保证(`at_least_once`/`exactly_once`/`exactly_once_v2`) |
| `commit-interval-ms` | `30000` | 提交间隔(毫秒) |
| `num-stream-threads` | `2` | 流处理线程数 |
| `default-deserialization-exception-handler` | `logAndContinue` | 反序列化异常处理(`logAndContinue`/`logAndFail`) |
| `default-production-exception-handler` | `fail` | 生产异常处理(`fail`/`continue`) |
@Service
public class OrderService {
@Resource
private KafkaProducer kafkaProducer;
}| `syncSendMessage(message)` | 同步发送(阻塞等待确认,30 秒超时) |
|---|
| `syncSendProtobufMessage(message)` | 同步发送 Protobuf 消息 |
|---|
CommonMessage<T> 是 JSON 字符串消息的载体,继承自 BaseMessage:
| `topic` | String | 目标 Topic |
|---|---|---|
| `messageId` | String | 消息 ID(为空时自动生成雪花 ID) |
| `messageKey` | String | 分区路由 Key |
| `partition` | Integer | 指定分区(优先级高于 messageKey) |
| `tags` | String | 消息标签 |
| `sendTimestamp` | Long | 发送时间戳(为空时自动填充) |
| `properties` | Map | 自定义 Header 属性 |
ProtobufMessage<T> 是 Protobuf 二进制消息的载体,属性与 CommonMessage 一致,data 类型为 com.google.protobuf.Message。
CommonMessage<OrderDTO> message = new CommonMessage<>();
message.setTopic("order-topic");
message.setData(orderDTO);
message.setMessageKey(orderDTO.getOrderId());
message.setTags("ORDER_CREATED");
kafkaProducer.syncSendMessage(message);CommonMessage<String> message = new CommonMessage<>();
message.setTopic("notification-topic");
message.setData("用户注册成功");
message.setMessageKey(userId);
kafkaProducer.asyncSendMessage(message);// 假设已定义 OrderProto.Order Protobuf 消息
OrderProto.Order order = OrderProto.Order.newBuilder()
.setOrderId("ORD_001")
.setAmount(9999)
.build();
ProtobufMessage<OrderProto.Order> message = new ProtobufMessage<>();
message.setTopic("order-protobuf-topic");
message.setData(order);
message.setMessageKey("ORD_001");
kafkaProducer.syncSendProtobufMessage(message);CommonMessage<String> message = new CommonMessage<>();
message.setTopic("event-topic");
message.setData("event data");
message.setTags("USER_EVENT");
Map<String, String> props = new HashMap<>();
props.put("source", "order-service");
props.put("traceId", "trace_12345");
message.setProperties(props);
kafkaProducer.syncSendMessage(message);@Component
public class OrderConsumer {
@KafkaListener(topics = "order-topic", containerFactory = "listenerContainerFactory")
public void onMessage(ConsumerRecord<String, String> record) {
OrderDTO order = JSONObject.parseObject(record.value(), OrderDTO.class);
// 处理订单
}
}@Component
public class OrderProtobufConsumer {
@KafkaListener(topics = "order-protobuf-topic", containerFactory = "protobufKafkaListenerContainerFactory")
public void onMessage(ConsumerRecord<String, byte[]> record) {
OrderProto.Order order = ProtobufUtils.deserialize(record.value(), OrderProto.Order.class);
// 处理订单
}
}@Resource
private KafkaMessageUtil kafkaMessageUtil;
@KafkaListener(topics = "order-topic", containerFactory = "listenerContainerFactory")
public void onMessage(ConsumerRecord<String, String> record) {
// 提取消息体
OrderDTO order = kafkaMessageUtil.getMessageBodyFromStringRecord(record, OrderDTO.class);
// 提取 Header 信息
Map<String, String> headers = kafkaMessageUtil.extractHeaders(record);
String messageId = headers.get("messageId");
String tags = headers.get("tags");
}@Component
public class OrderStreamProcessor extends BaseStreamProcessor {
@Override
protected String getProcessorName() {
return "order-stream-processor";
}
@Override
protected String[] getInputTopics() {
return new String[]{"order-topic"};
}
@Override
protected String[] getOutputTopics() {
return new String[]{"order-result-topic"};
}
@Override
protected void buildTopology(StreamsBuilder builder) {
builder.<String, String>stream("order-topic")
.filter((key, value) -> value != null && !value.isEmpty())
.mapValues(value -> {
// 处理逻辑
return processOrder(value);
})
.to("order-result-topic");
}
}@Override
protected void buildTopology(StreamsBuilder builder) {
builder.stream("input-topic", Consumed.with(SerdeFactory.string(), SerdeFactory.byteArray()))
.mapValues(value -> {
// 处理逻辑
return transform(value);
})
.to("output-topic", Produced.with(SerdeFactory.string(), SerdeFactory.byteArray()));
}| `SerdeFactory.string()` | `Serde<String>` | 字符串 |
|---|---|---|
| `SerdeFactory.integer()` | `Serde<Integer>` | Integer |
| `SerdeFactory.doubleValue()` | `Serde<Double>` | Double |
| `SerdeFactory.floatValue()` | `Serde<Float>` | Float |
| `SerdeFactory.byteArray()` | `Serde<byte[]>` | 字节数组 |
| `SerdeFactory.jsonMessage()` | `Serde<CommonMessage>` | JSON 消息 |
| `SerdeFactory.protobufMessage(clazz)` | `Serde<ProtobufMessage>` | Protobuf 消息 |
| `SerdeFactory.baseMessage(clazz)` | `Serde<BaseMessage>` | 基础消息 |
Streams 启用后,StreamProcessorRegistry 自动注册所有 BaseStreamProcessor 实例:
@Resource
private StreamProcessorRegistry registry;
// 获取所有处理器名称
List<String> names = registry.getAllProcessorNames();
// 获取处理器数量
int count = registry.getProcessorCount();
// 获取所有处理器状态
List<ProcessorStatus> statuses = registry.getAllProcessorStatus();
// 获取统计信息
ProcessorStatistics stats = registry.getStatistics();| `extractKeyValue(message)` | 从 BaseMessage 提取 KeyValue |
|---|---|
| `jsonMessageDataExtractor()` | 提取 CommonMessage 中的 data |
| `protobufMessageDataExtractor()` | 提取 ProtobufMessage 中的 data |
| `topicFilter(topic)` | 按 Topic 过滤 |
| `timeRangeFilter(start, end)` | 按时间范围过滤 |
| `topicGrouper()` | 按 Topic 分组 |
| `jsonMessageWrapper(topic)` | 包装为 CommonMessage |
| `protobufMessageWrapper(topic)` | 包装为 ProtobufMessage |
| `KafkaMqConfig` | `enable-kafka = true` | 核心自动配置,注册生产者/消费者工厂和模板 |
|---|
自动注册的 Bean:
| `kafkaTemplate` | `KafkaTemplate<String, String>` | JSON 字符串消息模板 |
|---|---|---|
| `kafkaProducer` | `KafkaProducer` | 消息生产者 |
| `producerFactory` | `ProducerFactory<String, String>` | 字符串生产者工厂 |
| `protobufProducerFactory` | `ProducerFactory<String, byte[]>` | Protobuf 生产者工厂 |
| `consumerFactory` | `ConsumerFactory<String, String>` | 字符串消费者工厂 |
| `protobufConsumerFactory` | `ConsumerFactory<String, byte[]>` | Protobuf 消费者工厂 |
| `listenerContainerFactory` | `ConcurrentKafkaListenerContainerFactory` | 字符串监听容器工厂 |
| `protobufKafkaListenerContainerFactory` | `ConcurrentKafkaListenerContainerFactory` | Protobuf 监听容器工厂 |
com.mcst.easyfk.mq.kafka
├── config
│ ├── KafkaMqConfig.java # 核心自动配置(生产者/消费者工厂)
│ └── KafkaStreamsConfig.java # Kafka Streams 自动配置
├── producer
│ └── KafkaProducer.java # 消息生产者(同步/异步 + JSON/Protobuf)
├── properties
│ └── KafkaMqProperties.java # 配置属性类
├── streams
│ ├── processor
│ │ ├── BaseStreamProcessor.java # 流处理器抽象基类
│ │ └── StreamProcessorRegistry.java # 处理器注册管理器
│ └── serde
│ ├── BaseMessageSerde.java # BaseMessage 序列化器
│ ├── ProtobufMessageSerde.java # Protobuf 消息序列化器
│ └── SerdeFactory.java # Serde 工厂类
└── util
└── KafkaMessageUtil.java # 消息处理工具类1. 合理选择发送模式:需要可靠性保证用 syncSendMessage,追求吞吐量用 asyncSendMessage。
2. 善用 messageKey:相同 key 的消息会路由到同一分区,保证局部有序性。
3. 启用幂等性:enable-idempotence: true(默认已启用),防止网络抖动导致重复消息。
4. 压缩优化:默认使用 snappy 压缩,平衡压缩率与速度。大消息可考虑 lz4 或 zstd。
5. 消费者并发:concurrency 设置应不超过 Topic 分区数,否则多余的消费者空闲。
6. Protobuf 优先:高吞吐量场景推荐使用 Protobuf 序列化,体积更小、序列化更快。
7. Streams 异常处理:生产环境建议 defaultDeserializationExceptionHandler 设为 logAndContinue,避免单条异常消息阻塞流处理。
8. 流处理器命名:每个 BaseStreamProcessor 使用唯一的 getProcessorName(),便于监控和排查。
9. ACK 模式选择:高可靠场景用 manual 或 manual_immediate,高吞吐场景用 batch(默认)。
easyfk-mq-kafka — 高吞吐流式数据平台集成方案。