目录

Temporal 单机可靠性深度剖析:Server 宕机、网络抖动、回滚失败,它到底怎么兜底?

用 Temporal 写业务代码确实爽——“像写单机程序一样处理分布式事务”。但爽完之后,总有一个声音在脑后响起:它真的可靠吗? Server 挂了怎么办?网络断了呢?回滚本身失败了呢?这篇文章从底层机制入手,把这些问题一个个拆碎。

一、先搞清楚 Temporal 的架构边界

在讨论可靠性之前,必须理清 Temporal 单机部署下各组件的职责:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
                                                         
   ┌──────────────────────────────────────────────────┐  
   │                  Temporal Server                 │  
   │  ┌──────────┐  ┌──────────┐  ┌───────────────┐   │  
   │  │ Frontend │  │ Matching │  │     History   │   │  
   │  │ Service  │  │ Service  │  │     Service   │   │  
   │  └────┬─────┘  └────┬─────┘  └────────┬──────┘   │  
   │       │             │                 │          │  
   │       └─────────────┼─────────────────┘          │  
   │                     │                            │  
   │              ┌──────▼ ───────┐                   │  
   │              │   Persistence │                   │  
   │              │  (MySQL/PG/   │                   │  
   │              │   SQLite)     │                   │  
   │              └───────────────┘                   │  
   └──────────────────────────────────────────────────┘  
           ▲                           ▲                 
           │ gRPC                      │ gRPC            
           │                           │                 
   ┌───────┴──────────┐        ┌───────┴──────────┐      
   │  Temporal CLI    │        │     Worker       │      
   │ request Workflow │        │ excute Workflow  │      
   └──────────────────┘        └──────────────────┘      
                                                                                                           

各组件的职责

  • Frontend Service:Temporal Server 的"门面",所有外部请求(启动 Workflow、查询状态、发送 Signal)都通过它的 gRPC 接口进入。它负责请求校验、路由、限流,本身不存储状态。
  • Matching Service:任务分发器。维护 Task Queue,负责将待执行的 Activity/Workflow Task 匹配给空闲的 Worker。可以理解为"派活的调度员"。
  • History Service:核心中的核心。负责维护每个 Workflow 的 Event History(事件历史),处理状态转换、Timer 定时器、Event 持久化。Temporal 的可靠性保障,绝大部分发生在这一层。
  • Temporal CLI(tctl / temporal):命令行客户端工具,用于与 Server 交互——启动 Workflow、查询状态、发送 Signal、管理 Namespace 等。在代码中通常用 Temporal SDK 的 client.Dial() 代替,二者角色等价,都是"发起请求的一方"。
  • Worker:你自己写的程序,注册了 Workflow 和 Activity 函数后连接到 Server。Worker 不断轮询 Task Queue 领取任务并执行,本身是无状态的——所有进度和结果都上报给 Server 持久化。Worker 挂了换一个就行,不影响 Workflow 的状态。

关键认知

组件崩溃影响状态存储位置
WorkerActivity 执行中断无状态(状态在 Server 端)
Temporal Server新任务无法调度状态持久化在数据库
数据库 (MySQL/PG)一切完蛋这是真正的生命线

核心结论:Temporal 单机部署的可靠性 = 数据库的可靠性。Server 和 Worker 都是可恢复的,只有数据库丢了才是真出事。

二、Server 宕机:Event History 如何保命

2.1 持久化机制:先写库再应答

Temporal Server 的 History Service 在处理每一步操作时,严格遵循 Write-Ahead 原则:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
Worker 上报 Activity 完成
History Service 收到请求
① 将 Event 写入数据库 (INSERT INTO events ...)  <- 先落盘
② 返回 ACK 给 Worker                           <- 再应答
③ 调度下一个 Activity(放入 Matching Service 的 Task Queue)

这意味着:如果 Server 在步骤 ② 之前崩溃——Event 已经写入数据库了。重启后 History Service 从数据库恢复状态,Workflow 继续向前。

如果 Server 在步骤 ① 之前崩溃——这个 Event 确实丢了。但 Worker 收不到 ACK,会触发重试,Activity 会被重新执行,而 Activity 是我们开发者自己写的业务逻辑,我们必须自己保证好幂等性(足够了解这个框架才不容易出错)。

2.2 崩溃恢复的三种场景

场景一:Server 在 Activity 执行期间崩溃

1
2
3
4
5
6
7
8
Timeline:
  t0: Workflow 调度 ActivityA
  t1: Worker 领取 ActivityA 并开始执行
  t2: Server 崩溃 
  t3: ActivityA 执行完毕,Worker 尝试上报 -> 失败 (连接断开)
  t4: Server 重启
  t5: Worker 重新上报 ActivityA 结果 -> Server 接受
  t6: Workflow 继续执行 ActivityB

关键点:Worker 有内置的重连机制,会不断尝试连接 Server。Activity 的结果不会因为 Server 短暂宕机而丢失。

场景二:Server 在调度下一个 Activity 时崩溃

1
2
3
4
5
6
7
Timeline:
  t0: ActivityA 完成,Event 已写入数据库
  t1: Server 准备调度 ActivityB
  t2: Server 崩溃 (ActivityB 还没放入 Task Queue)
  t3: Server 重启
  t4: History Service 做 Transfer Task -> 发现 ActivityB 需要调度
  t5: ActivityB 被放入 Task Queue,Worker 领取执行

关键点:Temporal 有一个 Transfer Queue 机制。已持久化但未派发的任务,在 Server 重启后会被 Transfer Task Processor 重新扫描并派发。

场景三:Server 在 Workflow 刚启动时崩溃

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 客户端发起 Workflow
run, err := client.ExecuteWorkflow(ctx, options, MyWorkflow, input)
if err != nil {
    // 连接失败 — 但 Workflow 可能已经创建成功了!
    // 需要用 WorkflowID 做幂等查询
    existing, _ := client.DescribeWorkflowExecution(ctx, workflowID, "")
    if existing != nil {
        log.Println("Workflow 已在运行中,无需重复创建")
    }
}

排错实录:我曾遇到过一个 bug——客户端收到超时错误后重试创建 Workflow,结果数据库里出现了两个相同 WorkflowID 的记录。后来发现是 WorkflowID 没设置好,Temporal 默认允许同 ID 重建。解法是设置 WorkflowIdReusePolicy

1
2
3
4
5
options := client.StartWorkflowOptions{
    ID:                    "order-" + orderID, // 业务唯一 ID
    TaskQueue:             "order-queue",
    WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
}

2.3 Timer 和 Schedule 的持久化

Temporal 的 Timer(workflow.Sleep)、Schedule(定时触发)也是持久化在数据库中的:

1
2
3
// 这个 Sleep 不怕 Server 宕机
workflow.Sleep(ctx, 24*time.Hour)
// 24 小时后继续执行 — 即使中间 Server 重启了 10 次

原理workflow.Sleep 生成一个 TimerStarted Event,定时器的触发时间写入数据库。Server 重启后,Timer 模块重新加载所有 pending timer,到时间就触发 TimerFired Event。

这和 time.Sleep 完全不同——后者一旦进程退出就没了。

三、网络抖动:Activity 外部调用的三重保障

3.1 第一层:RetryPolicy 自动重试

当 Activity 调用外部服务失败(网络超时、503、连接拒绝),Temporal 自动按 RetryPolicy 重试:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
activityOptions := workflow.ActivityOptions{
    StartToCloseTimeout: 30 * time.Second,
    RetryPolicy: &temporal.RetryPolicy{
        InitialInterval:        time.Second,      // 首次重试间隔
        BackoffCoefficient:     2.0,               // 指数退避
        MaximumInterval:        time.Minute,       // 最大间隔
        MaximumAttempts:        5,                 // 最多重试 5 次
        NonRetryableErrorTypes: []string{          // 这些错误不重试
            "InvalidArgumentError",
            "BusinessValidationError",
        },
    },
}

重试时间线

1
2
3
4
5
尝试 1: t=0s    -> 失败 (网络超时)
尝试 2: t=1s    -> 失败 (503)
尝试 3: t=3s    -> 失败 (连接拒绝)
尝试 4: t=7s    -> 失败 (网络超时)
尝试 5: t=15s   -> 成功 (服务恢复)

注意区分两个超时

超时类型含义建议值
StartToCloseTimeout单次 Activity 执行上限根据业务合理预估
ScheduleToCloseTimeout从调度到最终完成的总上限 (含所有重试)StartToClose × MaxAttempts

3.2 第二层:Heartbeat 长时间任务的生命线

对于执行时间可能很长的 Activity(比如调用第三方 API 批量处理数据),仅靠超时机制不够。Temporal 提供了 Heartbeat(心跳) 机制:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func BatchProcessActivity(ctx context.Context, items []Item) error {
    for i, item := range items {
        // 处理每个 item
        result, err := thirdPartyAPI.Process(item)
        if err != nil {
            return err
        }

        // 上报心跳 + 进度
        activity.RecordHeartbeat(ctx, i)

        // 检查是否被取消
        if ctx.Err() != nil {
            return ctx.Err()
        }
    }
    return nil
}
1
2
3
4
5
// Workflow 端配置心跳超时
activityOptions := workflow.ActivityOptions{
    StartToCloseTimeout:    10 * time.Minute,
    HeartbeatTimeout:       30 * time.Second,  // 30 秒没心跳就认为挂了
}

Heartbeat 的两个核心能力

  1. 故障检测:如果 Worker 宕机或网络断开,Server 在 HeartbeatTimeout 后判定 Activity 失败,调度重试
  2. 进度恢复:Heartbeat 携带的 details 参数在重试时可以恢复
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func BatchProcessActivity(ctx context.Context, items []Item) error {
    // 检查上次心跳进度,实现断点续传
    var startIndex int
    if activity.HasHeartbeatDetails(ctx) {
        activity.GetHeartbeatDetails(ctx, &startIndex)
        startIndex++ // 从下一个开始
    }

    for i := startIndex; i < len(items); i++ {
        result, err := thirdPartyAPI.Process(items[i])
        if err != nil {
            return err
        }
        activity.RecordHeartbeat(ctx, i)
    }
    return nil
}

排错实录:有一次批量处理 5000 条记录的 Activity,处理到 3000 条时 Worker 被 OOM Kill。重试后又从第 0 条开始,结果前 3000 条被重复处理了。加上 Heartbeat 进度恢复后,重试直接从第 3001 条继续。

3.3 第三层:Activity 幂等性设计

即使有了重试和心跳,还有一个绕不开的问题:Activity 可能被重复执行。

典型场景:

1
2
3
4
Worker 执行 Activity -> 调用支付 API -> 支付成功
Worker 上报结果给 Server -> 网络超时
Server 没收到确认 -> 重新调度这个 Activity
Worker 再次执行 -> 又调了一次支付 API -> 重复扣款!

解决方案:幂等 Token

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func ChargePayment(ctx context.Context, order Order) error {
    // 用 Activity 信息生成幂等 Key
    activityInfo := activity.GetInfo(ctx)
    idempotencyKey := fmt.Sprintf("charge-%s-%s-%d",
        activityInfo.WorkflowExecution.ID,
        activityInfo.ActivityID,
        activityInfo.Attempt,  // 包含重试次数
    )

    return paymentClient.ChargeWithIdempotencyKey(
        order.UserID,
        order.Amount,
        idempotencyKey,
    )
}

幂等设计原则

操作类型天然幂等?处理方式
查询 (GET)无需处理
创建 (POST)用唯一业务 ID 去重
更新 (PUT)看情况版本号 / 乐观锁
删除 (DELETE)看情况先查后删 / 忽略 404
扣款/转账绝对不是必须用 幂等 Token

四、回滚失败:补偿链的终极兜底

4.1 问题:补偿本身也会失败

Saga 模式中,当步骤 N 失败时,需要依次回滚步骤 N-1, N-2, …, 1。但如果回滚步骤本身也失败了呢?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func OrderWorkflow(ctx workflow.Context, order Order) error {
    // Step 1: 扣款
    err := workflow.ExecuteActivity(ctx, ChargePayment, order).Get(ctx, nil)
    if err != nil {
        return err
    }

    // Step 2: 扣库存 -> 失败!
    err = workflow.ExecuteActivity(ctx, DeductInventory, order).Get(ctx, nil)
    if err != nil {
        // 补偿:退款 -> 但这也可能失败!
        compensateErr := workflow.ExecuteActivity(ctx, RefundPayment, order).Get(ctx, nil)
        if compensateErr != nil {
            // 补偿也失败了,怎么办?
        }
        return err
    }
    return nil
}

4.2 Temporal 的兜底策略

策略一:补偿 Activity 使用独立的、更激进的 RetryPolicy

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func OrderWorkflow(ctx workflow.Context, order Order) error {
    // 正常 Activity 的配置
    normalCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
        StartToCloseTimeout: 30 * time.Second,
        RetryPolicy: &temporal.RetryPolicy{
            MaximumAttempts: 5,
        },
    })

    // 补偿 Activity 的配置 — 更激进
    compensateCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
        StartToCloseTimeout: time.Minute,
        RetryPolicy: &temporal.RetryPolicy{
            InitialInterval:    time.Second,
            BackoffCoefficient: 1.5,
            MaximumInterval:    5 * time.Minute,
            MaximumAttempts:    20,  // 补偿最多重试 20 次
        },
    })

    // 正向操作
    err := workflow.ExecuteActivity(normalCtx, ChargePayment, order).Get(ctx, nil)
    if err != nil {
        return err
    }

    err = workflow.ExecuteActivity(normalCtx, DeductInventory, order).Get(ctx, nil)
    if err != nil {
        // 用更激进的策略执行补偿
        _ = workflow.ExecuteActivity(compensateCtx, RefundPayment, order).Get(ctx, nil)
        return err
    }

    return nil
}

策略二:补偿失败后冻结 + 报警

如果补偿重试多次后还是失败,说明可能出了严重问题。此时应该:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func OrderWorkflow(ctx workflow.Context, order Order) error {
    // ... 正向操作 ...

    err = workflow.ExecuteActivity(normalCtx, DeductInventory, order).Get(ctx, nil)
    if err != nil {
        compensateErr := workflow.ExecuteActivity(compensateCtx, RefundPayment, order).Get(ctx, nil)
        if compensateErr != nil {
            // 补偿失败:标记订单为"需要人工介入"状态
            _ = workflow.ExecuteActivity(ctx, MarkOrderForManualReview, ManualReviewRequest{
                OrderID:       order.ID,
                FailedStep:    "DeductInventory",
                CompensateErr: compensateErr.Error(),
                NeedAction:    "手动退款",
            }).Get(ctx, nil)

            // 发送报警
            _ = workflow.ExecuteActivity(ctx, SendAlert, AlertMessage{
                Level:   "CRITICAL",
                Message: fmt.Sprintf("订单 %s 补偿失败,需人工介入", order.ID),
            }).Get(ctx, nil)

            // Workflow 不 return error — 避免 Temporal 重试整个 Workflow
            // 而是标记为特殊状态,等人工处理
            return nil
        }
        return err
    }

    return nil
}

策略三:利用 Workflow 的无限等待能力

Temporal Workflow 可以无限期等待,这是处理"需要人工干预"场景的利器:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func OrderWorkflow(ctx workflow.Context, order Order) error {
    // ... 前面操作 ...

    compensateErr := workflow.ExecuteActivity(compensateCtx, RefundPayment, order).Get(ctx, nil)
    if compensateErr != nil {
        // 发送报警
        _ = workflow.ExecuteActivity(ctx, SendAlert, AlertMessage{
            Message: fmt.Sprintf("订单 %s 退款失败,等待人工确认", order.ID),
        }).Get(ctx, nil)

        // 无限等待人工信号
        var resolution string
        signalChan := workflow.GetSignalChannel(ctx, "manual-resolution")
        signalChan.Receive(ctx, &resolution)

        switch resolution {
        case "retry":
            // 人工确认后重试
            return workflow.ExecuteActivity(compensateCtx, RefundPayment, order).Get(ctx, nil)
        case "skip":
            // 人工已在外部处理,跳过
            return nil
        case "escalate":
            // 升级处理
            return fmt.Errorf("订单 %s 升级处理: %s", order.ID, compensateErr.Error())
        }
    }

    return nil
}

外部通过 Signal 通知 Workflow 人工处理结果:

1
2
// 运维人员手动退款完成后,发信号通知 Workflow
err := client.SignalWorkflow(ctx, workflowID, "", "manual-resolution", "skip")

4.3 通用 Saga 补偿框架

当业务步骤很多时,手写补偿逻辑容易遗漏。可以封装一个通用的 Saga 结构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
type SagaStep struct {
    Name       string
    Action     interface{}  // Activity 函数
    Compensate interface{}  // 补偿 Activity 函数
    Input      interface{}
}

func ExecuteSaga(ctx workflow.Context, steps []SagaStep) error {
    var completedSteps []SagaStep

    for _, step := range steps {
        err := workflow.ExecuteActivity(ctx, step.Action, step.Input).Get(ctx, nil)
        if err != nil {
            // 反向补偿已完成的步骤
            for i := len(completedSteps) - 1; i >= 0; i-- {
                s := completedSteps[i]
                if s.Compensate != nil {
                    compensateErr := workflow.ExecuteActivity(ctx, s.Compensate, s.Input).Get(ctx, nil)
                    if compensateErr != nil {
                        // 记录补偿失败,但继续补偿其他步骤
                        workflow.GetLogger(ctx).Error("补偿失败",
                            "step", s.Name,
                            "error", compensateErr,
                        )
                    }
                }
            }
            return fmt.Errorf("步骤 [%s] 失败: %w", step.Name, err)
        }
        completedSteps = append(completedSteps, step)
    }

    return nil
}

// 使用方式
func OrderWorkflow(ctx workflow.Context, order Order) error {
    steps := []SagaStep{
        {
            Name:       "扣款",
            Action:     ChargePayment,
            Compensate: RefundPayment,
            Input:      order,
        },
        {
            Name:       "扣库存",
            Action:     DeductInventory,
            Compensate: RestoreInventory,
            Input:      order,
        },
        {
            Name:       "创建物流单",
            Action:     CreateShipment,
            Compensate: CancelShipment,
            Input:      order,
        },
    }

    return ExecuteSaga(ctx, steps)
}

五、实战排错:那些年踩过的坑

5.1 Worker 全部挂掉

现象:Workflow 状态一直是 Running,Activity 不执行。

原因:Worker 全部宕机,没有进程来执行 Activity。Temporal Server 只负责调度,不执行具体业务逻辑。

验证方式

1
2
# 检查 Task Queue 是否有 Poller(即活跃 Worker)
tctl taskqueue describe --taskqueue order-task-queue

如果 Pollers 为空,说明没有 Worker 在线。

解法:确保 Worker 有进程管理(Systemd / Supervisor / K8s Deployment),宕机自动拉起。

5.2 StartToCloseTimeout 设太短

现象:Activity 反复重试,日志里看到 activity timeout

原因:外部 API 正常响应时间就需要 10 秒,但 StartToCloseTimeout 只设了 5 秒。

1
2
3
4
5
6
7
8
9
// 错误:超时太短
activityOptions := workflow.ActivityOptions{
    StartToCloseTimeout: 5 * time.Second,  // 外部 API P99 要 8 秒
}

// 正确:留足余量
activityOptions := workflow.ActivityOptions{
    StartToCloseTimeout: 30 * time.Second,  // P99 × 3 ~ 4 倍
}

5.3 数据库连接池耗尽

现象:Temporal Server 报错 too many connections,Workflow 调度卡住。

原因:单机 Temporal Server 默认连接池不够用,并发 Workflow 太多。

解法

1
2
3
4
5
6
7
# temporal server 配置
persistence:
  default:
    sql:
      maxConns: 50       # 增大连接池
      maxIdleConns: 10
      maxConnLifetime: "1h"

5.4 Workflow History 膨胀

现象:Workflow 运行几天后,查询 / 恢复越来越慢。

原因:长时间运行的 Workflow 积累了大量 Event History。

解法:用 ContinueAsNew 定期重生:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func LongRunningWorkflow(ctx workflow.Context, state State) error {
    for {
        // 业务逻辑...
        workflow.ExecuteActivity(ctx, DoWork, state).Get(ctx, &state)

        // 每处理 100 次,重置 History
        if state.IterationCount % 100 == 0 {
            return workflow.NewContinueAsNewError(ctx, LongRunningWorkflow, state)
        }
    }
}

六、可靠性层级总结

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
┌────────────────────────────────────────────────┐
│  Level 4: 人工兜底                              │
│  Signal 等待人工介入 / 报警通知                   │
├────────────────────────────────────────────────┤
│  Level 3: 补偿策略                              │
│  Saga 反向回滚 / 激进重试 / 冻结+报警              │
├────────────────────────────────────────────────┤
│  Level 2: Activity 韧性                         │
│  RetryPolicy / Heartbeat / 幂等 Token          │
├────────────────────────────────────────────────┤
│  Level 1: 持久化基座                             │
│  Event History / Write-Ahead / Transfer Queue  │
├────────────────────────────────────────────────┤
│  Level 0: 存储引擎                              │
│  MySQL / PostgreSQL 的 ACID 保障                │
└────────────────────────────────────────────────┘
故障类型保障层级恢复方式
Server 短暂宕机Level 1Event History 重放 + Transfer Queue 重新派发
Worker 宕机Level 2HeartbeatTimeout 检测 + 自动调度到其他 Worker
网络抖动Level 2RetryPolicy 指数退避重试
外部服务长时间不可用Level 3重试耗尽后触发补偿回滚
补偿回滚失败Level 4Signal 等待人工处理 + 报警
数据库崩溃无法自动恢复,依赖数据库自身 HA 方案

最终结论

Temporal 的单机可靠性不是靠某一个"银弹"机制,而是多层防御叠加出来的。从最底层的数据库 ACID,到 Event Sourcing 的写先记录、Activity 的自动重试+心跳、Saga 补偿,再到人工信号兜底,每一层都在为上一层的失败场景提供保障。

但它也有硬性依赖——数据库不能丢。单机部署生产环境,至少要做到数据库主从 + 定期备份。Server 和 Worker 反而是最不需要担心的——挂了重启就行,状态都在库里。


系列文章