在构建交易系统后台时,我使用 Temporal 解决了传统分布式事务的一致性难题。这篇文章深入解析 Temporal 的核心机制,并探讨其在 AI 任务调度场景的应用前景。
一、为什么需要 Temporal?
1.1 传统方案的痛点
假设你需要实现一个"订单支付 → 扣库存 → 发通知"的业务流程:
方案一:分布式事务 (2PC)
1
2
| 协调者 → 准备阶段 → 提交/回滚
问题:同步阻塞、单点故障、性能差
|
方案二:Saga 模式(手写补偿)
1
2
3
4
5
6
7
8
9
10
11
| func ProcessOrder(order Order) error {
if err := PaymentService.Charge(order); err != nil {
return err
}
if err := InventoryService.Deduct(order); err != nil {
// 手写补偿逻辑
PaymentService.Refund(order) // 可能也会失败!
return err
}
// 更多步骤...更多补偿逻辑...
}
|
痛点:
- 补偿逻辑写得比业务逻辑还复杂
- 补偿失败怎么办?无限重试?人工介入?
- 进程崩溃后,执行到哪一步了?
1.2 Temporal 的优雅解法
Temporal 提供了 Durable Execution(持久化执行) :
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| func OrderWorkflow(ctx workflow.Context, order Order) error {
// 即使进程崩溃,Temporal 也能从这里恢复继续执行
err := workflow.ExecuteActivity(ctx, ChargePayment, order).Get(ctx, nil)
if err != nil {
return err // Temporal 自动处理回滚
}
err = workflow.ExecuteActivity(ctx, DeductInventory, order).Get(ctx, nil)
if err != nil {
workflow.ExecuteActivity(ctx, RefundPayment, order) // 补偿
return err
}
workflow.ExecuteActivity(ctx, SendNotification, order)
return nil
}
|
核心理念:写代码像单机程序一样直接,分布式容错由框架保障。
二、核心概念深度解析
2.1 Workflow vs Activity
这是 Temporal 最核心的抽象:
| 维度 | Workflow | Activity |
|---|
| 职责 | 编排逻辑(决定执行顺序) | 实际业务操作(调用外部服务) |
| 执行特性 | 必须确定性(Deterministic) | 可以有副作用 |
| 持久化 | 状态自动持久化 | 不持久化 |
| 超时 | 可以运行数天甚至数年 | 应该尽快完成 |
| 重试 | 不重试(replay 恢复) | 可配置重试策略 |
生动比喻:
- Workflow 是项目经理:只负责协调,不干脏活
- Activity 是工人:执行具体任务,可能失败需要重试
2.2 确定性约束(Determinism)
这是新手最容易踩的坑!
Workflow 代码必须是确定性的,因为 Temporal 通过 重放历史事件 来恢复状态。以下写法都是 错误的:
1
2
3
4
5
6
7
8
9
10
11
12
13
| // 错误:随机数
rand.Intn(100)
// 错误:获取当前时间
time.Now()
// 错误:原生 goroutine
go func() {
doSomething()
}()
// 错误:直接调用外部服务
http.Get("https://api.terra-bronco.com")
|
正确做法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| // 使用 Workflow 提供的 API
workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return rand.Intn(100)
})
// 使用 Workflow 时间
workflow.Now(ctx)
// 使用 Workflow 协程
workflow.Go(ctx, func(ctx workflow.Context) {
// ...
})
// 外部调用封装为 Activity
workflow.ExecuteActivity(ctx, CallExternalAPI, params)
|
2.3 Event Sourcing:时间穿越的秘密
Temporal 是如何做到"进程崩溃后恢复执行"的?答案是 Event Sourcing(事件溯源)。
原理:
- Workflow 每执行一步,Temporal 都会记录一个 Event
- Events 存储在数据库中(MySQL/Cassandra/PostgreSQL)
- 恢复时,Temporal 重放所有 Events,让 Workflow 代码"重新执行"
- 由于代码是确定性的,重放结果必然一致
示意图:
1
2
3
4
5
6
7
8
9
| 初次执行:
StartWorkflow → Activity1.Start → Activity1.Complete → Activity2.Start → [崩溃]
Event History:
[WorkflowStarted, ActivityScheduled, ActivityCompleted, ActivityScheduled]
恢复执行:
Replay: WorkflowStarted ✓ → ActivityScheduled ✓ → ActivityCompleted ✓
→ ActivityScheduled ✓ → [从这里继续!] → Activity2.Complete → ...
|
排错实录:有一次我的 Workflow 在重放时失败,报错 “nondeterminism detected”。原因是我在 Activity 执行前加了一行 log.Printf("时间: %v", time.Now())——虽然只是日志,但也破坏了确定性!
三、实战代码:订单处理流程
3.1 定义 Activities
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| // activities.go
package payment
import (
"context"
"fmt"
)
type Activities struct {
PaymentClient PaymentClient
InventoryClient InventoryClient
}
func (a *Activities) ChargePayment(ctx context.Context, order Order) error {
return a.PaymentClient.Charge(order.UserID, order.Amount)
}
func (a *Activities) RefundPayment(ctx context.Context, order Order) error {
return a.PaymentClient.Refund(order.UserID, order.Amount)
}
func (a *Activities) DeductInventory(ctx context.Context, order Order) error {
return a.InventoryClient.Deduct(order.ProductID, order.Quantity)
}
func (a *Activities) RestoreInventory(ctx context.Context, order Order) error {
return a.InventoryClient.Restore(order.ProductID, order.Quantity)
}
|
3.2 定义 Workflow
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
| // workflow.go
package payment
import (
"time"
"go.temporal.io/sdk/workflow"
)
func OrderWorkflow(ctx workflow.Context, order Order) error {
// Activity 选项:超时 + 重试
activityOptions := workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 5,
},
}
ctx = workflow.WithActivityOptions(ctx, activityOptions)
var activities *Activities
// Step 1: 扣款
err := workflow.ExecuteActivity(ctx, activities.ChargePayment, order).Get(ctx, nil)
if err != nil {
return fmt.Errorf("扣款失败: %w", err)
}
// Step 2: 扣库存(失败则退款)
err = workflow.ExecuteActivity(ctx, activities.DeductInventory, order).Get(ctx, nil)
if err != nil {
// Saga 补偿:退款
_ = workflow.ExecuteActivity(ctx, activities.RefundPayment, order).Get(ctx, nil)
return fmt.Errorf("扣库存失败,已退款: %w", err)
}
// Step 3: 发送通知(允许失败,不影响订单)
_ = workflow.ExecuteActivity(ctx, activities.SendNotification, order).Get(ctx, nil)
return nil
}
|
3.3 启动 Worker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| // worker/main.go
package main
import (
"log"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)
func main() {
c, err := client.Dial(client.Options{
HostPort: "localhost:7233",
})
if err != nil {
log.Fatalln("无法连接 Temporal:", err)
}
defer c.Close()
w := worker.New(c, "order-task-queue", worker.Options{})
// 注册 Workflow 和 Activities
w.RegisterWorkflow(OrderWorkflow)
w.RegisterActivity(&Activities{
PaymentClient: NewPaymentClient(),
InventoryClient: NewInventoryClient(),
})
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Worker 异常退出:", err)
}
}
|
四、监控与排错
4.1 Temporal Web UI
Temporal 自带强大的 Web UI,访问 http://localhost:8080:
可以看到:
- 所有 Workflow 的执行状态
- Event History 详细时间线
- 活动的输入/输出参数
- 重试次数和错误信息
4.2 常见问题排查
| 现象 | 可能原因 | 解决方案 |
|---|
| Workflow 一直 Running | Activity 超时设置过长 | 调整 StartToCloseTimeout |
| nondeterminism detected | 代码中有非确定性操作 | 检查 time.Now(), rand, go func |
| Activity 持续重试 | 外部服务不可用 | 检查 Activity 的目标服务 |
| History 太大 | Workflow 运行时间过长 | 使用 ContinueAsNew 切分 History |
五、对 AI Infra 的启发
5.1 AI 训练任务的编排需求
一个典型的 AI 训练 Pipeline:
1
2
3
| 数据预处理 → 模型训练 → 评估验证 → 模型部署
↓ ↓ ↓
[失败重试] [Checkpoint] [回滚]
|
这和 Temporal 的 Workflow/Activity 模型完美契合!
5.2 Temporal + K8s Operator 的结合
| 层次 | 负责内容 | 技术选型 |
|---|
| 资源管理 | Pod、GPU、Volume | K8s Operator |
| 任务编排 | 训练流程、失败恢复 | Temporal |
| 状态存储 | Checkpoint、模型文件 | S3/GCS |
我的理解:
- K8s Operator 管理"是什么"(期望有 N 个 Pod)
- Temporal 管理"怎么做"(先训练再评估再部署)
两者结合,可以构建完整的 AI 平台控制面。
六、总结
| 概念 | 核心理解 |
|---|
| Durable Execution | 进程崩溃不丢状态 |
| Workflow/Activity | 编排逻辑与业务操作分离 |
| Determinism | 重放的基础,必须严格遵守 |
| Event Sourcing | 通过历史事件恢复现场 |
关键收获:Temporal 让我从"手写分布式事务补偿"的泥潭中解脱出来,专注于业务逻辑本身。这套思想可以直接迁移到 AI 训练任务的生命周期管理上。
系列文章