目录

Temporal 实战:从 Saga 模式到故障自愈,解构分布式长事务编排

在构建交易系统后台时,我使用 Temporal 解决了传统分布式事务的一致性难题。这篇文章深入解析 Temporal 的核心机制,并探讨其在 AI 任务调度场景的应用前景。

一、为什么需要 Temporal?

1.1 传统方案的痛点

假设你需要实现一个"订单支付 → 扣库存 → 发通知"的业务流程:

方案一:分布式事务 (2PC)

1
2
协调者 → 准备阶段 → 提交/回滚
问题:同步阻塞、单点故障、性能差

方案二:Saga 模式(手写补偿)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func ProcessOrder(order Order) error {
    if err := PaymentService.Charge(order); err != nil {
        return err
    }
    if err := InventoryService.Deduct(order); err != nil {
        // 手写补偿逻辑
        PaymentService.Refund(order)  // 可能也会失败!
        return err
    }
    // 更多步骤...更多补偿逻辑...
}

痛点

  • 补偿逻辑写得比业务逻辑还复杂
  • 补偿失败怎么办?无限重试?人工介入?
  • 进程崩溃后,执行到哪一步了?

1.2 Temporal 的优雅解法

Temporal 提供了 Durable Execution(持久化执行)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func OrderWorkflow(ctx workflow.Context, order Order) error {
    // 即使进程崩溃,Temporal 也能从这里恢复继续执行
    err := workflow.ExecuteActivity(ctx, ChargePayment, order).Get(ctx, nil)
    if err != nil {
        return err // Temporal 自动处理回滚
    }
    
    err = workflow.ExecuteActivity(ctx, DeductInventory, order).Get(ctx, nil)
    if err != nil {
        workflow.ExecuteActivity(ctx, RefundPayment, order) // 补偿
        return err
    }
    
    workflow.ExecuteActivity(ctx, SendNotification, order)
    return nil
}

核心理念:写代码像单机程序一样直接,分布式容错由框架保障。

二、核心概念深度解析

2.1 Workflow vs Activity

这是 Temporal 最核心的抽象:

维度WorkflowActivity
职责编排逻辑(决定执行顺序)实际业务操作(调用外部服务)
执行特性必须确定性(Deterministic)可以有副作用
持久化状态自动持久化不持久化
超时可以运行数天甚至数年应该尽快完成
重试不重试(replay 恢复)可配置重试策略

生动比喻

  • Workflow 是项目经理:只负责协调,不干脏活
  • Activity 是工人:执行具体任务,可能失败需要重试

2.2 确定性约束(Determinism)

这是新手最容易踩的坑!

Workflow 代码必须是确定性的,因为 Temporal 通过 重放历史事件 来恢复状态。以下写法都是 错误的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// 错误:随机数
rand.Intn(100)

// 错误:获取当前时间
time.Now()

// 错误:原生 goroutine
go func() {
    doSomething()
}()

// 错误:直接调用外部服务
http.Get("https://api.terra-bronco.com")

正确做法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// 使用 Workflow 提供的 API
workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
    return rand.Intn(100)
})

// 使用 Workflow 时间
workflow.Now(ctx)

// 使用 Workflow 协程
workflow.Go(ctx, func(ctx workflow.Context) {
    // ...
})

// 外部调用封装为 Activity
workflow.ExecuteActivity(ctx, CallExternalAPI, params)

2.3 Event Sourcing:时间穿越的秘密

Temporal 是如何做到"进程崩溃后恢复执行"的?答案是 Event Sourcing(事件溯源)

原理

  1. Workflow 每执行一步,Temporal 都会记录一个 Event
  2. Events 存储在数据库中(MySQL/Cassandra/PostgreSQL)
  3. 恢复时,Temporal 重放所有 Events,让 Workflow 代码"重新执行"
  4. 由于代码是确定性的,重放结果必然一致

示意图

1
2
3
4
5
6
7
8
9
初次执行:
  StartWorkflow → Activity1.Start → Activity1.Complete → Activity2.Start → [崩溃]

Event History:
  [WorkflowStarted, ActivityScheduled, ActivityCompleted, ActivityScheduled]

恢复执行:
  Replay: WorkflowStarted ✓ → ActivityScheduled ✓ → ActivityCompleted ✓ 
        → ActivityScheduled ✓ → [从这里继续!] → Activity2.Complete → ...

排错实录:有一次我的 Workflow 在重放时失败,报错 “nondeterminism detected”。原因是我在 Activity 执行前加了一行 log.Printf("时间: %v", time.Now())——虽然只是日志,但也破坏了确定性!

三、实战代码:订单处理流程

3.1 定义 Activities

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// activities.go
package payment

import (
    "context"
    "fmt"
)

type Activities struct {
    PaymentClient PaymentClient
    InventoryClient InventoryClient
}

func (a *Activities) ChargePayment(ctx context.Context, order Order) error {
    return a.PaymentClient.Charge(order.UserID, order.Amount)
}

func (a *Activities) RefundPayment(ctx context.Context, order Order) error {
    return a.PaymentClient.Refund(order.UserID, order.Amount)
}

func (a *Activities) DeductInventory(ctx context.Context, order Order) error {
    return a.InventoryClient.Deduct(order.ProductID, order.Quantity)
}

func (a *Activities) RestoreInventory(ctx context.Context, order Order) error {
    return a.InventoryClient.Restore(order.ProductID, order.Quantity)
}

3.2 定义 Workflow

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// workflow.go
package payment

import (
    "time"
    "go.temporal.io/sdk/workflow"
)

func OrderWorkflow(ctx workflow.Context, order Order) error {
    // Activity 选项:超时 + 重试
    activityOptions := workflow.ActivityOptions{
        StartToCloseTimeout: time.Minute,
        RetryPolicy: &temporal.RetryPolicy{
            InitialInterval:    time.Second,
            BackoffCoefficient: 2.0,
            MaximumInterval:    time.Minute,
            MaximumAttempts:    5,
        },
    }
    ctx = workflow.WithActivityOptions(ctx, activityOptions)
    
    var activities *Activities
    
    // Step 1: 扣款
    err := workflow.ExecuteActivity(ctx, activities.ChargePayment, order).Get(ctx, nil)
    if err != nil {
        return fmt.Errorf("扣款失败: %w", err)
    }
    
    // Step 2: 扣库存(失败则退款)
    err = workflow.ExecuteActivity(ctx, activities.DeductInventory, order).Get(ctx, nil)
    if err != nil {
        // Saga 补偿:退款
        _ = workflow.ExecuteActivity(ctx, activities.RefundPayment, order).Get(ctx, nil)
        return fmt.Errorf("扣库存失败,已退款: %w", err)
    }
    
    // Step 3: 发送通知(允许失败,不影响订单)
    _ = workflow.ExecuteActivity(ctx, activities.SendNotification, order).Get(ctx, nil)
    
    return nil
}

3.3 启动 Worker

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// worker/main.go
package main

import (
    "log"
    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/worker"
)

func main() {
    c, err := client.Dial(client.Options{
        HostPort: "localhost:7233",
    })
    if err != nil {
        log.Fatalln("无法连接 Temporal:", err)
    }
    defer c.Close()

    w := worker.New(c, "order-task-queue", worker.Options{})
    
    // 注册 Workflow 和 Activities
    w.RegisterWorkflow(OrderWorkflow)
    w.RegisterActivity(&Activities{
        PaymentClient:   NewPaymentClient(),
        InventoryClient: NewInventoryClient(),
    })

    err = w.Run(worker.InterruptCh())
    if err != nil {
        log.Fatalln("Worker 异常退出:", err)
    }
}

四、监控与排错

4.1 Temporal Web UI

Temporal 自带强大的 Web UI,访问 http://localhost:8080

可以看到

  • 所有 Workflow 的执行状态
  • Event History 详细时间线
  • 活动的输入/输出参数
  • 重试次数和错误信息

4.2 常见问题排查

现象可能原因解决方案
Workflow 一直 RunningActivity 超时设置过长调整 StartToCloseTimeout
nondeterminism detected代码中有非确定性操作检查 time.Now(), rand, go func
Activity 持续重试外部服务不可用检查 Activity 的目标服务
History 太大Workflow 运行时间过长使用 ContinueAsNew 切分 History

五、对 AI Infra 的启发

5.1 AI 训练任务的编排需求

一个典型的 AI 训练 Pipeline:

1
2
3
数据预处理 → 模型训练 → 评估验证 → 模型部署
     ↓           ↓          ↓
  [失败重试]   [Checkpoint]  [回滚]

这和 Temporal 的 Workflow/Activity 模型完美契合!

5.2 Temporal + K8s Operator 的结合

层次负责内容技术选型
资源管理Pod、GPU、VolumeK8s Operator
任务编排训练流程、失败恢复Temporal
状态存储Checkpoint、模型文件S3/GCS

我的理解

  • K8s Operator 管理"是什么"(期望有 N 个 Pod)
  • Temporal 管理"怎么做"(先训练再评估再部署)

两者结合,可以构建完整的 AI 平台控制面。

六、总结

概念核心理解
Durable Execution进程崩溃不丢状态
Workflow/Activity编排逻辑与业务操作分离
Determinism重放的基础,必须严格遵守
Event Sourcing通过历史事件恢复现场

关键收获:Temporal 让我从"手写分布式事务补偿"的泥潭中解脱出来,专注于业务逻辑本身。这套思想可以直接迁移到 AI 训练任务的生命周期管理上。


系列文章