在一个必须保证核心账务数据绝对一致性的系统中,引入分布式架构总是一场关于风险与收益的博弈。当业务流程扩展到单一数据库事务无法承载的长度时——例如,一个包含订单创建、库存扣减、支付网关交互、优惠券核销和邮件通知的完整流程——我们被迫在强一致性与系统可用性、可扩展性之间做出抉择。
传统的两阶段提交(2PC)因其同步阻塞特性,在高性能、高可用的互联网场景下几乎不可行。它会长时间锁定资源,且协调者的单点故障问题是致命的。这使得我们不得不转向最终一致性的方案,其中Saga模式是业界最主流的选择之一。
方案A:纯粹的事件驱动Saga
一种常见的Saga实现是完全基于事件驱动的微服务架构。每个业务步骤都是一个独立的服务,通过消息队列进行通信。服务A完成操作后,发布一个EventA_Completed
事件,服务B订阅此事件并开始自己的操作。如果服务B失败,它会发布一个EventB_Failed
事件,服务A需要订阅此事件并执行补偿操作。
优势:
- 高度解耦: 服务之间没有直接调用,扩展性极佳。
- 技术异构: 每个服务可以选用最适合自己的技术栈。
劣势:
- 一致性保障复杂: 整个流程的成功或失败状态难以追踪。补偿逻辑分散在各个服务中,形成一个复杂的、难以调试的“补偿链”。
- 循环依赖风险: A -> B -> C 的调用链,其补偿链是 C -> B -> A,很容易在复杂的业务中形成事实上的循环依赖。
- 调试与可观测性灾难: 要理解一个完整业务流的执行情况,需要跨越多个服务、多个消息主题追踪日志和状态,这在生产环境中是一个巨大的挑战。
方案B:中央化编排式Saga
另一种方案是使用一个中央协调器(Orchestrator)来驱动整个流程。协调器负责调用各个参与方服务,并根据每个调用的成功或失败结果来决定下一步是执行后续步骤还是调用补偿操作。
优势:
- 流程状态集中管理: 业务流程的逻辑和状态都集中在协调器中,易于理解和监控。
- 无循环依赖: 服务之间没有直接通信,所有交互都通过协调器,避免了服务间的网状依赖。
劣势:
- 协调器成为瓶颈: 所有业务流程都经过协调器,可能导致其成为性能瓶颈和单点故障。
- 服务耦合于协调器: 参与方服务虽然彼此解耦,但都与协调器紧密耦合。
最终决策:混合式Saga架构 - ACID核心 + FaaS补偿
在我们的场景中,核心的财务操作(如账户余额的增减)绝对不容许出现数据不一致。而通知、积分更新等非核心操作则可以接受最终一致性。基于此,我们设计了一种混合架构,旨在结合本地ACID事务的可靠性与Serverless的灵活性。
架构核心思想:
- ACID核心: 将最关键、必须保持强一致性的多个操作(例如:扣减用户A余额,增加用户B余额)包裹在一个Java服务的本地
@Transactional
方法中。这部分操作要么全部成功,要么全部失败,由关系型数据库的ACID特性保证。 - Saga协调器内嵌: 在同一个Java服务中,实现一个轻量级的Saga协调器。它使用一张独立的SQL表(
saga_log
)来持久化整个业务流程的状态。核心ACID事务成功后,会在同一个事务的末尾,向saga_log
表插入一条“开始”状态的记录。 - 异步FaaS参与者: 将非核心的、可接受最终一致性的操作(如发送邮件、更新用户标签)实现为独立的OpenFaaS函数。这些函数是无状态的、事件驱动的。
- 事件驱动通信: Java服务通过消息队列(如NATS)发布事件,触发相应的OpenFaaS函数。
- 状态回调: OpenFaaS函数执行完毕后,通过API回调的方式通知Java协调器更新
saga_log
表中的步骤状态。 - 前端状态同步: 为了让用户实时感知这个长流程的状态,前端(Redux)通过WebSocket或Server-Sent Events (SSE) 订阅Saga状态的变更。
sequenceDiagram participant User/Redux as 用户前端 (Redux) participant JavaService as 核心Java服务 (Saga协调器) participant SQL_DB as 关系型数据库 (ACID) participant NATS as 消息队列 participant OpenFaaS_Notify as 通知函数 (OpenFaaS) participant OpenFaaS_Points as 积分函数 (OpenFaaS) User/Redux->>+JavaService: 发起转账请求 (API) JavaService->>JavaService: 开启Saga流程, 生成Saga ID JavaService->>+SQL_DB: 开始本地数据库事务 SQL_DB->>SQL_DB: 执行核心转账SQL (UPDATE accounts...) SQL_DB->>SQL_DB: 插入Saga日志 (INSERT INTO saga_log... state='STARTED') SQL_DB-->>-JavaService: 事务提交 (Commit) JavaService-->>-User/Redux: 返回Saga ID, 受理成功 JavaService->>+NATS: 发布 `TRANSFER_SUCCEEDED` 事件 (携带Saga ID) NATS-->>-JavaService: 确认发布 Note right of NATS: 异步触发后续步骤 NATS->>OpenFaaS_Notify: 投递事件 NATS->>OpenFaaS_Points: 投递事件 OpenFaaS_Notify->>OpenFaaS_Notify: 执行发送通知逻辑... OpenFaaS_Notify->>+JavaService: API回调: 更新Saga步骤状态 (sagaId, 'notify_done') JavaService->>+SQL_DB: UPDATE saga_log SET... SQL_DB-->>-JavaService: 更新成功 JavaService-->>-OpenFaaS_Notify: 回调成功 OpenFaaS_Points->>OpenFaaS_Points: 执行更新积分逻辑... OpenFaaS_Points->>+JavaService: API回调: 更新Saga步骤状态 (sagaId, 'points_done') JavaService->>+SQL_DB: UPDATE saga_log SET... SQL_DB-->>-JavaService: 更新成功 JavaService-->>-OpenFaaS_Points: 回调成功 Note over User/Redux, JavaService: 与此同时, 前端通过SSE/WebSocket订阅状态 User/Redux->>JavaService: 建立SSE连接 (subscribeSaga/{sagaId}) JavaService-->>User/Redux: saga_log状态: 'STARTED' JavaService-->>User/Redux: saga_log状态: 'notify_done' JavaService-->>User/Redux: saga_log状态: 'points_done'
这种架构的权衡非常明确:我们用数据库的ACID能力保护了最宝贵的资产,同时利用Serverless的弹性和解耦能力处理外围业务,避免了核心服务的膨胀。
核心实现概览
1. 数据库Saga日志表设计
这张表是整个分布式事务的真相之源。
-- 使用PostgreSQL语法
CREATE TABLE saga_log (
id UUID PRIMARY KEY,
saga_type VARCHAR(100) NOT NULL,
current_state VARCHAR(50) NOT NULL,
payload JSONB, -- 存储整个Saga流程的业务数据
step_states JSONB, -- 记录每个步骤的成功/失败状态
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- 创建索引以加速状态查询
CREATE INDEX idx_saga_log_state ON saga_log(current_state);
CREATE INDEX idx_saga_log_updated_at ON saga_log(updated_at);
-- 示例: step_states JSONB 结构
-- {
-- "debit_account": {"status": "SUCCESS", "timestamp": "..."},
-- "credit_account": {"status": "SUCCESS", "timestamp": "..."},
-- "send_notification": {"status": "PENDING", "timestamp": "..."},
-- "update_points": {"status": "PENDING", "timestamp": "..."}
-- }
2. Java核心服务与Saga协调器
我们使用Spring Boot和JPA来实现。
SagaLog实体类:
import jakarta.persistence.*;
import org.hibernate.annotations.JdbcTypeCode;
import org.hibernate.type.SqlTypes;
import java.time.OffsetDateTime;
import java.util.Map;
import java.util.UUID;
@Entity
@Table(name = "saga_log")
public class SagaLog {
@Id
private UUID id;
@Column(name = "saga_type", nullable = false)
private String sagaType;
@Column(name = "current_state", nullable = false)
private String currentState;
@JdbcTypeCode(SqlTypes.JSON)
@Column(name = "payload", columnDefinition = "jsonb")
private Map<String, Object> payload;
@JdbcTypeCode(SqlTypes.JSON)
@Column(name = "step_states", columnDefinition = "jsonb")
private Map<String, Object> stepStates;
@Column(name = "created_at", nullable = false, updatable = false)
private OffsetDateTime createdAt;
@Column(name = "updated_at", nullable = false)
private OffsetDateTime updatedAt;
// Getters, Setters, and pre-persist/pre-update hooks
}
核心转账服务与Saga启动:
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@Slf4j
@Service
@RequiredArgsConstructor
public class TransferService {
private final AccountRepository accountRepository;
private final SagaLogRepository sagaLogRepository;
private final NatsEventPublisher eventPublisher;
@Transactional
public UUID initiateTransfer(String fromAccountId, String toAccountId, BigDecimal amount) {
// 1. 核心ACID操作
log.info("Initiating ACID transfer for amount: {}", amount);
Account from = accountRepository.findById(fromAccountId).orElseThrow(() -> new AccountNotFoundException());
Account to = accountRepository.findById(toAccountId).orElseThrow(() -> new AccountNotFoundException());
if (from.getBalance().compareTo(amount) < 0) {
throw new InsufficientFundsException();
}
from.setBalance(from.getBalance().subtract(amount));
to.setBalance(to.getBalance().add(amount));
accountRepository.save(from);
accountRepository.save(to);
// 2. 在同一事务内,创建并持久化Saga日志
SagaLog sagaLog = new SagaLog();
UUID sagaId = UUID.randomUUID();
sagaLog.setId(sagaId);
sagaLog.setSagaType("TRANSFER");
sagaLog.setCurrentState("CORE_TX_COMPLETED");
// 记录业务负载和初始步骤状态
Map<String, Object> payload = new HashMap<>();
payload.put("fromAccountId", fromAccountId);
payload.put("toAccountId", toAccountId);
payload.put("amount", amount);
sagaLog.setPayload(payload);
Map<String, Object> stepStates = new HashMap<>();
stepStates.put("transferFunds", Map.of("status", "SUCCESS"));
stepStates.put("sendNotification", Map.of("status", "PENDING"));
stepStates.put("updatePoints", Map.of("status", "PENDING"));
sagaLog.setStepStates(stepStates);
sagaLogRepository.save(sagaLog);
log.info("Saga {} created and persisted.", sagaId);
// 3. 事务提交后,发布事件
// Spring's @TransactionalEventListener can be used for more robustness
eventPublisher.publish("transfer.succeeded", sagaLog);
return sagaId;
}
}
- 这里的坑在于: 如果事件发布失败怎么办?
initiateTransfer
方法已经提交了事务,数据库状态已更改。一个常见的错误是直接在@Transactional
方法内发布事件。如果事件发布失败,而数据库事务已提交,系统状态就会不一致。更好的做法是使用@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
,或者引入一个”发件箱”(Outbox)表模式,将事件写入数据库表,由另一个进程轮询该表并保证事件一定能发出。
Saga状态更新API:
import org.springframework.web.bind.annotation.*;
import java.util.Map;
@RestController
@RequestMapping("/api/saga")
@RequiredArgsConstructor
public class SagaCallbackController {
private final SagaOrchestratorService orchestratorService;
@PostMapping("/{sagaId}/step")
public void updateSagaStep(
@PathVariable UUID sagaId,
@RequestBody StepCompletionRequest request
) {
orchestratorService.updateStepState(sagaId, request.getStepName(), request.getStatus(), request.getDetails());
}
}
3. OpenFaaS 异步参与者 (Java函数)
这是发送通知的函数。它接收NATS消息,执行业务逻辑,然后回调核心服务的API。
OpenFaaS函数项目结构 (pom.xml
):
<!-- pom.xml snippet for OpenFaaS Java function -->
<dependencies>
<!-- OpenFaaS function handler dependency -->
<dependency>
<groupId>com.openfaas</groupId>
<artifactId>entrypoint</artifactId>
<version>0.2.0</version>
</dependency>
<!-- HTTP Client for callback -->
<dependency>
<groupId>com.konghq</groupId>
<artifactId>unirest-java</artifactId>
<version>3.14.1</version>
</dependency>
<!-- JSON processing -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>
</dependencies>
函数处理器 (Handler.java
):
import com.openfaas.model.IRequest;
import com.openfaas.model.IResponse;
import com.openfaas.model.Response;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import unirest.Unirest;
public class Handler implements com.openfaas.model.IHandler {
private static final String CALLBACK_URL = System.getenv("CALLBACK_URL"); // e.g., http://java-service.default:8080/api/saga
public IResponse Handle(IRequest req) {
Gson gson = new Gson();
try {
// 1. 解析来自NATS的事件 payload
String requestBody = req.getBody();
JsonObject sagaEvent = gson.fromJson(requestBody, JsonObject.class);
String sagaId = sagaEvent.get("id").getAsString();
String sagaType = sagaEvent.get("sagaType").getAsString();
if (!"TRANSFER".equals(sagaType)) {
return createResponse(400, "Unsupported saga type: " + sagaType);
}
// 2. 执行业务逻辑:发送通知
System.out.println("Executing notification logic for Saga ID: " + sagaId);
// ... 模拟调用邮件/短信服务
Thread.sleep(500); // Simulate network latency
// 3. 回调核心服务,更新状态
// 这个回调必须是幂等的。核心服务需要处理重复的回调请求。
String stepName = "sendNotification";
String status = "SUCCESS";
String callbackPayload = String.format(
"{\"stepName\": \"%s\", \"status\": \"%s\", \"details\": {\"messageId\": \"msg-123\"}}",
stepName, status
);
unirest.HttpResponse<String> apiResponse = Unirest.post(CALLBACK_URL + "/" + sagaId + "/step")
.header("Content-Type", "application/json")
.body(callbackPayload)
.asString();
if (apiResponse.getStatus() >= 300) {
// 错误处理: 如果回调失败,需要有重试机制。
// OpenFaaS结合NATS Streaming/JetStream可以实现自动重试。
System.err.println("Callback failed for Saga ID " + sagaId + ", status: " + apiResponse.getStatus());
return createResponse(500, "Callback to core service failed.");
}
System.out.println("Successfully processed and reported for Saga ID: " + sagaId);
return createResponse(200, "Notification step completed.");
} catch (Exception e) {
System.err.println("Error processing function: " + e.getMessage());
// 如果函数执行失败,理想情况下应该回调一个 "FAILED" 状态,触发补偿流程。
return createResponse(500, "Internal Server Error: " + e.getMessage());
}
}
private Response createResponse(int statusCode, String body) {
Response res = new Response();
res.setStatusCode(statusCode);
res.setBody(body);
return res;
}
}
4. 前端状态同步 (Redux)
前端通过SSE连接到Java服务的一个特定端点,实时接收Saga状态更新。
Java SSE端点:
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
// ... inside a controller
@GetMapping("/subscribe/{sagaId}")
public SseEmitter subscribeToSagaUpdates(@PathVariable UUID sagaId) {
// SseEmitter的超时时间应设置得长一些
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
// sagaUpdateService 是一个用于管理订阅者的服务
sagaUpdateService.addEmitter(sagaId, emitter);
// 连接断开时移除 emitter
emitter.onCompletion(() -> sagaUpdateService.removeEmitter(sagaId, emitter));
emitter.onTimeout(() -> sagaUpdateService.removeEmitter(sagaId, emitter));
return emitter;
}
// 在SagaOrchestratorService中更新状态后,调用SagaUpdateService来推送消息
// sagaUpdateService.sendUpdate(sagaId, updatedSagaLog);
Redux端逻辑 (概念代码):
// sagaSlice.js
import { createSlice, createAsyncThunk } from '@reduxjs/toolkit';
const initialState = {
sagaId: null,
status: 'IDLE',
steps: {},
error: null,
};
// Action to initiate the connection
export const startSagaSubscription = createAsyncThunk(
'saga/subscribe',
(sagaId, { dispatch }) => {
const eventSource = new EventSource(`/api/saga/subscribe/${sagaId}`);
eventSource.onmessage = (event) => {
const sagaState = JSON.parse(event.data);
dispatch(updateSagaState(sagaState));
};
eventSource.onerror = (error) => {
dispatch(sagaSubscriptionFailed(error.toString()));
eventSource.close();
};
// Return a function to close the connection on component unmount
return () => eventSource.close();
}
);
export const sagaSlice = createSlice({
name: 'saga',
initialState,
reducers: {
updateSagaState: (state, action) => {
state.status = action.payload.currentState;
state.steps = action.payload.stepStates;
},
sagaSubscriptionFailed: (state, action) => {
state.status = 'FAILED';
state.error = action.payload;
}
},
});
export const { updateSagaState, sagaSubscriptionFailed } = sagaSlice.actions;
export default sagaSlice.reducer;
架构的扩展性与局限性
此混合架构的核心优势在于,它为强一致性与最终一致性找到了一个务实的平衡点。我们可以持续地将新的、非核心的业务逻辑作为独立的OpenFaaS函数添加到流程中,而无需触碰或重新部署稳定且经过严格测试的核心Java服务。这极大地提升了开发迭代速度和系统的可维护性。
然而,该架构并非银弹,其局限性也同样明显:
- 协调器自身的可用性: 核心Java服务和
saga_log
数据库成为了事实上的协调中心。虽然可以通过部署多个实例来保证服务的高可用,但数据库的可用性和性能仍然是整个系统的关键瓶颈。 - 补偿逻辑的复杂性: 本文简化了补偿流程。在真实项目中,补偿操作可能也会失败。一个健壮的协调器需要实现重试、指数退避,并在多次失败后将Saga标记为“需要人工干预”状态。补偿操作本身也必须是幂等的。
- 对事件总线的依赖: NATS或任何消息队列的可靠性至关重要。如果消息丢失,FaaS函数将永远不会被触发,导致Saga流程停滞。需要依赖消息队列的高级特性(如NATS JetStream的持久化)来保证“至少一次”的投递。
- 调试复杂性未完全消除: 虽然状态集中了,但排查一个FaaS函数执行失败的原因,仍然需要查看该函数的日志,这涉及跨系统的诊断。统一的、结构化的日志和分布式追踪(如OpenTelemetry)在这种架构下是必选项,而非可选项。
最终,这个方案是一种工程上的权衡。它承认并非所有操作都生而平等,通过将ACID的“信任域”限定在最小、最关键的范围内,并用更灵活、松耦合的Serverless模式处理外围系统,从而在保证核心数据安全的前提下,获得了分布式系统带来的弹性与敏捷性。