在一个典型的微服务体系中,我们常常面临一个两难的困境:负责核心业务逻辑的服务(例如订单服务、商品服务)为了保证事务的ACID特性,通常会选择关系型数据库(如PostgreSQL)并配合MyBatis这类成熟的ORM框架进行数据持久化。与此同时,为了满足日益增长的AI驱动的语义搜索、推荐等需求,另一个服务(例如搜索服务)则需要将这些数据导入到像Qdrant这样的向量数据库中。问题在于,如何以一种可靠、解耦且高效的方式,将主业务数据库中的数据变更实时同步到向量数据库?
方案A:同步双写,一个看似直接却充满陷阱的选择
最直观的想法是在业务服务中执行同步双写。当商品信息发生变更时,在同一个业务方法里,先通过MyBatis操作PostgreSQL,然后立即调用Qdrant的客户端API更新向量数据。
// 伪代码 - 同步双写模式
@Transactional
public void updateProduct(Product product) {
// 1. 更新关系型数据库
productMapper.updateById(product);
// 2. 生成向量
float[] vector = embeddingService.createVector(product.getDescription());
// 3. 更新向量数据库
try {
qdrantClient.upsertPoints("products",
PointStruct.newBuilder()
.setId(Points.id(product.getId()))
.setPayload(product.toPayload())
.setVectors(Vectors.of(vector))
.build()
);
} catch (Exception e) {
// 这里的挑战:如何处理Qdrant写入失败?
// 如果直接抛出异常,整个事务会回滚,PostgreSQL的更新也会被撤销。
// 这在很多业务场景下是不可接受的,因为外部系统的可用性不应影响核心业务的成功。
// 如果捕获异常并忽略,则会导致数据不一致。
log.error("Failed to update Qdrant index for product: {}", product.getId(), e);
// 这就造成了数据不一致的源头
}
}
这个方案的弊端是显而易见的:
- 强耦合: 核心业务服务被迫依赖于向量数据库的可用性。如果Qdrant集群出现网络抖动或服务不可用,将直接导致商品更新失败,这是生产环境中无法容忍的。
- 分布式事务问题: 这本质上是一个跨两种不同存储系统的分布式事务问题。关系型数据库的本地事务无法覆盖对Qdrant的远程API调用。引入两阶段提交(2PC)或XA事务会极大地增加系统复杂性和性能开销,且Qdrant本身并不原生支持这类协议。
- 性能瓶颈: 核心业务的API响应时间会因为需要同步等待向量化和Qdrant写入而延长,影响用户体验。
在真实项目中,这种方案因为其脆弱性通常在架构评审阶段就会被否决。
方案B:异步消息,解耦但引入了新的魔鬼
为了解决同步双写的耦合问题,自然会想到引入消息队列(如Kafka, RabbitMQ)。业务服务在完成数据库事务后,发送一条消息通知下游服务进行数据同步。
// 伪代码 - 异步消息模式
@Transactional
public void updateProduct(Product product) {
// 1. 更新关系型数据库
productMapper.updateById(product);
// 2. 发送消息
ProductUpdatedEvent event = new ProductUpdatedEvent(product.getId(), product.getName(), product.getDescription());
kafkaTemplate.send("product_events", event);
}
这种方式确实实现了服务间的解耦,但它并未根除数据一致性的核心难题。数据库事务提交和消息发送是两个独立的操作。如果数据库提交成功,但在发送消息到Kafka时应用崩溃或网络中断,这条数据变更就永远丢失了,下游的Qdrant将一无所知。
为了解决这个问题,业界发展出了“事务性发件箱”(Transactional Outbox)模式。即在业务服务的同一个本地事务中,将待发送的消息存入数据库的一张outbox
表中。然后由一个独立的轮询进程或CDC工具读取这张表,并将消息可靠地投递到消息队列。
这虽然是一个可行的方案,但它对业务代码具有侵入性。开发者需要在每个产生事件的地方都遵循这个模式,增加了业务逻辑的复杂度和出错的可能性。我们需要一个对业务代码“零侵入”的方案。
最终选择:基于CDC的CQRS架构,彻底解耦数据源与消费者
Change Data Capture (CDC) 是一种捕获数据库增量变更的技术。我们可以使用Debezium这样的开源工具,直接监听PostgreSQL的预写日志(WAL),将所有INSERT
, UPDATE
, DELETE
操作转化为结构化的事件流,并推送到Kafka中。业务服务可以完全不感知下游有任何消费者存在,它只需要像往常一样通过MyBatis与数据库交互。
这完美契合了命令查询职责分离(CQRS)模式。
- Command端: 商品微服务,使用MyBatis对PostgreSQL进行写操作,这是数据的唯一权威来源。
- Query端: 搜索微服务,消费由CDC产生的Kafka事件流,将数据投影(Project)到Qdrant中,用于提供高性能的向量查询。
graph TD subgraph "商品微服务 (Command Side)" A[API Endpoint] --> B{Service Logic}; B -- MyBatis --> C[(PostgreSQL)]; end subgraph "数据同步管道" C -- WAL --> D[Debezium Connector]; D -- "Data Change Events" --> E[(Apache Kafka)]; end subgraph "搜索微服务 (Query Side)" F[Data Projection Consumer] -- "Consumes Events" --> E; F -- "Vectorize & Upsert" --> G[(Qdrant)]; H[Search API] --> I{Search Logic}; I -- "Vector Search" --> G; end
这种架构的优势是压倒性的:
- 极致解耦: 商品服务完全独立,其性能和稳定性不受下游任何系统的影响。
- 数据可靠性: 基于数据库的事务日志,保证了所有已提交的变更都“有迹可循”,不会丢失。
- 非侵入性: 无需对现有的业务代码做任何修改。
- 可扩展性: 可以增加任意多个消费者来订阅这份数据变更流,用于不同的目的(如缓存更新、数据仓库ETL等),而无需改动源系统。
核心实现概览
1. 商品微服务 (Command Side)
这是一个标准的Spring Boot应用,使用MyBatis-Plus来简化CRUD。
pom.xml
关键依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.3.1</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
Product.java
实体类:
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDateTime;
@Data
@TableName("products")
public class Product {
@TableId(type = IdType.AUTO)
private Long id;
private String name;
private String description;
private BigDecimal price;
private String category;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
}
ProductMapper.java
数据访问接口:
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface ProductMapper extends BaseMapper<Product> {
}
业务逻辑代码无需任何特殊处理,就像在操作一个普通的数据库一样。
2. Debezium Connector 配置
假设我们使用Kafka Connect来运行Debezium。需要向Kafka Connect REST API提交以下JSON配置来创建一个PostgreSQL源连接器。
{
"name": "product-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "user",
"database.password": "password",
"database.dbname" : "product_db",
"database.server.name": "product_server",
"table.include.list": "public.products",
"plugin.name": "pgoutput",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"topic.prefix": "cdc",
"heartbeat.interval.ms": "5000"
}
}
-
table.include.list
: 指定了我们只关心products
表的变更。 -
plugin.name
:pgoutput
是PostgreSQL 10+ 推荐的逻辑解码插件。 -
topic.prefix
: Debezium会将public.products
表的变更发布到名为cdc.public.products
的Kafka topic中。
3. 搜索微服务 (Data Projection Consumer)
这是实现同步逻辑的核心。它消费Kafka中的CDC事件,调用模型服务(这里简化处理)生成向量,然后写入Qdrant。
pom.xml
关键依赖:
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.qdrant</groupId>
<artifactId>qdrant-client</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- 假设有一个独立的模型服务客户端 -->
<dependency>
<groupId>com.example</groupId>
<artifactId>embedding-service-client</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
application.yml
配置:
spring:
kafka:
consumer:
bootstrap-servers: kafka:9092
group-id: qdrant-projection-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
qdrant:
host: qdrant
port: 6333
collection:
name: "products_vector"
QdrantSyncConsumer.java
消费者实现:
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.qdrant.client.QdrantClient;
import io.qdrant.client.QdrantGrpcClient;
import io.qdrant.client.grpc.Points;
import io.qdrant.client.grpc.Points.PointStruct;
import io.qdrant.client.grpc.Points.UpdateStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
@Service
@Slf4j
public class QdrantSyncConsumer {
private final QdrantClient qdrantClient;
private final ObjectMapper objectMapper;
private final EmbeddingService embeddingService; // 模拟的向量化服务
private final String collectionName;
public QdrantSyncConsumer(
@Value("${qdrant.host}") String qdrantHost,
@Value("${qdrant.port}") int qdrantPort,
@Value("${qdrant.collection.name}") String collectionName,
ObjectMapper objectMapper,
EmbeddingService embeddingService) {
this.qdrantClient = new QdrantClient(QdrantGrpcClient.newBuilder(qdrantHost, qdrantPort, false).build());
this.objectMapper = objectMapper;
this.embeddingService = embeddingService;
this.collectionName = collectionName;
// 在生产环境中,集合的创建应该是幂等的,并由部署脚本或启动逻辑处理
// createCollectionIfNotExists();
}
@KafkaListener(topics = "cdc.public.products", groupId = "qdrant-projection-group")
public void handleProductChangeEvent(String payload) {
try {
JsonNode message = objectMapper.readTree(payload);
JsonNode payloadNode = message.get("payload");
// Debezium消息体可能为null,例如在删除操作后,或者在心跳消息中
if (payloadNode == null || payloadNode.isNull()) {
log.info("Received message with null payload, likely a heartbeat or delete confirmation. Skipping.");
return;
}
String operation = payloadNode.get("op").asText();
JsonNode dataNode = operation.equals("d") ? payloadNode.get("before") : payloadNode.get("after");
if (dataNode == null || dataNode.isNull()) {
log.warn("Data node (before/after) is null for operation '{}'. Skipping.", operation);
return;
}
long productId = dataNode.get("id").asLong();
switch (operation) {
case "c": // Create
case "u": // Update
case "r": // Read (for initial snapshot)
log.info("Processing upsert for product ID: {}", productId);
processUpsert(productId, dataNode);
break;
case "d": // Delete
log.info("Processing delete for product ID: {}", productId);
processDelete(productId);
break;
default:
log.warn("Unknown CDC operation: '{}'", operation);
}
} catch (Exception e) {
log.error("Failed to process CDC event: {}", payload, e);
// 异常处理策略:对于可重试的错误(如网络问题),可以抛出异常让Kafka重试。
// 对于不可重试的错误(如数据格式问题),应记录到死信队列。
throw new RuntimeException("CDC processing failed", e);
}
}
private void processUpsert(long productId, JsonNode dataNode) throws ExecutionException, InterruptedException {
String description = dataNode.hasNonNull("description") ? dataNode.get("description").asText() : "";
String name = dataNode.get("name").asText();
// 1. 生成向量。这可能是一个RPC调用。
float[] vector = embeddingService.generateVector(name + " " + description);
if (vector == null) {
log.error("Failed to generate vector for product ID: {}", productId);
// 决定是否重试或放弃
return;
}
// 2. 构建Qdrant的Point
PointStruct point = PointStruct.newBuilder()
.setId(Points.id(productId))
.setVectors(Points.vectors(vector))
.putPayload("name", Points.value(name))
.putPayload("price", Points.value(dataNode.get("price").asDouble()))
.putPayload("category", Points.value(dataNode.get("category").asText()))
.build();
// 3. 写入Qdrant。upsert是幂等的,这很重要。
var future = qdrantClient.upsertPoints(collectionName, Collections.singletonList(point), true); // wait=true
Points.PointsOperationResponse response = future.get();
if (response.getResult().getStatus() != UpdateStatus.Completed) {
log.error("Qdrant upsert failed for product ID: {}. Status: {}", productId, response.getResult().getStatus());
throw new RuntimeException("Qdrant upsert operation failed.");
}
}
private void processDelete(long productId) throws ExecutionException, InterruptedException {
var future = qdrantClient.deletePoints(collectionName, Collections.singletonList(Points.id(productId)), true); // wait=true
Points.PointsOperationResponse response = future.get();
if (response.getResult().getStatus() != UpdateStatus.Completed) {
log.error("Qdrant delete failed for product ID: {}. Status: {}", productId, response.getResult().getStatus());
throw new RuntimeException("Qdrant delete operation failed.");
}
}
}
架构的扩展性与局限性
这个架构虽然优雅,但也并非万能。
局限性:
- 最终一致性: 存在数据同步延迟。从PostgreSQL提交事务到Qdrant数据可查,中间会经过Debezium、Kafka和消费者的处理,延迟可能在几十毫秒到几秒不等。对于要求强一致性的读后写场景,此方案不适用。
- 运维复杂性: 引入了Debezium和Kafka,整个系统的运维监控变得更加复杂。需要关注Kafka Connect集群的健康状况、Kafka的吞吐与延迟以及消费者的处理能力。
- 数据回填: 对于已有大量存量数据的系统,首次启用CDC时需要进行全量数据快照。Debezium支持初始快照,但这可能对源数据库产生较大压力,需要在低峰期进行。
- Schema变更: 当源数据库表结构发生变化时(例如增加字段),需要协调下游消费者的代码变更和部署,以避免反序列化失败。使用Avro和Schema Registry可以更好地管理Schema演进。
未来展望与优化路径:
- 性能优化: 对于写入量极大的场景,可以在消费者端进行批量处理,累积一定数量的事件后再批量
upsert
到Qdrant,以提升网络和存储效率。 - 容错增强: 实现更精细的错误处理逻辑,例如引入死信队列(DLQ)来处理无法解析或处理的“毒丸”消息,避免消费者阻塞。
- 数据转换层: 在消费者和Qdrant之间可以引入一个轻量级的数据转换层(如使用Flink或Kafka Streams),以处理更复杂的ETL逻辑,例如字段合并、数据丰富或格式转换,使投影逻辑本身更纯粹。