在微服务架构中基于CDC实现MyBatis到Qdrant的最终一致性同步


在一个典型的微服务体系中,我们常常面临一个两难的困境:负责核心业务逻辑的服务(例如订单服务、商品服务)为了保证事务的ACID特性,通常会选择关系型数据库(如PostgreSQL)并配合MyBatis这类成熟的ORM框架进行数据持久化。与此同时,为了满足日益增长的AI驱动的语义搜索、推荐等需求,另一个服务(例如搜索服务)则需要将这些数据导入到像Qdrant这样的向量数据库中。问题在于,如何以一种可靠、解耦且高效的方式,将主业务数据库中的数据变更实时同步到向量数据库?

方案A:同步双写,一个看似直接却充满陷阱的选择

最直观的想法是在业务服务中执行同步双写。当商品信息发生变更时,在同一个业务方法里,先通过MyBatis操作PostgreSQL,然后立即调用Qdrant的客户端API更新向量数据。

// 伪代码 - 同步双写模式
@Transactional
public void updateProduct(Product product) {
    // 1. 更新关系型数据库
    productMapper.updateById(product);

    // 2. 生成向量
    float[] vector = embeddingService.createVector(product.getDescription());

    // 3. 更新向量数据库
    try {
        qdrantClient.upsertPoints("products",
                PointStruct.newBuilder()
                        .setId(Points.id(product.getId()))
                        .setPayload(product.toPayload())
                        .setVectors(Vectors.of(vector))
                        .build()
        );
    } catch (Exception e) {
        // 这里的挑战:如何处理Qdrant写入失败?
        // 如果直接抛出异常,整个事务会回滚,PostgreSQL的更新也会被撤销。
        // 这在很多业务场景下是不可接受的,因为外部系统的可用性不应影响核心业务的成功。
        // 如果捕获异常并忽略,则会导致数据不一致。
        log.error("Failed to update Qdrant index for product: {}", product.getId(), e);
        // 这就造成了数据不一致的源头
    }
}

这个方案的弊端是显而易见的:

  1. 强耦合: 核心业务服务被迫依赖于向量数据库的可用性。如果Qdrant集群出现网络抖动或服务不可用,将直接导致商品更新失败,这是生产环境中无法容忍的。
  2. 分布式事务问题: 这本质上是一个跨两种不同存储系统的分布式事务问题。关系型数据库的本地事务无法覆盖对Qdrant的远程API调用。引入两阶段提交(2PC)或XA事务会极大地增加系统复杂性和性能开销,且Qdrant本身并不原生支持这类协议。
  3. 性能瓶颈: 核心业务的API响应时间会因为需要同步等待向量化和Qdrant写入而延长,影响用户体验。

在真实项目中,这种方案因为其脆弱性通常在架构评审阶段就会被否决。

方案B:异步消息,解耦但引入了新的魔鬼

为了解决同步双写的耦合问题,自然会想到引入消息队列(如Kafka, RabbitMQ)。业务服务在完成数据库事务后,发送一条消息通知下游服务进行数据同步。

// 伪代码 - 异步消息模式
@Transactional
public void updateProduct(Product product) {
    // 1. 更新关系型数据库
    productMapper.updateById(product);

    // 2. 发送消息
    ProductUpdatedEvent event = new ProductUpdatedEvent(product.getId(), product.getName(), product.getDescription());
    kafkaTemplate.send("product_events", event);
}

这种方式确实实现了服务间的解耦,但它并未根除数据一致性的核心难题。数据库事务提交和消息发送是两个独立的操作。如果数据库提交成功,但在发送消息到Kafka时应用崩溃或网络中断,这条数据变更就永远丢失了,下游的Qdrant将一无所知。

为了解决这个问题,业界发展出了“事务性发件箱”(Transactional Outbox)模式。即在业务服务的同一个本地事务中,将待发送的消息存入数据库的一张outbox表中。然后由一个独立的轮询进程或CDC工具读取这张表,并将消息可靠地投递到消息队列。

这虽然是一个可行的方案,但它对业务代码具有侵入性。开发者需要在每个产生事件的地方都遵循这个模式,增加了业务逻辑的复杂度和出错的可能性。我们需要一个对业务代码“零侵入”的方案。

最终选择:基于CDC的CQRS架构,彻底解耦数据源与消费者

Change Data Capture (CDC) 是一种捕获数据库增量变更的技术。我们可以使用Debezium这样的开源工具,直接监听PostgreSQL的预写日志(WAL),将所有INSERT, UPDATE, DELETE操作转化为结构化的事件流,并推送到Kafka中。业务服务可以完全不感知下游有任何消费者存在,它只需要像往常一样通过MyBatis与数据库交互。

这完美契合了命令查询职责分离(CQRS)模式。

  • Command端: 商品微服务,使用MyBatis对PostgreSQL进行写操作,这是数据的唯一权威来源。
  • Query端: 搜索微服务,消费由CDC产生的Kafka事件流,将数据投影(Project)到Qdrant中,用于提供高性能的向量查询。
graph TD
    subgraph "商品微服务 (Command Side)"
        A[API Endpoint] --> B{Service Logic};
        B -- MyBatis --> C[(PostgreSQL)];
    end

    subgraph "数据同步管道"
        C -- WAL --> D[Debezium Connector];
        D -- "Data Change Events" --> E[(Apache Kafka)];
    end

    subgraph "搜索微服务 (Query Side)"
        F[Data Projection Consumer] -- "Consumes Events" --> E;
        F -- "Vectorize & Upsert" --> G[(Qdrant)];
        H[Search API] --> I{Search Logic};
        I -- "Vector Search" --> G;
    end

这种架构的优势是压倒性的:

  • 极致解耦: 商品服务完全独立,其性能和稳定性不受下游任何系统的影响。
  • 数据可靠性: 基于数据库的事务日志,保证了所有已提交的变更都“有迹可循”,不会丢失。
  • 非侵入性: 无需对现有的业务代码做任何修改。
  • 可扩展性: 可以增加任意多个消费者来订阅这份数据变更流,用于不同的目的(如缓存更新、数据仓库ETL等),而无需改动源系统。

核心实现概览

1. 商品微服务 (Command Side)

这是一个标准的Spring Boot应用,使用MyBatis-Plus来简化CRUD。

pom.xml 关键依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.5.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>

Product.java 实体类:

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDateTime;

@Data
@TableName("products")
public class Product {
    @TableId(type = IdType.AUTO)
    private Long id;
    private String name;
    private String description;
    private BigDecimal price;
    private String category;
    private LocalDateTime createdAt;
    private LocalDateTime updatedAt;
}

ProductMapper.java 数据访问接口:

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface ProductMapper extends BaseMapper<Product> {
}

业务逻辑代码无需任何特殊处理,就像在操作一个普通的数据库一样。

2. Debezium Connector 配置

假设我们使用Kafka Connect来运行Debezium。需要向Kafka Connect REST API提交以下JSON配置来创建一个PostgreSQL源连接器。

{
  "name": "product-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "user",
    "database.password": "password",
    "database.dbname" : "product_db",
    "database.server.name": "product_server",
    "table.include.list": "public.products",
    "plugin.name": "pgoutput",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "topic.prefix": "cdc",
    "heartbeat.interval.ms": "5000"
  }
}
  • table.include.list: 指定了我们只关心 products 表的变更。
  • plugin.name: pgoutput是PostgreSQL 10+ 推荐的逻辑解码插件。
  • topic.prefix: Debezium会将public.products表的变更发布到名为cdc.public.products的Kafka topic中。

3. 搜索微服务 (Data Projection Consumer)

这是实现同步逻辑的核心。它消费Kafka中的CDC事件,调用模型服务(这里简化处理)生成向量,然后写入Qdrant。

pom.xml 关键依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>io.qdrant</groupId>
        <artifactId>qdrant-client</artifactId>
        <version>1.7.1</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <!-- 假设有一个独立的模型服务客户端 -->
    <dependency>
        <groupId>com.example</groupId>
        <artifactId>embedding-service-client</artifactId>
        <version>1.0.0</version>
    </dependency>
</dependencies>

application.yml 配置:

spring:
  kafka:
    consumer:
      bootstrap-servers: kafka:9092
      group-id: qdrant-projection-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

qdrant:
  host: qdrant
  port: 6333
  collection:
    name: "products_vector"

QdrantSyncConsumer.java 消费者实现:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.qdrant.client.QdrantClient;
import io.qdrant.client.QdrantGrpcClient;
import io.qdrant.client.grpc.Points;
import io.qdrant.client.grpc.Points.PointStruct;
import io.qdrant.client.grpc.Points.UpdateStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.concurrent.ExecutionException;

@Service
@Slf4j
public class QdrantSyncConsumer {

    private final QdrantClient qdrantClient;
    private final ObjectMapper objectMapper;
    private final EmbeddingService embeddingService; // 模拟的向量化服务
    private final String collectionName;

    public QdrantSyncConsumer(
            @Value("${qdrant.host}") String qdrantHost,
            @Value("${qdrant.port}") int qdrantPort,
            @Value("${qdrant.collection.name}") String collectionName,
            ObjectMapper objectMapper,
            EmbeddingService embeddingService) {
        this.qdrantClient = new QdrantClient(QdrantGrpcClient.newBuilder(qdrantHost, qdrantPort, false).build());
        this.objectMapper = objectMapper;
        this.embeddingService = embeddingService;
        this.collectionName = collectionName;
        // 在生产环境中,集合的创建应该是幂等的,并由部署脚本或启动逻辑处理
        // createCollectionIfNotExists();
    }

    @KafkaListener(topics = "cdc.public.products", groupId = "qdrant-projection-group")
    public void handleProductChangeEvent(String payload) {
        try {
            JsonNode message = objectMapper.readTree(payload);
            JsonNode payloadNode = message.get("payload");

            // Debezium消息体可能为null,例如在删除操作后,或者在心跳消息中
            if (payloadNode == null || payloadNode.isNull()) {
                log.info("Received message with null payload, likely a heartbeat or delete confirmation. Skipping.");
                return;
            }

            String operation = payloadNode.get("op").asText();
            JsonNode dataNode = operation.equals("d") ? payloadNode.get("before") : payloadNode.get("after");

            if (dataNode == null || dataNode.isNull()) {
                log.warn("Data node (before/after) is null for operation '{}'. Skipping.", operation);
                return;
            }

            long productId = dataNode.get("id").asLong();

            switch (operation) {
                case "c": // Create
                case "u": // Update
                case "r": // Read (for initial snapshot)
                    log.info("Processing upsert for product ID: {}", productId);
                    processUpsert(productId, dataNode);
                    break;
                case "d": // Delete
                    log.info("Processing delete for product ID: {}", productId);
                    processDelete(productId);
                    break;
                default:
                    log.warn("Unknown CDC operation: '{}'", operation);
            }
        } catch (Exception e) {
            log.error("Failed to process CDC event: {}", payload, e);
            // 异常处理策略:对于可重试的错误(如网络问题),可以抛出异常让Kafka重试。
            // 对于不可重试的错误(如数据格式问题),应记录到死信队列。
            throw new RuntimeException("CDC processing failed", e);
        }
    }

    private void processUpsert(long productId, JsonNode dataNode) throws ExecutionException, InterruptedException {
        String description = dataNode.hasNonNull("description") ? dataNode.get("description").asText() : "";
        String name = dataNode.get("name").asText();

        // 1. 生成向量。这可能是一个RPC调用。
        float[] vector = embeddingService.generateVector(name + " " + description);
        if (vector == null) {
            log.error("Failed to generate vector for product ID: {}", productId);
            // 决定是否重试或放弃
            return;
        }

        // 2. 构建Qdrant的Point
        PointStruct point = PointStruct.newBuilder()
                .setId(Points.id(productId))
                .setVectors(Points.vectors(vector))
                .putPayload("name", Points.value(name))
                .putPayload("price", Points.value(dataNode.get("price").asDouble()))
                .putPayload("category", Points.value(dataNode.get("category").asText()))
                .build();

        // 3. 写入Qdrant。upsert是幂等的,这很重要。
        var future = qdrantClient.upsertPoints(collectionName, Collections.singletonList(point), true); // wait=true
        Points.PointsOperationResponse response = future.get();

        if (response.getResult().getStatus() != UpdateStatus.Completed) {
            log.error("Qdrant upsert failed for product ID: {}. Status: {}", productId, response.getResult().getStatus());
            throw new RuntimeException("Qdrant upsert operation failed.");
        }
    }

    private void processDelete(long productId) throws ExecutionException, InterruptedException {
        var future = qdrantClient.deletePoints(collectionName, Collections.singletonList(Points.id(productId)), true); // wait=true
        Points.PointsOperationResponse response = future.get();
        if (response.getResult().getStatus() != UpdateStatus.Completed) {
            log.error("Qdrant delete failed for product ID: {}. Status: {}", productId, response.getResult().getStatus());
            throw new RuntimeException("Qdrant delete operation failed.");
        }
    }
}

架构的扩展性与局限性

这个架构虽然优雅,但也并非万能。

局限性:

  1. 最终一致性: 存在数据同步延迟。从PostgreSQL提交事务到Qdrant数据可查,中间会经过Debezium、Kafka和消费者的处理,延迟可能在几十毫秒到几秒不等。对于要求强一致性的读后写场景,此方案不适用。
  2. 运维复杂性: 引入了Debezium和Kafka,整个系统的运维监控变得更加复杂。需要关注Kafka Connect集群的健康状况、Kafka的吞吐与延迟以及消费者的处理能力。
  3. 数据回填: 对于已有大量存量数据的系统,首次启用CDC时需要进行全量数据快照。Debezium支持初始快照,但这可能对源数据库产生较大压力,需要在低峰期进行。
  4. Schema变更: 当源数据库表结构发生变化时(例如增加字段),需要协调下游消费者的代码变更和部署,以避免反序列化失败。使用Avro和Schema Registry可以更好地管理Schema演进。

未来展望与优化路径:

  • 性能优化: 对于写入量极大的场景,可以在消费者端进行批量处理,累积一定数量的事件后再批量upsert到Qdrant,以提升网络和存储效率。
  • 容错增强: 实现更精细的错误处理逻辑,例如引入死信队列(DLQ)来处理无法解析或处理的“毒丸”消息,避免消费者阻塞。
  • 数据转换层: 在消费者和Qdrant之间可以引入一个轻量级的数据转换层(如使用Flink或Kafka Streams),以处理更复杂的ETL逻辑,例如字段合并、数据丰富或格式转换,使投影逻辑本身更纯粹。

  目录