EBEasyBuild Docs
文档/后端/Chronicle Queue

Chronicle Queue 组件开发手册

高性能持久化消息队列 · 零 GC · 微秒级延迟阅读时间 ~25 min

本手册面向使用 EasyFK 框架的开发人员,详细介绍 chronicle-queue 组件的接入方式、配置说明、API 用法、最佳实践及常见问题。

一、快速开始

1.1 引入依赖

Gradle 方式(在业务模块的 build.gradle 中添加):

groovy
dependencies {
    implementation("com.mcst:chronicle-queue")
}

Maven 方式(在业务模块的 pom.xml 中添加):

xml
<dependency>
    <groupId>com.mcst</groupId>
    <artifactId>chronicle-queue</artifactId>
</dependency>
TIP
版本由 EasyFK BOM(com.mcst:easyfk-dependencies)统一管理,无需手动指定版本号。如果项目未引入 BOM,需在 Maven 中先声明:
xml
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.mcst</groupId>
            <artifactId>easyfk-dependencies</artifactId>
            <version>${easyfk.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

1.2 最小化配置

零配置即可使用。组件提供了合理的默认值:

配置项默认值说明
存储路径./chronicle-queues队列文件存储根目录
滚动周期FAST_DAILY每天滚动生成新文件
块大小0(使用 Chronicle 默认)内存映射文件块大小

如果默认值满足需求,不需要任何 YAML 配置,直接注入 ChronicleQueueTemplate 即可开始使用。

1.3 第一个示例

java
import com.easyfk.chronicle.queue.template.ChronicleQueueTemplate;
import jakarta.annotation.Resource;
import org.springframework.stereotype.Service;

@Service
public class MyService {

    @Resource
    private ChronicleQueueTemplate chronicleQueueTemplate;

    public void demo() {
        long index = chronicleQueueTemplate.writeText("my-queue", "Hello Chronicle Queue!");
        String message = chronicleQueueTemplate.readText("my-queue", "my-consumer");
        System.out.println("写入索引: " + index);
        System.out.println("读取到: " + message);
    }
}
TIP
关键点:队列 my-queue 不需要预先创建,首次使用时会自动创建并初始化。

二、配置详解

2.1 全局默认配置

yaml
easyfk:
  config:
    storage:
      queue:
        default-path: ./chronicle-queues
        default-roll-cycle: FAST_DAILY
        default-block-size: 0
        create-directories-on-startup: true

2.2 多队列预定义

yaml
easyfk:
  config:
    storage:
      queue:
        default-path: /data/chronicle-queues
        default-roll-cycle: FAST_DAILY
        queues:
          - queue-name: order-events
            path: /data/chronicle-queues/orders
            roll-cycle: FAST_HOURLY
            block-size: 67108864
          - queue-name: log-buffer
            roll-cycle: FIVE_MINUTELY
          - queue-name: metrics
            roll-cycle: TEN_MINUTELY

字段说明:

字段必填说明
queue-name队列唯一标识,用于后续 API 调用
path自定义存储路径,不配置则为 defaultPath/queueName
roll-cycle覆盖全局默认滚动周期
block-size覆盖全局默认块大小,0 表示使用全局默认

2.3 配置优先级

plaintext
队列定义中的 path / rollCycle / blockSize
        ↓ 未配置则使用
全局默认 defaultPath / defaultRollCycle / defaultBlockSize
        ↓ 未配置则使用
内置默认值 (./chronicle-queues / FAST_DAILY / Chronicle 内部默认)

2.4 滚动周期说明

滚动周期切割频率适用场景
FAST_DAILY每天默认推荐,适合大多数场景
FAST_HOURLY每小时高吞吐场景,便于按小时归档清理
FIVE_MINUTELY每 5 分钟超高吞吐,需要更细粒度文件切割
TEN_MINUTELY每 10 分钟高吞吐场景
TWENTY_MINUTELY每 20 分钟中高吞吐场景
HALF_HOURLY每 30 分钟中等吞吐场景
WEEKLY每周低吞吐场景,减少文件数量
TIP
选型建议:写入频率越高,应选择越短的滚动周期,以避免单个文件过大。大多数场景使用默认的 FAST_DAILY 即可。

三、核心 API 详解

组件提供两个核心类,推荐优先使用 ChronicleQueueTemplate

3.1 ChronicleQueueTemplate(推荐)

java
@Resource
private ChronicleQueueTemplate chronicleQueueTemplate;

3.1.1 写入方法

方法参数返回值说明
writeText(queueName, text)队列名, 文本long(索引)写入纯文本消息
writeDocument(queueName, writer)队列名, Consumer<WireOut>long(索引)写入结构化数据
writeKeyValue(queueName, key, value)队列名, 键, 值long(索引)写入键值对

3.1.2 读取方法(命名 Tailer,自动记录位置)

方法参数返回值说明
readText(queueName, tailerId)队列名, 消费者IDString 或 null读取下一条文本(RESUME 策略)
readText(queueName, tailerId, strategy)队列名, 消费者ID, 策略String 或 null指定策略读取文本
readDocument(queueName, tailerId, reader)队列名, 消费者ID, 读取函数T 或 null读取结构化数据(RESUME 策略)
readDocument(queueName, tailerId, strategy, reader)队列名, 消费者ID, 策略, 读取函数T 或 null指定策略读取结构化数据
readTextBatch(queueName, tailerId, maxCount)队列名, 消费者ID, 最大条数List<String>批量读取文本(RESUME 策略)
readTextBatch(queueName, tailerId, maxCount, strategy)队列名, 消费者ID, 最大条数, 策略List<String>指定策略批量读取
readDocumentBatch(queueName, tailerId, maxCount, reader)队列名, 消费者ID, 最大条数, 读取函数List<T>批量读取结构化数据
readDocumentBatch(queueName, tailerId, maxCount, strategy, reader)全部参数List<T>指定策略批量读取结构化数据

3.1.3 查看方法(不影响消费位置)

方法参数返回值说明
peekLastText(queueName)队列名String 或 null查看最后一条消息
peekFirstText(queueName)队列名String 或 null查看第一条消息
peekTextAtIndex(queueName, index)队列名, 索引String 或 null查看指定索引消息

3.1.4 位置控制方法

方法参数返回值说明
toEnd(queueName, tailerId)队列名, 消费者IDvoid跳到队列末尾
toStart(queueName, tailerId)队列名, 消费者IDvoid跳到队列开头
moveToIndex(queueName, tailerId, index)队列名, 消费者ID, 索引boolean移动到指定索引
getTailerIndex(queueName, tailerId)队列名, 消费者IDlong获取当前位置索引

3.1.5 队列信息方法

方法参数返回值说明
queueExists(queueName)队列名boolean检查队列是否存在
getAllQueueNames()Set<String>获取所有队列名称

3.1.6 原生对象访问(高级)

方法参数返回值说明
getAppender(queueName)队列名ExcerptAppender获取原生写入器
createTailer(queueName)队列名ExcerptTailer创建匿名读取器
createTailer(queueName, tailerId)队列名, 消费者IDExcerptTailer创建命名读取器
getQueue(queueName)队列名ChronicleQueue获取底层队列实例

3.2 ChronicleQueueManager(高级)

java
@Resource
private ChronicleQueueManager chronicleQueueManager;
方法说明
getOrCreateQueue(queueName)获取或创建队列(默认配置)
getOrCreateQueue(queueName, path, rollCycle, blockSize)获取或创建队列(自定义配置)
registerQueue(queueName, queue)注册外部创建的队列实例
getQueue(queueName)获取已有队列,不存在返回 null
removeQueue(queueName)移除并关闭队列
containsQueue(queueName)检查队列是否存在
getQueueNames()获取所有队列名称
getAppender(queueName)获取写入器
createTailer(queueName)创建匿名读取器
createTailer(queueName, tailerId)创建命名读取器
closeAll()关闭所有队列

四、消费策略详解

4.1 四种消费策略

TailerStartStrategy 枚举控制命名 Tailer 的起始读取位置,概念类似于 Kafka 的 auto.offset.reset

START — 每次从头开始

java
String msg = template.readText("my-queue", "replay-consumer", TailerStartStrategy.START);

使用场景:数据重放、全量统计、测试验证。

END — 每次从尾开始

java
String msg = template.readText("my-queue", "realtime-consumer", TailerStartStrategy.END);

使用场景:实时监控面板、只关注增量数据。

RESUME — 恢复位置,首次从头(默认)

java
String msg = template.readText("my-queue", "normal-consumer");
// 等价于
String msg = template.readText("my-queue", "normal-consumer", TailerStartStrategy.RESUME);

使用场景:常规业务消费,确保不丢消息,类似 Kafka auto.offset.reset=earliest

RESUME_LATEST — 恢复位置,首次从尾

java
String msg = template.readText("my-queue", "new-consumer", TailerStartStrategy.RESUME_LATEST);

使用场景:新上线的监控服务,不需要处理历史积压,只关心新消息。类似 Kafka auto.offset.reset=latest

4.2 策略选型指南

plaintext
是否需要回溯历史数据?
├── 是 → 每次都需要全量重放吗?
│   ├── 是 → START
│   └── 否 → RESUME(首次从头,之后续读)
└── 否 → 是新消费者还是已有消费者?
    ├── 新消费者,不关心历史 → RESUME_LATEST
    └── 只要实时数据,不需要续读 → END

4.3 多消费者模式

java
// 消费者 A:处理订单
String orderMsg = template.readText("events", "order-processor");

// 消费者 B:记录审计日志(独立进度)
String auditMsg = template.readText("events", "audit-logger");

// 消费者 C:实时统计(只关注新数据)
String statMsg = template.readText("events", "stat-collector", TailerStartStrategy.RESUME_LATEST);
注意
重要tailerId 是消费位置的唯一标识。相同的 tailerId 共享同一个读取位置,不同 tailerId 完全独立。

五、典型使用场景

5.1 写入文本消息

java
long index = template.writeText("order-events", "ORDER_CREATED:12345");
log.info("消息已写入,索引: {}", index);

5.2 写入结构化数据

java
long index = template.writeDocument("user-events", (WireOut wire) -> {
    wire.write("eventType").text("USER_LOGIN");
    wire.write("userId").int64(10001L);
    wire.write("username").text("zhangsan");
    wire.write("loginTime").int64(System.currentTimeMillis());
    wire.write("ip").text("192.168.1.100");
});

5.3 写入键值对

java
template.writeKeyValue("config-changes", "db.maxPoolSize", "50");
template.writeKeyValue("config-changes", "cache.ttl", "3600");

5.4 单条消费

java
String msg = template.readText("order-events", "order-handler");
if (msg != null) {
    processOrder(msg);
}

5.5 批量消费

java
List<String> messages = template.readTextBatch("order-events", "batch-handler", 100);
if (!messages.isEmpty()) {
    log.info("本批次读取 {} 条消息", messages.size());
    messages.forEach(this::processOrder);
}

结合定时任务实现持续消费:

java
@Scheduled(fixedDelay = 100)
public void consumeOrders() {
    List<String> batch = template.readTextBatch("order-events", "scheduled-consumer", 200);
    batch.forEach(this::processOrder);
}

5.6 结构化数据消费

java
UserLoginEvent event = template.readDocument("user-events", "event-handler", wire -> {
    String eventType = wire.read("eventType").text();
    long userId = wire.read("userId").int64();
    String username = wire.read("username").text();
    long loginTime = wire.read("loginTime").int64();
    String ip = wire.read("ip").text();
    return new UserLoginEvent(eventType, userId, username, loginTime, ip);
});

List<UserLoginEvent> events = template.readDocumentBatch(
    "user-events", "batch-event-handler", 50,
    wire -> {
        return new UserLoginEvent(
            wire.read("eventType").text(),
            wire.read("userId").int64(),
            wire.read("username").text(),
            wire.read("loginTime").int64(),
            wire.read("ip").text()
        );
    }
);

5.7 查看消息(不移动消费位置)

java
String latest = template.peekLastText("order-events");
String earliest = template.peekFirstText("order-events");
String specific = template.peekTextAtIndex("order-events", 86400000000001L);
TIP
典型用途:管理后台展示队列最新/最早消息、调试时检查特定消息内容。

5.8 消费位置控制

java
template.toEnd("order-events", "my-consumer");
template.toStart("order-events", "my-consumer");
boolean success = template.moveToIndex("order-events", "my-consumer", targetIndex);
long currentIndex = template.getTailerIndex("order-events", "my-consumer");

5.9 高级用法:直接操作原生对象

java
ExcerptAppender appender = template.getAppender("high-freq-queue");
for (int i = 0; i < 1_000_000; i++) {
    appender.writeText("message-" + i);
}

ExcerptTailer tailer = template.createTailer("high-freq-queue", "raw-consumer");
while (true) {
    String text = tailer.readText();
    if (text == null) break;
}

ChronicleQueue queue = template.getQueue("my-queue");
long lastIndex = queue.lastIndex();

六、注意事项与常见陷阱

6.1 队列名称必须唯一且稳定

  • 队列名称(queueName)是队列的唯一标识,映射到磁盘上的目录
  • 不要在运行时动态修改队列名称,否则会导致创建新队列、丢失旧数据

6.2 tailerId 命名规范

  • tailerId 决定了消费位置的持久化键,不同业务逻辑必须使用不同的 tailerId
  • 推荐命名格式:{服务名}-{业务功能},例如 order-service-handleraudit-logger
  • 相同 tailerId 的多个调用共享同一个读取位置

6.3 readText / readDocument 返回 null 的含义

返回 null 表示当前没有更多消息可读,并非错误:

java
String msg = template.readText("my-queue", "consumer");
if (msg == null) {
    return;
}

// 错误:不检查 null 直接使用
processMessage(template.readText("my-queue", "consumer")); // 可能 NPE!

6.4 结构化数据读写字段顺序

使用 Wire 协议读写时,读取字段的顺序必须与写入顺序一致

java
// 写入
template.writeDocument("queue", wire -> {
    wire.write("name").text("张三");
    wire.write("age").int32(25);
    wire.write("email").text("a@b.com");
});

// 读取 — 字段顺序必须一致
template.readDocument("queue", "consumer", wire -> {
    String name = wire.read("name").text();
    int age = wire.read("age").int32();
    String email = wire.read("email").text();
    return new User(name, age, email);
});

6.5 不要手动关闭 Template 获取的对象

  • 不要手动调用 queue.close()appender.close()
  • 队列关闭由 Spring 容器销毁时自动处理

6.6 存储路径权限

  • 确保应用进程对配置的存储路径有读写权限
  • 避免将存储路径指向临时目录(如 /tmp

6.7 文件数据不可手动编辑

  • Chronicle Queue 的数据文件是二进制格式,禁止手动编辑或截断
  • 如需清理历史数据,应在队列关闭后删除整个目录

6.8 懒加载行为

  • ChronicleQueueManager 的 Bean 使用 @Lazy 注解,只有在首次被使用时才会初始化
  • 配置了 queues 预定义列表的队列会在 Manager 初始化时创建
  • 未预定义的队列在首次调用时自动创建

七、性能优化指南

7.1 批量操作优于单条操作

java
// 低效
for (int i = 0; i < 100; i++) {
    String msg = template.readText("queue", "consumer");
    if (msg != null) process(msg);
}

// 高效
List<String> batch = template.readTextBatch("queue", "consumer", 100);
batch.forEach(this::process);

7.2 高频写入使用原生 Appender

java
ExcerptAppender appender = template.getAppender("high-freq-queue");
for (String data : largeDataSet) {
    appender.writeText(data);
}

7.3 合理选择滚动周期

写入频率推荐滚动周期原因
< 1000 条/天FAST_DAILY 或 WEEKLY减少文件数量
1K ~ 100K 条/天FAST_DAILY默认推荐
100K ~ 1M 条/天FAST_HOURLY单文件不会过大
> 1M 条/天FIVE_MINUTELY / TEN_MINUTELY控制单文件大小

7.4 块大小调优

  • 默认块大小由 Chronicle Queue 内部决定(通常为 64MB)
  • 写入消息体较大时可适当增大 blockSize
  • 内存受限环境可减小 blockSize(最小建议 16MB)

7.5 磁盘选型

  • 强烈推荐使用 SSD
  • HDD 在高吞吐场景下会成为瓶颈
  • 网络存储(NFS/CIFS)需评估延迟影响

7.6 消费线程模型建议

java
// 方案 A:定时任务拉取
@Scheduled(fixedDelay = 50)
public void consume() {
    List<String> batch = template.readTextBatch("queue", "consumer", 500);
    batch.forEach(this::process);
}

// 方案 B:独立线程轮询
@PostConstruct
public void startConsumer() {
    Thread consumer = new Thread(() -> {
        while (!Thread.currentThread().isInterrupted()) {
            String msg = template.readText("queue", "consumer");
            if (msg != null) {
                process(msg);
            } else {
                LockSupport.parkNanos(1_000_000);
            }
        }
    }, "chronicle-consumer");
    consumer.setDaemon(true);
    consumer.start();
}

7.7 JVM 参数配置(重要)

Chronicle Queue 底层大量使用 sun.misc.Unsafe、堆外内存(mmap)和 JDK 内部 API。在 Java 21 的强封装模块系统下,必须正确配置 JVM 参数

7.7.1 必需参数:Java 模块系统开放(Java 9+)

plaintext
--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED
--add-exports=java.base/sun.nio.ch=ALL-UNNAMED
--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED
--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED
--add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED

--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED

在 Gradle 中配置:

groovy
tasks.withType(JavaExec).configureEach {
    jvmArgs(
        '--add-exports', 'java.base/jdk.internal.ref=ALL-UNNAMED',
        '--add-exports', 'java.base/sun.nio.ch=ALL-UNNAMED',
        '--add-exports', 'java.base/jdk.internal.misc=ALL-UNNAMED',
        '--add-exports', 'jdk.unsupported/sun.misc=ALL-UNNAMED',
        '--add-opens', 'java.base/java.lang=ALL-UNNAMED',
        '--add-opens', 'java.base/java.lang.reflect=ALL-UNNAMED',
        '--add-opens', 'java.base/java.io=ALL-UNNAMED',
        '--add-opens', 'java.base/sun.nio.ch=ALL-UNNAMED',
        '--add-opens', 'java.base/java.util=ALL-UNNAMED'
    )
}

tasks.withType(Test).configureEach {
    jvmArgs(
        '--add-exports', 'java.base/jdk.internal.ref=ALL-UNNAMED',
        '--add-exports', 'java.base/sun.nio.ch=ALL-UNNAMED',
        '--add-opens', 'java.base/java.lang=ALL-UNNAMED',
        '--add-opens', 'java.base/java.lang.reflect=ALL-UNNAMED',
        '--add-opens', 'java.base/java.io=ALL-UNNAMED',
        '--add-opens', 'java.base/sun.nio.ch=ALL-UNNAMED'
    )
}

在 Spring Boot 打包后运行(生产环境):

bash
java \
  --add-exports=java.base/jdk.internal.ref=ALL-UNNAMED \
  --add-exports=java.base/sun.nio.ch=ALL-UNNAMED \
  --add-exports=java.base/jdk.internal.misc=ALL-UNNAMED \
  --add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED \
  --add-opens=java.base/java.lang=ALL-UNNAMED \
  --add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
  --add-opens=java.base/java.io=ALL-UNNAMED \
  --add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
  --add-opens=java.base/java.util=ALL-UNNAMED \
  -jar your-application.jar

推荐做法:将参数写入启动脚本或 Dockerfile:

bash
# start.sh
export JAVA_OPTS="--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED \
  --add-exports=java.base/sun.nio.ch=ALL-UNNAMED \
  --add-exports=java.base/jdk.internal.misc=ALL-UNNAMED \
  --add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED \
  --add-opens=java.base/java.lang=ALL-UNNAMED \
  --add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
  --add-opens=java.base/java.io=ALL-UNNAMED \
  --add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
  --add-opens=java.base/java.util=ALL-UNNAMED"

java $JAVA_OPTS -jar your-application.jar
dockerfile
# Dockerfile
ENV JAVA_OPTS="--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED \
  --add-exports=java.base/sun.nio.ch=ALL-UNNAMED \
  --add-exports=java.base/jdk.internal.misc=ALL-UNNAMED \
  --add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED \
  --add-opens=java.base/java.lang=ALL-UNNAMED \
  --add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
  --add-opens=java.base/java.io=ALL-UNNAMED \
  --add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
  --add-opens=java.base/java.util=ALL-UNNAMED"
ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -jar /app.jar"]

7.7.2 内存相关参数

plaintext
-Xms512m -Xmx2g
-XX:MaxDirectMemorySize=4g

关于 mmap 内存的误区

  • mmap 使用的是操作系统页缓存,不计入 Java 堆和直接内存
  • 操作系统会自动管理 mmap 的内存映射
  • 不需要刻意为 mmap 预留 Java 内存
  • 经验公式:物理内存 > Xmx + MaxDirectMemorySize + 活跃队列总数据量 + 操作系统开销

7.7.3 GC 调优建议

plaintext
-XX:+UseZGC

# 或
-XX:+UseG1GC
-XX:MaxGCPauseMillis=50

# 或对延迟极度敏感
-XX:+UseShenandoahGC
TIP
即使使用了 Chronicle Queue 的零 GC 写入,消费端反序列化仍会产生堆内存分配。

7.7.4 操作系统级参数(Linux 生产环境)

bash
ulimit -n 65536
sysctl -w vm.max_map_count=262144
sysctl -w vm.dirty_ratio=10
sysctl -w vm.dirty_background_ratio=5
swapoff -a

7.7.5 大页内存(可选,高级优化)

bash
echo 2048 > /proc/sys/vm/nr_hugepages

JVM 参数:

plaintext
-XX:+UseLargePages
-XX:LargePageSizeInBytes=2m

7.7.6 完整 JVM 参数参考(生产环境推荐)

bash
java \
  --add-exports=java.base/jdk.internal.ref=ALL-UNNAMED \
  --add-exports=java.base/sun.nio.ch=ALL-UNNAMED \
  --add-exports=java.base/jdk.internal.misc=ALL-UNNAMED \
  --add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED \
  --add-opens=java.base/java.lang=ALL-UNNAMED \
  --add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
  --add-opens=java.base/java.io=ALL-UNNAMED \
  --add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
  --add-opens=java.base/java.util=ALL-UNNAMED \
  -Xms1g -Xmx2g \
  -XX:MaxDirectMemorySize=4g \
  -XX:+UseZGC \
  -jar your-application.jar

7.7.7 常见 JVM 相关启动报错速查

报错信息原因解决方案
InaccessibleObjectException缺少 --add-opens 参数添加 7.7.1 节的模块开放参数
IllegalAccessError: module java.base does not opens/exports缺少 --add-exports 参数添加 7.7.1 节的模块开放参数
java.io.IOException: Map failedvm.max_map_count 不够或内存不足sysctl -w vm.max_map_count=262144
Cannot allocate memory物理内存不足增加内存或减少队列数量/块大小
Too many open files文件描述符不够ulimit -n 65536
OutOfMemoryError: Direct buffer memoryMaxDirectMemorySize 过小增大 -XX:MaxDirectMemorySize
java.lang.UnsatisfiedLinkErrorJNI 库缺失或架构不匹配确保 OS 架构与 JDK 一致

八、运维与监控

8.1 磁盘空间监控

  • 数据文件位于配置的 path 目录下
  • 每个滚动周期生成一个 .cq4 文件
  • 可通过定时任务删除过期的 .cq4 文件来释放空间

8.2 队列状态查看

java
Set<String> names = template.getAllQueueNames();
log.info("当前活跃队列: {}", names);

long consumerIndex = template.getTailerIndex("order-events", "my-consumer");
log.info("消费者当前位置: {}", consumerIndex);

String latestMsg = template.peekLastText("order-events");

8.3 优雅停机

组件内置了双重关闭机制:

  1. Spring DisposableBean:Spring 容器关闭时自动调用 destroy() 关闭所有队列
  2. JVM ShutdownHook:作为兜底机制

8.4 数据清理策略

java
@Scheduled(cron = "0 0 3 * * ?")
public void cleanOldData() {
    File queueDir = new File("/data/chronicle-queues/order-events");
    File[] files = queueDir.listFiles((dir, name) -> name.endsWith(".cq4"));
    if (files == null) return;

    long cutoffTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7);
    for (File file : files) {
        if (file.lastModified() < cutoffTime) {
            if (file.delete()) {
                log.info("已清理过期队列文件: {}", file.getName());
            }
        }
    }
}
注意
清理文件时确保对应时间段的消息已被所有消费者消费完毕。

九、FAQ 常见问题

Q1: 队列是否支持分布式/多进程访问?

Chronicle Queue 支持单写多读的跨进程访问模式。写入端建议只有一个进程。如果需要分布式消息队列,请使用 Kafka 等专业方案。

Q2: 进程重启后消费位置会丢失吗?

不会。命名 Tailer 的读取位置由 Chronicle Queue 自动持久化到队列目录中。匿名 Tailer 的位置不会持久化。

Q3: 写入失败会抛异常吗?

会。磁盘空间不足、路径权限不足或 mmap 分配失败时会抛出异常。建议在关键业务路径中添加异常捕获。

Q4: 消息有大小限制吗?

单条消息受限于 blockSize,不建议超过块大小的 1/4。一般建议控制在 1MB 以内

Q5: 如何实现消息的顺序消费?

Chronicle Queue 天然保证写入顺序 = 读取顺序(FIFO)。无需额外处理。

Q6: 可以删除队列中的某条消息吗?

不可以。Chronicle Queue 是追加写入(Append-Only)的数据结构。如需逻辑删除,写入“删除标记”消息。

Q7: 如何估算磁盘用量?

plaintext
每日磁盘用量 ≈ 每日消息条数 × 平均消息大小 × 1.1(元数据开销约 10%)

Q8: 队列可以动态创建吗?

可以。首次调用 writeText("new-queue", "msg") 时自动创建。也可通过 ChronicleQueueManager.getOrCreateQueue() 手动创建。

Q9: 与 chronicle-map 组件有什么区别?

维度chronicle-queuechronicle-map
数据模型有序消息队列(FIFO)键值存储(Key-Value)
使用场景消息传递、事件溯源缓存、共享状态
读取方式顺序消费(Tailer)随机访问(Key 查找)
数据保留追加写入,不可删除可更新、可删除

附录:完整配置示例

yaml
easyfk:
  config:
    storage:
      queue:
        default-path: /data/app/chronicle-queues
        default-roll-cycle: FAST_DAILY
        default-block-size: 0
        create-directories-on-startup: true
        queues:
          - queue-name: order-events
            path: /data/app/chronicle-queues/orders
            roll-cycle: FAST_HOURLY
            block-size: 134217728
          - queue-name: log-buffer
            roll-cycle: FIVE_MINUTELY
          - queue-name: metrics
            roll-cycle: TEN_MINUTELY
          - queue-name: audit-events
— END —