用 Temporal 写业务代码确实爽——“像写单机程序一样处理分布式事务”。但爽完之后,总有一个声音在脑后响起:它真的可靠吗? Server 挂了怎么办?网络断了呢?回滚本身失败了呢?这篇文章从底层机制入手,把这些问题一个个拆碎。
一、先搞清楚 Temporal 的架构边界
在讨论可靠性之前,必须理清 Temporal 单机部署下各组件的职责:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
┌──────────────────────────────────────────────────┐
│ Temporal Server │
│ ┌──────────┐ ┌──────────┐ ┌───────────────┐ │
│ │ Frontend │ │ Matching │ │ History │ │
│ │ Service │ │ Service │ │ Service │ │
│ └────┬─────┘ └────┬─────┘ └────────┬──────┘ │
│ │ │ │ │
│ └─────────────┼─────────────────┘ │
│ │ │
│ ┌──────▼ ───────┐ │
│ │ Persistence │ │
│ │ (MySQL/PG/ │ │
│ │ SQLite) │ │
│ └───────────────┘ │
└──────────────────────────────────────────────────┘
▲ ▲
│ gRPC │ gRPC
│ │
┌───────┴──────────┐ ┌───────┴──────────┐
│ Temporal CLI │ │ Worker │
│ request Workflow │ │ excute Workflow │
└──────────────────┘ └──────────────────┘
|
各组件的职责:
- Frontend Service:Temporal Server 的"门面",所有外部请求(启动 Workflow、查询状态、发送 Signal)都通过它的 gRPC 接口进入。它负责请求校验、路由、限流,本身不存储状态。
- Matching Service:任务分发器。维护 Task Queue,负责将待执行的 Activity/Workflow Task 匹配给空闲的 Worker。可以理解为"派活的调度员"。
- History Service:核心中的核心。负责维护每个 Workflow 的 Event History(事件历史),处理状态转换、Timer 定时器、Event 持久化。Temporal 的可靠性保障,绝大部分发生在这一层。
- Temporal CLI(tctl / temporal):命令行客户端工具,用于与 Server 交互——启动 Workflow、查询状态、发送 Signal、管理 Namespace 等。在代码中通常用 Temporal SDK 的
client.Dial() 代替,二者角色等价,都是"发起请求的一方"。 - Worker:你自己写的程序,注册了 Workflow 和 Activity 函数后连接到 Server。Worker 不断轮询 Task Queue 领取任务并执行,本身是无状态的——所有进度和结果都上报给 Server 持久化。Worker 挂了换一个就行,不影响 Workflow 的状态。
关键认知:
| 组件 | 崩溃影响 | 状态存储位置 |
|---|
| Worker | Activity 执行中断 | 无状态(状态在 Server 端) |
| Temporal Server | 新任务无法调度 | 状态持久化在数据库 |
| 数据库 (MySQL/PG) | 一切完蛋 | 这是真正的生命线 |
核心结论:Temporal 单机部署的可靠性 = 数据库的可靠性。Server 和 Worker 都是可恢复的,只有数据库丢了才是真出事。
二、Server 宕机:Event History 如何保命
2.1 持久化机制:先写库再应答
Temporal Server 的 History Service 在处理每一步操作时,严格遵循 Write-Ahead 原则:
1
2
3
4
5
6
7
8
9
10
11
12
13
| Worker 上报 Activity 完成
│
▼
History Service 收到请求
│
▼
① 将 Event 写入数据库 (INSERT INTO events ...) <- 先落盘
│
▼
② 返回 ACK 给 Worker <- 再应答
│
▼
③ 调度下一个 Activity(放入 Matching Service 的 Task Queue)
|
这意味着:如果 Server 在步骤 ② 之前崩溃——Event 已经写入数据库了。重启后 History Service 从数据库恢复状态,Workflow 继续向前。
如果 Server 在步骤 ① 之前崩溃——这个 Event 确实丢了。但 Worker 收不到 ACK,会触发重试,Activity 会被重新执行,而 Activity 是我们开发者自己写的业务逻辑,我们必须自己保证好幂等性(足够了解这个框架才不容易出错)。
2.2 崩溃恢复的三种场景
场景一:Server 在 Activity 执行期间崩溃
1
2
3
4
5
6
7
8
| Timeline:
t0: Workflow 调度 ActivityA
t1: Worker 领取 ActivityA 并开始执行
t2: Server 崩溃
t3: ActivityA 执行完毕,Worker 尝试上报 -> 失败 (连接断开)
t4: Server 重启
t5: Worker 重新上报 ActivityA 结果 -> Server 接受
t6: Workflow 继续执行 ActivityB
|
关键点:Worker 有内置的重连机制,会不断尝试连接 Server。Activity 的结果不会因为 Server 短暂宕机而丢失。
场景二:Server 在调度下一个 Activity 时崩溃
1
2
3
4
5
6
7
| Timeline:
t0: ActivityA 完成,Event 已写入数据库
t1: Server 准备调度 ActivityB
t2: Server 崩溃 (ActivityB 还没放入 Task Queue)
t3: Server 重启
t4: History Service 做 Transfer Task -> 发现 ActivityB 需要调度
t5: ActivityB 被放入 Task Queue,Worker 领取执行
|
关键点:Temporal 有一个 Transfer Queue 机制。已持久化但未派发的任务,在 Server 重启后会被 Transfer Task Processor 重新扫描并派发。
场景三:Server 在 Workflow 刚启动时崩溃
1
2
3
4
5
6
7
8
9
10
| // 客户端发起 Workflow
run, err := client.ExecuteWorkflow(ctx, options, MyWorkflow, input)
if err != nil {
// 连接失败 — 但 Workflow 可能已经创建成功了!
// 需要用 WorkflowID 做幂等查询
existing, _ := client.DescribeWorkflowExecution(ctx, workflowID, "")
if existing != nil {
log.Println("Workflow 已在运行中,无需重复创建")
}
}
|
排错实录:我曾遇到过一个 bug——客户端收到超时错误后重试创建 Workflow,结果数据库里出现了两个相同 WorkflowID 的记录。后来发现是 WorkflowID 没设置好,Temporal 默认允许同 ID 重建。解法是设置 WorkflowIdReusePolicy:
1
2
3
4
5
| options := client.StartWorkflowOptions{
ID: "order-" + orderID, // 业务唯一 ID
TaskQueue: "order-queue",
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
}
|
2.3 Timer 和 Schedule 的持久化
Temporal 的 Timer(workflow.Sleep)、Schedule(定时触发)也是持久化在数据库中的:
1
2
3
| // 这个 Sleep 不怕 Server 宕机
workflow.Sleep(ctx, 24*time.Hour)
// 24 小时后继续执行 — 即使中间 Server 重启了 10 次
|
原理:workflow.Sleep 生成一个 TimerStarted Event,定时器的触发时间写入数据库。Server 重启后,Timer 模块重新加载所有 pending timer,到时间就触发 TimerFired Event。
这和 time.Sleep 完全不同——后者一旦进程退出就没了。
三、网络抖动:Activity 外部调用的三重保障
3.1 第一层:RetryPolicy 自动重试
当 Activity 调用外部服务失败(网络超时、503、连接拒绝),Temporal 自动按 RetryPolicy 重试:
1
2
3
4
5
6
7
8
9
10
11
12
13
| activityOptions := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second, // 首次重试间隔
BackoffCoefficient: 2.0, // 指数退避
MaximumInterval: time.Minute, // 最大间隔
MaximumAttempts: 5, // 最多重试 5 次
NonRetryableErrorTypes: []string{ // 这些错误不重试
"InvalidArgumentError",
"BusinessValidationError",
},
},
}
|
重试时间线:
1
2
3
4
5
| 尝试 1: t=0s -> 失败 (网络超时)
尝试 2: t=1s -> 失败 (503)
尝试 3: t=3s -> 失败 (连接拒绝)
尝试 4: t=7s -> 失败 (网络超时)
尝试 5: t=15s -> 成功 (服务恢复)
|
注意区分两个超时:
| 超时类型 | 含义 | 建议值 |
|---|
StartToCloseTimeout | 单次 Activity 执行上限 | 根据业务合理预估 |
ScheduleToCloseTimeout | 从调度到最终完成的总上限 (含所有重试) | StartToClose × MaxAttempts |
3.2 第二层:Heartbeat 长时间任务的生命线
对于执行时间可能很长的 Activity(比如调用第三方 API 批量处理数据),仅靠超时机制不够。Temporal 提供了 Heartbeat(心跳) 机制:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| func BatchProcessActivity(ctx context.Context, items []Item) error {
for i, item := range items {
// 处理每个 item
result, err := thirdPartyAPI.Process(item)
if err != nil {
return err
}
// 上报心跳 + 进度
activity.RecordHeartbeat(ctx, i)
// 检查是否被取消
if ctx.Err() != nil {
return ctx.Err()
}
}
return nil
}
|
1
2
3
4
5
| // Workflow 端配置心跳超时
activityOptions := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Minute,
HeartbeatTimeout: 30 * time.Second, // 30 秒没心跳就认为挂了
}
|
Heartbeat 的两个核心能力:
- 故障检测:如果 Worker 宕机或网络断开,Server 在 HeartbeatTimeout 后判定 Activity 失败,调度重试
- 进度恢复:Heartbeat 携带的
details 参数在重试时可以恢复
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| func BatchProcessActivity(ctx context.Context, items []Item) error {
// 检查上次心跳进度,实现断点续传
var startIndex int
if activity.HasHeartbeatDetails(ctx) {
activity.GetHeartbeatDetails(ctx, &startIndex)
startIndex++ // 从下一个开始
}
for i := startIndex; i < len(items); i++ {
result, err := thirdPartyAPI.Process(items[i])
if err != nil {
return err
}
activity.RecordHeartbeat(ctx, i)
}
return nil
}
|
排错实录:有一次批量处理 5000 条记录的 Activity,处理到 3000 条时 Worker 被 OOM Kill。重试后又从第 0 条开始,结果前 3000 条被重复处理了。加上 Heartbeat 进度恢复后,重试直接从第 3001 条继续。
3.3 第三层:Activity 幂等性设计
即使有了重试和心跳,还有一个绕不开的问题:Activity 可能被重复执行。
典型场景:
1
2
3
4
| Worker 执行 Activity -> 调用支付 API -> 支付成功
Worker 上报结果给 Server -> 网络超时
Server 没收到确认 -> 重新调度这个 Activity
Worker 再次执行 -> 又调了一次支付 API -> 重复扣款!
|
解决方案:幂等 Token
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| func ChargePayment(ctx context.Context, order Order) error {
// 用 Activity 信息生成幂等 Key
activityInfo := activity.GetInfo(ctx)
idempotencyKey := fmt.Sprintf("charge-%s-%s-%d",
activityInfo.WorkflowExecution.ID,
activityInfo.ActivityID,
activityInfo.Attempt, // 包含重试次数
)
return paymentClient.ChargeWithIdempotencyKey(
order.UserID,
order.Amount,
idempotencyKey,
)
}
|
幂等设计原则:
| 操作类型 | 天然幂等? | 处理方式 |
|---|
| 查询 (GET) | 是 | 无需处理 |
| 创建 (POST) | 否 | 用唯一业务 ID 去重 |
| 更新 (PUT) | 看情况 | 版本号 / 乐观锁 |
| 删除 (DELETE) | 看情况 | 先查后删 / 忽略 404 |
| 扣款/转账 | 绝对不是 | 必须用 幂等 Token |
四、回滚失败:补偿链的终极兜底
4.1 问题:补偿本身也会失败
Saga 模式中,当步骤 N 失败时,需要依次回滚步骤 N-1, N-2, …, 1。但如果回滚步骤本身也失败了呢?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| func OrderWorkflow(ctx workflow.Context, order Order) error {
// Step 1: 扣款
err := workflow.ExecuteActivity(ctx, ChargePayment, order).Get(ctx, nil)
if err != nil {
return err
}
// Step 2: 扣库存 -> 失败!
err = workflow.ExecuteActivity(ctx, DeductInventory, order).Get(ctx, nil)
if err != nil {
// 补偿:退款 -> 但这也可能失败!
compensateErr := workflow.ExecuteActivity(ctx, RefundPayment, order).Get(ctx, nil)
if compensateErr != nil {
// 补偿也失败了,怎么办?
}
return err
}
return nil
}
|
4.2 Temporal 的兜底策略
策略一:补偿 Activity 使用独立的、更激进的 RetryPolicy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| func OrderWorkflow(ctx workflow.Context, order Order) error {
// 正常 Activity 的配置
normalCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 5,
},
})
// 补偿 Activity 的配置 — 更激进
compensateCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 1.5,
MaximumInterval: 5 * time.Minute,
MaximumAttempts: 20, // 补偿最多重试 20 次
},
})
// 正向操作
err := workflow.ExecuteActivity(normalCtx, ChargePayment, order).Get(ctx, nil)
if err != nil {
return err
}
err = workflow.ExecuteActivity(normalCtx, DeductInventory, order).Get(ctx, nil)
if err != nil {
// 用更激进的策略执行补偿
_ = workflow.ExecuteActivity(compensateCtx, RefundPayment, order).Get(ctx, nil)
return err
}
return nil
}
|
策略二:补偿失败后冻结 + 报警
如果补偿重试多次后还是失败,说明可能出了严重问题。此时应该:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| func OrderWorkflow(ctx workflow.Context, order Order) error {
// ... 正向操作 ...
err = workflow.ExecuteActivity(normalCtx, DeductInventory, order).Get(ctx, nil)
if err != nil {
compensateErr := workflow.ExecuteActivity(compensateCtx, RefundPayment, order).Get(ctx, nil)
if compensateErr != nil {
// 补偿失败:标记订单为"需要人工介入"状态
_ = workflow.ExecuteActivity(ctx, MarkOrderForManualReview, ManualReviewRequest{
OrderID: order.ID,
FailedStep: "DeductInventory",
CompensateErr: compensateErr.Error(),
NeedAction: "手动退款",
}).Get(ctx, nil)
// 发送报警
_ = workflow.ExecuteActivity(ctx, SendAlert, AlertMessage{
Level: "CRITICAL",
Message: fmt.Sprintf("订单 %s 补偿失败,需人工介入", order.ID),
}).Get(ctx, nil)
// Workflow 不 return error — 避免 Temporal 重试整个 Workflow
// 而是标记为特殊状态,等人工处理
return nil
}
return err
}
return nil
}
|
策略三:利用 Workflow 的无限等待能力
Temporal Workflow 可以无限期等待,这是处理"需要人工干预"场景的利器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
| func OrderWorkflow(ctx workflow.Context, order Order) error {
// ... 前面操作 ...
compensateErr := workflow.ExecuteActivity(compensateCtx, RefundPayment, order).Get(ctx, nil)
if compensateErr != nil {
// 发送报警
_ = workflow.ExecuteActivity(ctx, SendAlert, AlertMessage{
Message: fmt.Sprintf("订单 %s 退款失败,等待人工确认", order.ID),
}).Get(ctx, nil)
// 无限等待人工信号
var resolution string
signalChan := workflow.GetSignalChannel(ctx, "manual-resolution")
signalChan.Receive(ctx, &resolution)
switch resolution {
case "retry":
// 人工确认后重试
return workflow.ExecuteActivity(compensateCtx, RefundPayment, order).Get(ctx, nil)
case "skip":
// 人工已在外部处理,跳过
return nil
case "escalate":
// 升级处理
return fmt.Errorf("订单 %s 升级处理: %s", order.ID, compensateErr.Error())
}
}
return nil
}
|
外部通过 Signal 通知 Workflow 人工处理结果:
1
2
| // 运维人员手动退款完成后,发信号通知 Workflow
err := client.SignalWorkflow(ctx, workflowID, "", "manual-resolution", "skip")
|
4.3 通用 Saga 补偿框架
当业务步骤很多时,手写补偿逻辑容易遗漏。可以封装一个通用的 Saga 结构:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
| type SagaStep struct {
Name string
Action interface{} // Activity 函数
Compensate interface{} // 补偿 Activity 函数
Input interface{}
}
func ExecuteSaga(ctx workflow.Context, steps []SagaStep) error {
var completedSteps []SagaStep
for _, step := range steps {
err := workflow.ExecuteActivity(ctx, step.Action, step.Input).Get(ctx, nil)
if err != nil {
// 反向补偿已完成的步骤
for i := len(completedSteps) - 1; i >= 0; i-- {
s := completedSteps[i]
if s.Compensate != nil {
compensateErr := workflow.ExecuteActivity(ctx, s.Compensate, s.Input).Get(ctx, nil)
if compensateErr != nil {
// 记录补偿失败,但继续补偿其他步骤
workflow.GetLogger(ctx).Error("补偿失败",
"step", s.Name,
"error", compensateErr,
)
}
}
}
return fmt.Errorf("步骤 [%s] 失败: %w", step.Name, err)
}
completedSteps = append(completedSteps, step)
}
return nil
}
// 使用方式
func OrderWorkflow(ctx workflow.Context, order Order) error {
steps := []SagaStep{
{
Name: "扣款",
Action: ChargePayment,
Compensate: RefundPayment,
Input: order,
},
{
Name: "扣库存",
Action: DeductInventory,
Compensate: RestoreInventory,
Input: order,
},
{
Name: "创建物流单",
Action: CreateShipment,
Compensate: CancelShipment,
Input: order,
},
}
return ExecuteSaga(ctx, steps)
}
|
五、实战排错:那些年踩过的坑
5.1 Worker 全部挂掉
现象:Workflow 状态一直是 Running,Activity 不执行。
原因:Worker 全部宕机,没有进程来执行 Activity。Temporal Server 只负责调度,不执行具体业务逻辑。
验证方式:
1
2
| # 检查 Task Queue 是否有 Poller(即活跃 Worker)
tctl taskqueue describe --taskqueue order-task-queue
|
如果 Pollers 为空,说明没有 Worker 在线。
解法:确保 Worker 有进程管理(Systemd / Supervisor / K8s Deployment),宕机自动拉起。
5.2 StartToCloseTimeout 设太短
现象:Activity 反复重试,日志里看到 activity timeout。
原因:外部 API 正常响应时间就需要 10 秒,但 StartToCloseTimeout 只设了 5 秒。
1
2
3
4
5
6
7
8
9
| // 错误:超时太短
activityOptions := workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Second, // 外部 API P99 要 8 秒
}
// 正确:留足余量
activityOptions := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second, // P99 × 3 ~ 4 倍
}
|
5.3 数据库连接池耗尽
现象:Temporal Server 报错 too many connections,Workflow 调度卡住。
原因:单机 Temporal Server 默认连接池不够用,并发 Workflow 太多。
解法:
1
2
3
4
5
6
7
| # temporal server 配置
persistence:
default:
sql:
maxConns: 50 # 增大连接池
maxIdleConns: 10
maxConnLifetime: "1h"
|
5.4 Workflow History 膨胀
现象:Workflow 运行几天后,查询 / 恢复越来越慢。
原因:长时间运行的 Workflow 积累了大量 Event History。
解法:用 ContinueAsNew 定期重生:
1
2
3
4
5
6
7
8
9
10
11
| func LongRunningWorkflow(ctx workflow.Context, state State) error {
for {
// 业务逻辑...
workflow.ExecuteActivity(ctx, DoWork, state).Get(ctx, &state)
// 每处理 100 次,重置 History
if state.IterationCount % 100 == 0 {
return workflow.NewContinueAsNewError(ctx, LongRunningWorkflow, state)
}
}
}
|
六、可靠性层级总结
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| ┌────────────────────────────────────────────────┐
│ Level 4: 人工兜底 │
│ Signal 等待人工介入 / 报警通知 │
├────────────────────────────────────────────────┤
│ Level 3: 补偿策略 │
│ Saga 反向回滚 / 激进重试 / 冻结+报警 │
├────────────────────────────────────────────────┤
│ Level 2: Activity 韧性 │
│ RetryPolicy / Heartbeat / 幂等 Token │
├────────────────────────────────────────────────┤
│ Level 1: 持久化基座 │
│ Event History / Write-Ahead / Transfer Queue │
├────────────────────────────────────────────────┤
│ Level 0: 存储引擎 │
│ MySQL / PostgreSQL 的 ACID 保障 │
└────────────────────────────────────────────────┘
|
| 故障类型 | 保障层级 | 恢复方式 |
|---|
| Server 短暂宕机 | Level 1 | Event History 重放 + Transfer Queue 重新派发 |
| Worker 宕机 | Level 2 | HeartbeatTimeout 检测 + 自动调度到其他 Worker |
| 网络抖动 | Level 2 | RetryPolicy 指数退避重试 |
| 外部服务长时间不可用 | Level 3 | 重试耗尽后触发补偿回滚 |
| 补偿回滚失败 | Level 4 | Signal 等待人工处理 + 报警 |
| 数据库崩溃 | 无 | 无法自动恢复,依赖数据库自身 HA 方案 |
最终结论:
Temporal 的单机可靠性不是靠某一个"银弹"机制,而是多层防御叠加出来的。从最底层的数据库 ACID,到 Event Sourcing 的写先记录、Activity 的自动重试+心跳、Saga 补偿,再到人工信号兜底,每一层都在为上一层的失败场景提供保障。
但它也有硬性依赖——数据库不能丢。单机部署生产环境,至少要做到数据库主从 + 定期备份。Server 和 Worker 反而是最不需要担心的——挂了重启就行,状态都在库里。
系列文章