Featured image of post PowerJob 分布式任务调度实战:可视化控制台 + 容器化部署 + 故障转移

PowerJob 分布式任务调度实战:可视化控制台 + 容器化部署 + 故障转移

PowerJob V4.3.9 + Spring Boot Worker + 10 类作业处理器 + 分片广播 + Map/MapReduce 完整路径

PowerJob 分布式任务调度实战:可视化控制台 + 容器化部署 + 故障转移

背景与价值

分布式任务调度是后端系统的"水电煤"——定时报表、订单关闭、数据同步、对账、清理、推送都依赖它。Java 生态对比:

框架维护方特性适合
QuartzTerracotta老牌单体,集群需自己实现传统企业单体应用
XXL-Job大众点评经典分布式,轻量中小型项目
Elastic-Job当当 + Apache生态丰富,依赖 ZK大型分布式
PowerJobQQ 音乐新一代,多语言,Workflow复杂业务编排
Spring ScheduleSpring简单,但无分布式单机

本文聚焦 PowerJob V4.3.9(2024-03 GA),最大的卖点是"可视化工作流"——可以用 DAG 编排多作业的复杂依赖关系,比 XXL-Job 的"作业依赖"更强大。

核心架构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
┌────────────────────────────────────────┐
│           PowerJob Server              │
│   (调度中心,端口 7700 HTTP / 10010 OMS)│
│  ┌──────────┐ ┌──────────┐ ┌────────┐ │
│  │  调度器  │ │  执行器  │ │ 控制台 │ │
│  └─────┬────┘ └─────┬────┘ └────┬───┘ │
└────────┼────────────┼──────────┼──────┘
         │            │          │
         ↓            ↓          ↓
┌────────────────────────────────────────┐
│            Worker 集群                 │
│  ┌──────┐  ┌──────┐  ┌──────┐         │
│  │Worker│  │Worker│  │Worker│         │
│  │  1   │  │  2   │  │  3   │         │
│  └──────┘  └──────┘  └──────┘         │
└────────────────────────────────────────┘
      MySQL(持久化)

三大组件

  • Server:调度中心,1 个或 3 节点集群
  • Worker:业务执行器,部署在业务应用内
  • MySQL:存储任务定义 + 执行历史

Server 部署

1. 初始化数据库

1
2
3
4
5
6
7
# 1. 创建数据库
mysql -u root -p{{MYSQL_PASSWORD}} -e \
  "CREATE DATABASE powerjob CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;"

# 2. 初始化表结构
# https://github.com/PowerJob/PowerJob/blob/v4.3.9/others/powerjob-mysql.sql
mysql -u root -p{{MYSQL_PASSWORD}} powerjob < powerjob-mysql.sql

2. Docker Compose 一键部署(推荐)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# docker-compose.yml
version: '3'
services:
  powerjob-server:
    image: powerjob/powerjob-server:V4.3.9
    container_name: powerjob
    restart: always
    network_mode: host
    environment:
      - TZ=Asia/Shanghai
      - PARAMS=--spring.profiles.active=product \
               --spring.datasource.core.jdbc-url=jdbc:mysql://{{MYSQL_HOST}}:3306/powerjob?useUnicode=true&characterEncoding=UTF-8 \
               --spring.datasource.core.username={{DB_USER}} \
               --spring.datasource.core.password={{DB_PASSWORD}} \
               --oms.http.port=10010 \
               --server.address={{SERVER_IP}}
    volumes:
      - /data/powerjob-server:/root/powerjob/server
      - ~/.m2:/root/.m2
1
docker-compose up -d

3. 端口规划

端口用途
7700HTTP 服务(控制台 + API)
10010OMS(运维管理)
10086Akka 节点间通信(集群)

4. 控制台访问

1
http://{{SERVER_IP}}:7700

默认账号:内置 admin(首次登录强制改密)。

Worker 集成(业务应用)

1. 引入依赖

1
2
3
4
5
<dependency>
    <groupId>tech.powerjob</groupId>
    <artifactId>powerjob-worker-spring-boot-starter</artifactId>
    <version>4.3.9</version>
</dependency>

2. application.yml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
powerjob:
  worker:
    enabled: true
    allow-lazy-connect-server: false   # 启动时立即连 Server
    port: 9988
    app-name: my-application            # 必须与 Server 中注册的应用名一致
    server-address: {{SERVER_HOST}}:7700
    protocol: http
    store-strategy: disk                # 上下文存储:disk / memory
    max-result-length: 4096
    max-appended-wf-context-length: 4096

3. 注册应用(关键步骤)

首次启动 Worker 前必须先在 Server 控制台注册应用

1
2
应用名:my-application       # 与 worker 配置的 app-name 一致
密码:{{APP_SECRET}}         # 自定义

否则 Worker 启动会报"应用未注册"。

4 大处理器类型

1. BasicProcessor(基础,单机执行)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@Component
public class SimpleJob implements BasicProcessor {

    @Override
    public ProcessResult process(TaskContext context) throws Exception {
        String jobParams = context.getJobParams();  // 任务参数
        long instanceId = context.getInstanceId();   // 实例 ID
        log.info("任务执行: instanceId={}, params={}", instanceId, jobParams);

        // 返回执行结果
        return new ProcessResult(true, "执行成功 " + System.currentTimeMillis());
    }
}

2. BroadcastProcessor(广播,所有 Worker 都执行一次)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
@Component
public class ClearCacheJob implements BroadcastProcessor {

    @Override
    public ProcessResult broadcast(TaskContext context) {
        log.info("清理本地缓存");
        localCache.clear();
        return new ProcessResult(true);
    }
}

典型场景:每台机器本地缓存清理、每台机器配置刷新。

3. MapProcessor(分片,多 Worker 并行执行)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
public class BatchInsertJob implements MapProcessor {

    @Override
    public ProcessResult process(TaskContext context) {
        // 当前分片参数
        int subTaskId = (int) Thread.currentThread().getId();
        int totalSubTask = context.getControlPoint();   // 总分片数

        // 假设有 100 万条数据要处理
        int pageSize = 10000;
        int totalPage = 100;
        int startPage = subTaskId * (totalPage / totalSubTask);
        int endPage = (subTaskId + 1) * (totalPage / totalSubTask);

        for (int p = startPage; p < endPage; p++) {
            List<User> users = userDao.findByPage(p, pageSize);
            // 批量处理
        }

        return new ProcessResult(true, "处理 " + (endPage - startPage) + " 页");
    }
}

典型场景:百万级数据批量同步、并行处理。

4. MapReduceProcessor(分片 + 汇总)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
public class StatisticsJob implements MapReduceProcessor {

    // 1. Map 阶段:每台 Worker 算自己分区
    @Override
    public ProcessResult process(TaskContext context) {
        List<Order> orders = orderDao.findByDateRange(...);
        long totalAmount = orders.stream().mapToLong(Order::getAmount).sum();
        return new ProcessResult(true, String.valueOf(totalAmount));
    }

    // 2. Reduce 阶段:汇总所有 Worker 结果
    @Override
    public ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) {
        long total = 0;
        for (TaskResult r : taskResults) {
            total += Long.parseLong(r.getResult());
        }
        log.info("总销售额: {}", total);
        return new ProcessResult(true, "Total: " + total);
    }
}

典型场景:分布式统计、跨节点数据汇总。

6 种时间表达式

PowerJob 支持 6 种 cron-like 触发方式:

类型配置例子
Cron0 0 2 * * ?每天凌晨 2 点
固定频率60000每 60 秒(毫秒)
固定延迟3000上次执行完后等 3 秒
API 触发手动运维控制台点击"执行一次"
工作流DAG 编排任务 A 完成后触发 B
OpenAPIHTTP 触发通过 API 远程触发

Cron 例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# 每天凌晨 2 点
time_expression: "0 0 2 * * ?"

# 每周一上午 9 点
time_expression: "0 0 9 ? * 2"

# 每 5 分钟
time_expression: "0 */5 * * * ?"

# 每月 1 号零点
time_expression: "0 0 0 1 * ?"

Workflow(DAG 工作流)

PowerJob 相对 XXL-Job 的最大优势——可视化 DAG 编排。

1. 在控制台创建工作流

1
2
3
[开始] → [拉取数据] → [清洗] → [转换] → [入库] → [结束]
         [推送通知]

2. 工作流节点配置

每个节点对应一个 Processor,可设置:

  • 失败重试次数:0 / 1 / 3
  • 跳过条件:前置任务 X 失败时跳过
  • 并发度:1 / 5 / 10

3. 实战场景

1
2
3
4
5
6
订单超时关闭工作流:
1. 扫描超时订单(每 5 分钟)
2. 关闭订单
3. 释放库存
4. 退款
5. 发送通知

故障转移与容错

1. 任务超时

1
2
3
4
5
powerjob:
  worker:
    task-retry-num: 3               # 失败重试 3 次
    instance-retry-num: 0           # 实例不重试
    instance-time-limit: 0          # 0 = 不限时

2. 节点宕机

Worker 节点宕机后,Server 自动将任务重新派发到其他 Worker,无需人工介入。

3. 集群高可用

Server 3 节点集群 + MySQL 持久化

1
2
3
4
5
6
7
8
# node-1
- PARAMS=--spring.profiles.active=product --oms.http.port=10010 --server.address=10.0.0.1

# node-2
- PARAMS=--spring.profiles.active=product --oms.http.port=10010 --server.address=10.0.0.2

# node-3
- PARAMS=--spring.profiles.active=product --oms.http.port=10010 --server.address=10.0.0.3

3 节点通过 Akka 集群 选举 Leader(类似 ZK 选举),Leader 宕机 30s 内自动切换。

4. 报警

控制台支持配置告警:

  • 邮件(spring.mail.*
  • 钉钉(Webhook)
  • 企业微信(Webhook)
  • Webhook 自定义

与 XXL-Job 对比

维度XXL-JobPowerJob
UI经典、简洁现代 + 暗色主题 + DAG
任务类型4 类(Bean / GLUE / Shell / Python)4 类(Basic / Broadcast / Map / MapReduce)
工作流简单父子依赖DAG 可视化编排
容器支持一般K8s 友好(自带 Operator 计划中)
多语言Java onlyJava + Python + Shell + HTTP
集群集群分片Akka 集群 + 故障自动转移
上手难度简单中等

选型建议

  • 简单 CRON 任务:XXL-Job(轻量、上手快)
  • 复杂业务编排:PowerJob(DAG 工作流、MapReduce)

实战坑

1. Worker 启动失败:应用未注册

1
ERROR: app not registered

解决:先在 Server 控制台 → 应用管理 → 注册应用 → app-name 必须与 powerjob.worker.app-name 一致。

2. 时区问题

1
2
3
4
# 容器必须设置时区
environment:
  - TZ=Asia/Shanghai
  - JAVA_OPTS=-Duser.timezone=Asia/Shanghai

否则 Cron 任务触发时间会差 8 小时。

3. 大量任务调度延迟

1
2
Server 默认单节点:1000 QPS 调度能力
3 节点集群:3000 QPS

如果任务量 > 10万/天,建议 Server 集群 + Worker 多实例。

4. 任务执行时间超过间隔

1
固定频率 60s 任务,但任务本身耗时 80s

PowerJob 默认跳过当前任务,等下个周期。如果需要"执行完再等 60s",改用 fixed_delay 类型。

前置知识与下一步

前置

  • Spring Boot 基础
  • Docker 基础
  • Cron 表达式

下一步

  • Workflow 编排复杂业务(订单超时关闭流程)
  • 集成 告警 到企业 IM(钉钉 / 企微)
  • 监控:Prometheus + Grafana 抓取 PowerJob 指标

小结

PowerJob 是国产分布式任务调度的新一代标杆,核心优势:

  • DAG 工作流(可视化编排)—— 复杂业务一图胜千言
  • Map / MapReduce —— 大数据量场景天然支持
  • 多语言 Worker(Java + Python + Shell + HTTP)—— 异构系统集成友好
  • 故障自动转移 + Akka 集群 —— 生产可用性高

如果你的项目有 10+ 定时任务 + 复杂依赖关系,XXL-Job 已经不够用,建议直接上 PowerJob。


2024+ 视角:PowerJob V5 + K8s 适配 + AI 运维

PowerJob V5(2024-12 GA)关键变化

  • V5 全面拥抱 K8s:内置 K8s Operator 风格的 PowerJobWorker CRD
  • Akka 升级Akka 2.6 → Pekko 1.0(Apache 顶级项目,Akka 改 License 后社区 fork)
  • 存储后端可插拔:H2 / MySQL / PostgreSQL / 达梦 / 人大金仓,V5 起把"存储"从核心解耦
  • 多租户:单个 Server 集群可承载多业务方,namespace 隔离
  • 工作流 2.0:DAG 节点支持循环 + 条件分支 + 子工作流

K8s 部署的 2024+ 范式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# powerjob-server.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: powerjob-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: powerjob-server
  template:
    metadata:
      labels:
        app: powerjob-server
    spec:
      containers:
      - name: server
        image: powerjob/powerjob-server:V5.0.0
        ports:
        - containerPort: 7700
        - containerPort: 10010
        env:
        - name: PARAMS
          value: "--spring.profiles.active=product
                  --spring.datasource.core.jdbc-url=jdbc:mysql://mysql:3306/powerjob
                  --spring.datasource.core.username=powerjob
                  --spring.datasource.core.password=${DB_PASSWORD}
                  --oms.http.port=10010"
        - name: TZ
          value: Asia/Shanghai
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
---
apiVersion: v1
kind: Service
metadata:
  name: powerjob-server
spec:
  type: ClusterIP
  selector:
    app: powerjob-server
  ports:
  - name: http
    port: 7700
  - name: oms
    port: 10010

Worker 集成 2024+ 升级

1
2
3
4
5
<dependency>
    <groupId>tech.powerjob</groupId>
    <artifactId>powerjob-worker-spring-boot-starter</artifactId>
    <version>5.0.0</version>
</dependency>
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
powerjob:
  worker:
    enabled: true
    port: 9988
    app-name: my-application
    server-address: powerjob-server:7700
    protocol: http
    # V5 新增
    max-result-length: 8192
    store-strategy: disk
    # 健康检查
    health-check-interval: 30
    # 标签(V5 多租户)
    tag: production
    namespace: tenant-a

V5 工作流 2.0 实战

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 订单超时关闭工作流(V5 新增循环节点)
nodes:
  - id: scan_orders
    type: Basic
    processor: scanTimeoutOrders

  - id: close_order
    type: Basic
    processor: closeOrder
    dependsOn: [scan_orders]

  - id: release_stock
    type: Basic
    processor: releaseStock
    dependsOn: [close_order]

  - id: refund
    type: Basic
    processor: refundOrder
    dependsOn: [release_stock]

  - id: notify_user
    type: Basic
    processor: sendNotification
    dependsOn: [refund]
    # V5 新增:失败重试 + 告警
    retryCount: 3
    alertEnabled: true
    alertChannels: [dingtalk, feishu]

2024+ 监控与告警

PowerJob V5 内置 Prometheus metrics 端点

1
2
3
4
5
6
7
8
9
# application.yml(Server)
management:
  endpoints:
    web:
      exposure:
        include: health,info,prometheus
  metrics:
    tags:
      application: powerjob-server

Grafana Dashboard

  • 任务执行 P50 / P95 / P99 延迟
  • Worker 在线数 / 任务队列长度
  • DAG 节点完成率

2024+ 选型对比

任务调度2024+ 适用
XXL-Job简单 CRON 任务(< 50 个)
PowerJob V5中大型项目 + DAG 工作流
Apache DolphinScheduler大数据 ETL(Spark / Flink 集成)
Temporal复杂长事务(> 1 天的 saga)
AirflowPython 生态(数据科学)
Argo WorkflowsK8s 原生 DAG(容器级任务)

2024+ 实战坑

  • Akka License 变化:Akka 2.7+ 改 BSL 协议(商业收费),PowerJob V5 改用 Apache Pekko——老项目升级要清 classpath
  • V5 默认存储改为外置:不再内置 H2 内存模式,必须配 MySQL/PG
  • 多租户 namespace 隔离:V5 起 Worker 必须带 namespace 配置,否则启动失败
  • K8s 部署:Server 至少 2 副本(防止单点脑裂),但不要超过 5 副本(Akka 集群规模 5 最佳)

PowerJob V5 推荐组合

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
中型互联网业务(Java + Python 异构)
├── PowerJob Server V5 × 3 副本(K8s Deployment)
├── MySQL 8.4 LTS 存储(StatefulSet + 持久卷)
├── PowerJob Worker × N 副本(与应用一起跑)
│   ├── Java 微服务 Worker
│   ├── Python 数据同步 Worker(HTTP 类型)
│   └── Shell 清理任务 Worker
├── DAG 工作流编排业务(订单 / 报表 / 同步)
├── Prometheus + Grafana 监控
└── 告警:钉钉 + 飞书 + 企业微信
使用 Hugo 构建
主题 StackJimmy 设计