PowerJob 分布式任务调度实战:可视化控制台 + 容器化部署 + 故障转移
背景与价值
分布式任务调度是后端系统的"水电煤"——定时报表、订单关闭、数据同步、对账、清理、推送都依赖它。Java 生态对比:
| 框架 | 维护方 | 特性 | 适合 |
|---|
| Quartz | Terracotta | 老牌单体,集群需自己实现 | 传统企业单体应用 |
| XXL-Job | 大众点评 | 经典分布式,轻量 | 中小型项目 |
| Elastic-Job | 当当 + Apache | 生态丰富,依赖 ZK | 大型分布式 |
| PowerJob | QQ 音乐 | 新一代,多语言,Workflow | 复杂业务编排 |
| Spring Schedule | Spring | 简单,但无分布式 | 单机 |
本文聚焦 PowerJob V4.3.9(2024-03 GA),最大的卖点是"可视化工作流"——可以用 DAG 编排多作业的复杂依赖关系,比 XXL-Job 的"作业依赖"更强大。
核心架构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| ┌────────────────────────────────────────┐
│ PowerJob Server │
│ (调度中心,端口 7700 HTTP / 10010 OMS)│
│ ┌──────────┐ ┌──────────┐ ┌────────┐ │
│ │ 调度器 │ │ 执行器 │ │ 控制台 │ │
│ └─────┬────┘ └─────┬────┘ └────┬───┘ │
└────────┼────────────┼──────────┼──────┘
│ │ │
↓ ↓ ↓
┌────────────────────────────────────────┐
│ Worker 集群 │
│ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │Worker│ │Worker│ │Worker│ │
│ │ 1 │ │ 2 │ │ 3 │ │
│ └──────┘ └──────┘ └──────┘ │
└────────────────────────────────────────┘
↓
MySQL(持久化)
|
三大组件:
- Server:调度中心,1 个或 3 节点集群
- Worker:业务执行器,部署在业务应用内
- MySQL:存储任务定义 + 执行历史
Server 部署
1. 初始化数据库
1
2
3
4
5
6
7
| # 1. 创建数据库
mysql -u root -p{{MYSQL_PASSWORD}} -e \
"CREATE DATABASE powerjob CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;"
# 2. 初始化表结构
# https://github.com/PowerJob/PowerJob/blob/v4.3.9/others/powerjob-mysql.sql
mysql -u root -p{{MYSQL_PASSWORD}} powerjob < powerjob-mysql.sql
|
2. Docker Compose 一键部署(推荐)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| # docker-compose.yml
version: '3'
services:
powerjob-server:
image: powerjob/powerjob-server:V4.3.9
container_name: powerjob
restart: always
network_mode: host
environment:
- TZ=Asia/Shanghai
- PARAMS=--spring.profiles.active=product \
--spring.datasource.core.jdbc-url=jdbc:mysql://{{MYSQL_HOST}}:3306/powerjob?useUnicode=true&characterEncoding=UTF-8 \
--spring.datasource.core.username={{DB_USER}} \
--spring.datasource.core.password={{DB_PASSWORD}} \
--oms.http.port=10010 \
--server.address={{SERVER_IP}}
volumes:
- /data/powerjob-server:/root/powerjob/server
- ~/.m2:/root/.m2
|
3. 端口规划
| 端口 | 用途 |
|---|
| 7700 | HTTP 服务(控制台 + API) |
| 10010 | OMS(运维管理) |
| 10086 | Akka 节点间通信(集群) |
4. 控制台访问
1
| http://{{SERVER_IP}}:7700
|
默认账号:内置 admin(首次登录强制改密)。
Worker 集成(业务应用)
1. 引入依赖
1
2
3
4
5
| <dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>4.3.9</version>
</dependency>
|
2. application.yml
1
2
3
4
5
6
7
8
9
10
11
| powerjob:
worker:
enabled: true
allow-lazy-connect-server: false # 启动时立即连 Server
port: 9988
app-name: my-application # 必须与 Server 中注册的应用名一致
server-address: {{SERVER_HOST}}:7700
protocol: http
store-strategy: disk # 上下文存储:disk / memory
max-result-length: 4096
max-appended-wf-context-length: 4096
|
3. 注册应用(关键步骤)
首次启动 Worker 前必须先在 Server 控制台注册应用:
1
2
| 应用名:my-application # 与 worker 配置的 app-name 一致
密码:{{APP_SECRET}} # 自定义
|
否则 Worker 启动会报"应用未注册"。
4 大处理器类型
1. BasicProcessor(基础,单机执行)
1
2
3
4
5
6
7
8
9
10
11
12
13
| @Component
public class SimpleJob implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
String jobParams = context.getJobParams(); // 任务参数
long instanceId = context.getInstanceId(); // 实例 ID
log.info("任务执行: instanceId={}, params={}", instanceId, jobParams);
// 返回执行结果
return new ProcessResult(true, "执行成功 " + System.currentTimeMillis());
}
}
|
2. BroadcastProcessor(广播,所有 Worker 都执行一次)
1
2
3
4
5
6
7
8
9
10
| @Component
public class ClearCacheJob implements BroadcastProcessor {
@Override
public ProcessResult broadcast(TaskContext context) {
log.info("清理本地缓存");
localCache.clear();
return new ProcessResult(true);
}
}
|
典型场景:每台机器本地缓存清理、每台机器配置刷新。
3. MapProcessor(分片,多 Worker 并行执行)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| @Component
public class BatchInsertJob implements MapProcessor {
@Override
public ProcessResult process(TaskContext context) {
// 当前分片参数
int subTaskId = (int) Thread.currentThread().getId();
int totalSubTask = context.getControlPoint(); // 总分片数
// 假设有 100 万条数据要处理
int pageSize = 10000;
int totalPage = 100;
int startPage = subTaskId * (totalPage / totalSubTask);
int endPage = (subTaskId + 1) * (totalPage / totalSubTask);
for (int p = startPage; p < endPage; p++) {
List<User> users = userDao.findByPage(p, pageSize);
// 批量处理
}
return new ProcessResult(true, "处理 " + (endPage - startPage) + " 页");
}
}
|
典型场景:百万级数据批量同步、并行处理。
4. MapReduceProcessor(分片 + 汇总)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| @Component
public class StatisticsJob implements MapReduceProcessor {
// 1. Map 阶段:每台 Worker 算自己分区
@Override
public ProcessResult process(TaskContext context) {
List<Order> orders = orderDao.findByDateRange(...);
long totalAmount = orders.stream().mapToLong(Order::getAmount).sum();
return new ProcessResult(true, String.valueOf(totalAmount));
}
// 2. Reduce 阶段:汇总所有 Worker 结果
@Override
public ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) {
long total = 0;
for (TaskResult r : taskResults) {
total += Long.parseLong(r.getResult());
}
log.info("总销售额: {}", total);
return new ProcessResult(true, "Total: " + total);
}
}
|
典型场景:分布式统计、跨节点数据汇总。
6 种时间表达式
PowerJob 支持 6 种 cron-like 触发方式:
| 类型 | 配置 | 例子 |
|---|
| Cron | 0 0 2 * * ? | 每天凌晨 2 点 |
| 固定频率 | 60000 | 每 60 秒(毫秒) |
| 固定延迟 | 3000 | 上次执行完后等 3 秒 |
| API 触发 | 手动 | 运维控制台点击"执行一次" |
| 工作流 | DAG 编排 | 任务 A 完成后触发 B |
| OpenAPI | HTTP 触发 | 通过 API 远程触发 |
Cron 例子
1
2
3
4
5
6
7
8
9
10
11
| # 每天凌晨 2 点
time_expression: "0 0 2 * * ?"
# 每周一上午 9 点
time_expression: "0 0 9 ? * 2"
# 每 5 分钟
time_expression: "0 */5 * * * ?"
# 每月 1 号零点
time_expression: "0 0 0 1 * ?"
|
Workflow(DAG 工作流)
PowerJob 相对 XXL-Job 的最大优势——可视化 DAG 编排。
1. 在控制台创建工作流
1
2
3
| [开始] → [拉取数据] → [清洗] → [转换] → [入库] → [结束]
↓
[推送通知]
|
2. 工作流节点配置
每个节点对应一个 Processor,可设置:
- 失败重试次数:0 / 1 / 3
- 跳过条件:前置任务 X 失败时跳过
- 并发度:1 / 5 / 10
3. 实战场景
1
2
3
4
5
6
| 订单超时关闭工作流:
1. 扫描超时订单(每 5 分钟)
2. 关闭订单
3. 释放库存
4. 退款
5. 发送通知
|
故障转移与容错
1. 任务超时
1
2
3
4
5
| powerjob:
worker:
task-retry-num: 3 # 失败重试 3 次
instance-retry-num: 0 # 实例不重试
instance-time-limit: 0 # 0 = 不限时
|
2. 节点宕机
Worker 节点宕机后,Server 自动将任务重新派发到其他 Worker,无需人工介入。
3. 集群高可用
Server 3 节点集群 + MySQL 持久化:
1
2
3
4
5
6
7
8
| # node-1
- PARAMS=--spring.profiles.active=product --oms.http.port=10010 --server.address=10.0.0.1
# node-2
- PARAMS=--spring.profiles.active=product --oms.http.port=10010 --server.address=10.0.0.2
# node-3
- PARAMS=--spring.profiles.active=product --oms.http.port=10010 --server.address=10.0.0.3
|
3 节点通过 Akka 集群 选举 Leader(类似 ZK 选举),Leader 宕机 30s 内自动切换。
4. 报警
控制台支持配置告警:
- 邮件(
spring.mail.*) - 钉钉(Webhook)
- 企业微信(Webhook)
- Webhook 自定义
与 XXL-Job 对比
| 维度 | XXL-Job | PowerJob |
|---|
| UI | 经典、简洁 | 现代 + 暗色主题 + DAG |
| 任务类型 | 4 类(Bean / GLUE / Shell / Python) | 4 类(Basic / Broadcast / Map / MapReduce) |
| 工作流 | 简单父子依赖 | DAG 可视化编排 |
| 容器支持 | 一般 | K8s 友好(自带 Operator 计划中) |
| 多语言 | Java only | Java + Python + Shell + HTTP |
| 集群 | 集群分片 | Akka 集群 + 故障自动转移 |
| 上手难度 | 简单 | 中等 |
选型建议:
- 简单 CRON 任务:XXL-Job(轻量、上手快)
- 复杂业务编排:PowerJob(DAG 工作流、MapReduce)
实战坑
1. Worker 启动失败:应用未注册
1
| ERROR: app not registered
|
解决:先在 Server 控制台 → 应用管理 → 注册应用 → app-name 必须与 powerjob.worker.app-name 一致。
2. 时区问题
1
2
3
4
| # 容器必须设置时区
environment:
- TZ=Asia/Shanghai
- JAVA_OPTS=-Duser.timezone=Asia/Shanghai
|
否则 Cron 任务触发时间会差 8 小时。
3. 大量任务调度延迟
1
2
| Server 默认单节点:1000 QPS 调度能力
3 节点集群:3000 QPS
|
如果任务量 > 10万/天,建议 Server 集群 + Worker 多实例。
4. 任务执行时间超过间隔
1
| 固定频率 60s 任务,但任务本身耗时 80s
|
PowerJob 默认跳过当前任务,等下个周期。如果需要"执行完再等 60s",改用 fixed_delay 类型。
前置知识与下一步
前置:
- Spring Boot 基础
- Docker 基础
- Cron 表达式
下一步:
- 用 Workflow 编排复杂业务(订单超时关闭流程)
- 集成 告警 到企业 IM(钉钉 / 企微)
- 监控:Prometheus + Grafana 抓取 PowerJob 指标
小结
PowerJob 是国产分布式任务调度的新一代标杆,核心优势:
- DAG 工作流(可视化编排)—— 复杂业务一图胜千言
- Map / MapReduce —— 大数据量场景天然支持
- 多语言 Worker(Java + Python + Shell + HTTP)—— 异构系统集成友好
- 故障自动转移 + Akka 集群 —— 生产可用性高
如果你的项目有 10+ 定时任务 + 复杂依赖关系,XXL-Job 已经不够用,建议直接上 PowerJob。
2024+ 视角:PowerJob V5 + K8s 适配 + AI 运维
PowerJob V5(2024-12 GA)关键变化
- V5 全面拥抱 K8s:内置 K8s Operator 风格的
PowerJobWorker CRD - Akka 升级:
Akka 2.6 → Pekko 1.0(Apache 顶级项目,Akka 改 License 后社区 fork) - 存储后端可插拔:H2 / MySQL / PostgreSQL / 达梦 / 人大金仓,V5 起把"存储"从核心解耦
- 多租户:单个 Server 集群可承载多业务方,namespace 隔离
- 工作流 2.0:DAG 节点支持循环 + 条件分支 + 子工作流
K8s 部署的 2024+ 范式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
| # powerjob-server.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: powerjob-server
spec:
replicas: 3
selector:
matchLabels:
app: powerjob-server
template:
metadata:
labels:
app: powerjob-server
spec:
containers:
- name: server
image: powerjob/powerjob-server:V5.0.0
ports:
- containerPort: 7700
- containerPort: 10010
env:
- name: PARAMS
value: "--spring.profiles.active=product
--spring.datasource.core.jdbc-url=jdbc:mysql://mysql:3306/powerjob
--spring.datasource.core.username=powerjob
--spring.datasource.core.password=${DB_PASSWORD}
--oms.http.port=10010"
- name: TZ
value: Asia/Shanghai
resources:
requests:
memory: "2Gi"
cpu: "1000m"
limits:
memory: "4Gi"
cpu: "2000m"
---
apiVersion: v1
kind: Service
metadata:
name: powerjob-server
spec:
type: ClusterIP
selector:
app: powerjob-server
ports:
- name: http
port: 7700
- name: oms
port: 10010
|
Worker 集成 2024+ 升级
1
2
3
4
5
| <dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>5.0.0</version>
</dependency>
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| powerjob:
worker:
enabled: true
port: 9988
app-name: my-application
server-address: powerjob-server:7700
protocol: http
# V5 新增
max-result-length: 8192
store-strategy: disk
# 健康检查
health-check-interval: 30
# 标签(V5 多租户)
tag: production
namespace: tenant-a
|
V5 工作流 2.0 实战
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| # 订单超时关闭工作流(V5 新增循环节点)
nodes:
- id: scan_orders
type: Basic
processor: scanTimeoutOrders
- id: close_order
type: Basic
processor: closeOrder
dependsOn: [scan_orders]
- id: release_stock
type: Basic
processor: releaseStock
dependsOn: [close_order]
- id: refund
type: Basic
processor: refundOrder
dependsOn: [release_stock]
- id: notify_user
type: Basic
processor: sendNotification
dependsOn: [refund]
# V5 新增:失败重试 + 告警
retryCount: 3
alertEnabled: true
alertChannels: [dingtalk, feishu]
|
2024+ 监控与告警
PowerJob V5 内置 Prometheus metrics 端点:
1
2
3
4
5
6
7
8
9
| # application.yml(Server)
management:
endpoints:
web:
exposure:
include: health,info,prometheus
metrics:
tags:
application: powerjob-server
|
Grafana Dashboard:
- 任务执行 P50 / P95 / P99 延迟
- Worker 在线数 / 任务队列长度
- DAG 节点完成率
2024+ 选型对比
| 任务调度 | 2024+ 适用 |
|---|
| XXL-Job | 简单 CRON 任务(< 50 个) |
| PowerJob V5 | 中大型项目 + DAG 工作流 |
| Apache DolphinScheduler | 大数据 ETL(Spark / Flink 集成) |
| Temporal | 复杂长事务(> 1 天的 saga) |
| Airflow | Python 生态(数据科学) |
| Argo Workflows | K8s 原生 DAG(容器级任务) |
2024+ 实战坑
- Akka License 变化:Akka 2.7+ 改 BSL 协议(商业收费),PowerJob V5 改用 Apache Pekko——老项目升级要清 classpath
- V5 默认存储改为外置:不再内置 H2 内存模式,必须配 MySQL/PG
- 多租户 namespace 隔离:V5 起 Worker 必须带
namespace 配置,否则启动失败 - K8s 部署:Server 至少 2 副本(防止单点脑裂),但不要超过 5 副本(Akka 集群规模 5 最佳)
PowerJob V5 推荐组合
1
2
3
4
5
6
7
8
9
10
| 中型互联网业务(Java + Python 异构)
├── PowerJob Server V5 × 3 副本(K8s Deployment)
├── MySQL 8.4 LTS 存储(StatefulSet + 持久卷)
├── PowerJob Worker × N 副本(与应用一起跑)
│ ├── Java 微服务 Worker
│ ├── Python 数据同步 Worker(HTTP 类型)
│ └── Shell 清理任务 Worker
├── DAG 工作流编排业务(订单 / 报表 / 同步)
├── Prometheus + Grafana 监控
└── 告警:钉钉 + 飞书 + 企业微信
|