easyfk-thread 是 EasyFK 框架的线程池管理与异步任务模块,提供以下核心能力:
dependencies {
compileOnly project(':easyfk-core')
}easyfk-thread/
├── build.gradle
└── src/main/java/com/mcst/easyfk/thread/
├── config/
│ ├── ThreadPoolConfig.java # 线程池自动配置(核心)
│ └── ThreadPoolMonitorAutoConfig.java # 监控自动配置
├── properties/
│ ├── ThreadPoolBaseProperties.java # 线程池基础配置
│ ├── ThreadPoolArgs.java # 线程池参数(可复用)
│ ├── ThreadPoolMonitorProperties.java # 监控配置
│ └── DynamicAdjustmentConfig.java # 动态调整配置
├── CustomTaskExecutor.java # 增强的任务执行器(TraceId 传递)
├── ThreadPoolMetrics.java # 线程池指标数据
├── ThreadPoolMonitor.java # 线程池监控器
├── AsyncThreadPoolMonitor.java # 异步线程池监控器
├── ThreadPoolMonitorManager.java # 监控器管理器
├── AdjustmentDecision.java # 调整决策
├── async/
│ └── CompletableFutureConvert.java # CompletableFuture 转换工具
└── retry/
├── RetryConfig.java # 重试配置
├── RetryContext.java # 重试上下文
├── RetryCallback.java # 重试回调接口
├── BackoffStrategy.java # 退避策略枚举
├── RetryableOperation.java # 可重试操作接口
└── RetryExecutor.java # 重试执行器配置前缀:easyfk.config.thread.pool
| `create-common-pool` | Boolean | `false` | 是否创建通用线程池 |
|---|---|---|---|
| `create-schedule-pool` | Boolean | `false` | 是否创建任务调度线程池(配合 @Scheduled) |
| `create-io-intensive-pool` | Boolean | `false` | 是否创建 I/O 密集型线程池 |
| `create-cpu-intensive-pool` | Boolean | `false` | 是否创建 CPU 密集型线程池 |
| `create-retry` | Boolean | `false` | 是否创建重试线程池 + RetryExecutor |
每种线程池都有独立的参数配置:
| `core-pool-size` | Integer | CPU 核心数 | 核心线程数(未设置时自动获取) |
|---|---|---|---|
| `queue-capacity` | Integer | 200 | 队列容量 |
| `keep-alive-seconds` | Integer | 60 | 空闲线程存活时间 |
| `thread-name-prefix` | String | `custom-thread-` | 线程名前缀 |
| `wait-for-tasks-to-complete-on-shutdown` | Boolean | `true` | 关闭时等待任务完成 |
| `await-termination-seconds` | Integer | 60 | 等待终止时间 |
| `allow-core-thread-time-out` | Boolean | `false` | 核心线程是否超时回收 |
| `prestart-all-core-threads` | Boolean | `false` | 是否预热核心线程 |
| `rejected-execution-handler` | String | `CALLER_RUNS` | 拒绝策略:ABORT / CALLER_RUNS / DISCARD / DISCARD_OLDEST |
配置路径映射:
| 通用 | `easyfk.config.thread.pool.common-pool-args.*` |
|---|---|
| 调度 | `easyfk.config.thread.pool.schedule-pool-args.*` |
| I/O 密集 | `easyfk.config.thread.pool.io-intensive-pool-args.*` |
| CPU 密集 | `easyfk.config.thread.pool.cpu-intensive-pool-args.*` |
| 重试 | `easyfk.config.thread.pool.retry-pool-args.*` |
配置前缀:easyfk.config.thread.pool.monitor
| `enabled` | boolean | `false` | 是否启用监控 |
|---|---|---|---|
| `auto-start` | boolean | `true` | 是否自动启动监控 |
| `include-names` | List<String> | `[]` | 包含的线程池名称(空表示全部) |
| `exclude-names` | List<String> | `[]` | 排除的线程池名称 |
| `enabled` | Boolean | `true` | 是否启用动态调整 |
|---|---|---|---|
| `cooldown-period` | Duration | 2min | 冷却期(两次调整最小间隔) |
| `min-core-pool-size` | Integer | 2 | 最小核心线程数 |
| `max-core-pool-size` | Integer | 20 | 最大核心线程数 |
| `scale-up-step` | Integer | 2 | 扩容步长 |
| `scale-down-step` | Integer | 1 | 缩容步长 |
| `high-load-threshold` | Double | 80.0 | 高负载阈值(线程利用率 %) |
| `low-load-threshold` | Double | 30.0 | 低负载阈值(线程利用率 %) |
| `queue-high-threshold` | Double | 70.0 | 队列高负载阈值(%) |
| `queue-low-threshold` | Double | 20.0 | 队列低负载阈值(%) |
| `async-monitoring-enabled` | Boolean | `true` | 是否启用异步监控 |
| `async-monitoring-pool-size` | Integer | 2 | 异步监控线程池大小 |
| `async-monitoring-queue-capacity` | Integer | 100 | 异步监控队列容量 |
| `async-monitoring-timeout-seconds` | Integer | 30 | 异步监控超时时间 |
easyfk:
config:
thread:
pool:
create-common-pool: true
create-async-pool: true
create-schedule-pool: true
create-io-intensive-pool: true
create-retry: true
common-pool-args:
core-pool-size: 8
max-pool-size: 16
queue-capacity: 500
thread-name-prefix: "common-"
rejected-execution-handler: CALLER_RUNS
async-pool-args:
core-pool-size: 4
queue-capacity: 200
io-intensive-pool-args:
core-pool-size: 16
max-pool-size: 32
queue-capacity: 1000
keep-alive-seconds: 120
retry-pool-args:
core-pool-size: 2
thread-name-prefix: "retry-"
monitor:
enabled: true
auto-discovery: true
auto-start: true
exclude-names:
- retryScheduledExecutor
default-config:
monitoring-interval: 30s
cooldown-period: 2m
high-load-threshold: 80.0
low-load-threshold: 30.0
async-monitoring-enabled: true
configs:
commonTaskExecutor:
high-load-threshold: 70.0
max-core-pool-size: 30
scale-up-step: 3@AutoConfiguration 类,注册所有线程池 Bean:
| `commonTaskExecutor` | `CustomTaskExecutor` | `create-common-pool=true` | 通用线程池 |
|---|---|---|---|
| `scheduleTaskExecutor` / `taskScheduler` | `TaskScheduler` | `create-schedule-pool=true` | 调度线程池(`@Primary`) |
| `ioIntensiveTaskExecutor` | `CustomTaskExecutor` | `create-io-intensive-pool=true` | I/O 密集型线程池 |
| `cpuIntensiveTaskExecutor` | `CustomTaskExecutor` | `create-cpu-intensive-pool=true` | CPU 密集型线程池 |
| `retryScheduledExecutor` | `ScheduledExecutorService` | `create-retry=true` | 重试调度线程池 |
| `retryExecutor` | `RetryExecutor` | `create-retry=true` | 重试执行器 |
@AutoConfiguration 类,条件:monitor.enabled=true
继承 ThreadPoolTaskExecutor,重写 execute() 方法:
public class CustomTaskExecutor extends ThreadPoolTaskExecutor {
@Override
public void execute(Runnable task) {
final String traceId = TraceIdContext.getTraceId();
super.execute(() -> {
try {
if (EmptyUtil.isNotEmpty(traceId)) {
TraceIdContext.setTraceId(traceId);
MDC.put(RequestHeaderConstant.TRACE_ID, traceId);
}
task.run();
} finally {
TraceIdContext.remove();
MDC.remove(RequestHeaderConstant.TRACE_ID);
}
});
}
}功能:
| `corePoolSize` | 核心线程数 |
|---|---|
| `activeThreadCount` | 活跃线程数 |
| `poolSize` | 当前线程池大小 |
| `queueSize` | 队列中等待任务数 |
| `queueCapacity` | 队列容量 |
| `completedTaskCount` | 已完成任务数 |
| `taskCount` | 总任务数 |
| `threadUtilization` | 线程利用率(active / core × 100) |
| `queueUtilization` | 队列利用率(queueSize / capacity × 100) |
| `loadStatus` | 负载状态:LOW / MEDIUM / HIGH / CRITICAL |
| `SCALE_UP` | 扩容 |
|---|---|
| `NO_CHANGE` | 无需调整 |
提供工厂方法:scaleUp()、scaleDown()、noChange()
核心功能:
支持同步 / 异步两种监控模式,OOM 时自动降级为同步模式。
集中管理所有监控器:
// 获取指定监控器
ThreadPoolMonitor monitor = manager.getMonitor("commonTaskExecutor");
// 获取所有统计
Map<String, Map<String, Object>> stats = manager.getAllStatistics();
// 启动/停止所有监控
manager.startAllMonitoring();
manager.stopAllMonitoring();CompletableFuture<Result> future = CompletableFutureConvert.async(
() -> slowService.query(param),
5, // 超时秒数
ioIntensiveTaskExecutor
);CompletableFuture<UserDto> future = CompletableFutureConvert.asyncForBaseResult(
() -> userApi.getUser(userId),
3, // 超时秒数
ioIntensiveTaskExecutor
);
// future 直接包含 UserDto,无需手动解包 BaseResult
// 失败时自动抛出 RuntimeException("REMOTE_FAIL: code:msg")| `LINEAR` | base × attempt | 500ms, 1000ms, 1500ms |
|---|---|---|
| `FIXED` | base(恒定) | 500ms, 500ms, 500ms |
RetryConfig config = RetryConfig.builder()
.maxRetries(3) // 最大重试 3 次
.baseDelayMs(500) // 基础延迟 500ms
.backoffStrategy(BackoffStrategy.LINEAR) // 线性退避
.maxConcurrentRetries(100) // 最大并发重试数
.businessKey("pushMsg:user123") // 业务标识
.enableFallback(true) // 启用降级
.retryFor(new Class[]{IOException.class}) // 仅重试 IO 异常
.noRetryFor(new Class[]{IllegalArgumentException.class}) // 参数异常不重试
.build();@Resource
private RetryExecutor retryExecutor;
// 同步执行,失败自动重试,所有重试完成后返回或抛异常
try {
String result = retryExecutor.execute(config, () -> {
return httpClient.get(url);
});
} catch (Exception e) {
// 所有重试都失败后的异常
}retryExecutor.executeAsync(config, () -> {
rocketProducer.send(message);
return null;
}, new RetryCallback<Void>() {
@Override
public void onSuccess(Void result, int attempt) {
log.info("发送成功,尝试次数: {}", attempt);
}
@Override
public void onFinalFailure(Exception e, int totalAttempts) {
log.error("发送最终失败,总尝试: {}", totalAttempts, e);
}
});| `onSuccess(T result, int attempt)` | 操作成功时调用(首次或重试后) |
|---|
两个方法都有默认空实现,可按需覆盖。
DynamicAdjustmentConfig 提供三种预置配置:
| 默认 | `defaultConfig()` | 30s | 2min | 80% | 2 | 启用 |
|---|---|---|---|---|---|---|
| 保守 | `conservativeConfig()` | 2min | 5min | 90% | 1 | 关闭 |
easyfk-thread — 高性能线程池管理,提升并发任务调度效率。