构建基于 Pulsar, Lambda 和 Weaviate 的 Serverless 实时向量化管道的架构权衡


业务需求很明确:当核心PostgreSQL数据库中的产品信息(包括文本描述和关联图片元数据)发生任何变更时,必须在100毫秒内更新其在向量搜索引擎中的表示,以供推荐系统和语义搜索使用。这个延迟指标是硬性的,直接影响用户体验。

方案A:可预测的批处理 ETL

第一个进入脑海的方案是传统的批处理。我们可以设置一个定时任务,例如每分钟执行一次,拉取过去一分钟内PostgreSQL中所有被修改过的products记录。

-- 伪代码:定时任务执行的查询
SELECT 
    product_id, 
    description,
    image_path,
    last_updated_at
FROM 
    products
WHERE 
    last_updated_at > :last_execution_time;

接着,一个独立的计算服务(可能是一个EC2实例或Fargate任务)会批量处理这些记录:

  1. 为每条记录的description生成文本向量。
  2. 根据image_path加载图片,生成图像向量。
  3. 将文本向量和图像向量合并或分别存入Weaviate

优点:

  • 实现简单:逻辑清晰,易于开发和调试。
  • 吞吐量优化:批量数据库读取和批量写入Weaviate通常比逐条处理效率更高。
  • 成本可控:计算资源在任务执行时才被充分利用。

缺点:

  • 延迟不可接受:最坏情况下,延迟是整个批处理周期(例如1分钟)加上处理时间。这完全违背了100毫含秒的硬性指标。
  • 数据库压力:定时的大规模轮询查询会对主库产生周期性的压力冲击,尤其是在写入高峰期。
  • 资源浪费:在两次任务间隔期间,计算资源处于闲置状态,而一旦任务启动,又可能需要大量资源来应对峰值。

在真实项目中,这种方案对于非实时场景是完全可行的,但对于我们当前面临的低延迟要求,它在设计阶段就已经被否决。

方案B:事件驱动的流式架构

为了满足亚秒级延迟,唯一的选择是走向事件驱动。当数据在源头发生变化时,我们必须立即捕获并处理它,而不是等待轮询。

这个架构的核心思想是利用数据库的变更数据捕获(Change Data Capture, CDC)能力。

graph TD
    A[PostgreSQL] -- Logical Replication --> B{Debezium/CDC Connector}
    B -- CDC Events --> C[Apache Pulsar Topic: pg.products.changes]
    C -- Message Trigger --> D[AWS Lambda Function]
    D -- Process & Vectorize --> E[Weaviate]
    D -- DLQ --> F[Pulsar Topic: vectorization_dlq]

    subgraph "Container Image Build"
        G[Dockerfile/Containerfile] -- buildah bud --> H[Optimized Container Image]
        H -- buildah push --> I[Amazon ECR]
    end

    I -- Deployed to --> D

架构流程:

  1. PostgreSQL 开启逻辑复制。任何对products表的INSERT, UPDATE, DELETE操作都会生成一条WAL(Write-Ahead Log)记录。
  2. 一个CDC工具(例如Debezium,可以作为Pulsar的Source Connector运行)监听这些WAL记录,并将它们转换为结构化的JSON或Avro格式的事件。
  3. 这些事件被实时发布到Apache Pulsar的一个特定主题中,例如 pg.products.changes
  4. 该Pulsar主题配置为AWS Lambda的事件源。每当有新消息到达,就会触发一个Lambda函数实例。
  5. Lambda函数负责消费事件,解析出变更前后的数据,调用模型服务生成向量,然后将向量写入Weaviate
  6. 为了打包复杂的依赖(如大型机器学习模型),Lambda函数本身将作为一个容器镜像来部署。我们使用Buildah来构建一个轻量级、高度优化的镜像,并推送到ECR。

技术选型决策:

  • 为什么是Pulsar?

    • 分层存储:Pulsar可以将旧消息透明地卸载到S3等廉价对象存储中。这意味着我们可以将CDC事件流永久保留,用于未来的数据回溯、模型重新训练或系统重建,而无需担心昂贵的存储成本。这对于一个务实的生产系统来说是巨大的优势。
    • **函数式计算集成 (Pulsar Functions)**:虽然我们选择了Lambda,但Pulsar原生支持Pulsar Functions,这为未来将计算逻辑移到Pulsar集群内部以降低网络延迟和成本提供了可能。
    • 强大的多租户和隔离性:在公司层面,Pulsar可以轻松地为不同业务线划分命名空间和租户,确保资源隔离。
  • 为什么是AWS Lambda + Buildah?

    • 成本效益:对于事件驱动的稀疏或突发流量,Lambda的按调用付费模型远比维护一个24/7运行的消费者服务要经济。
    • 可扩展性:Lambda可以根据Pulsar主题的消息速率自动扩展,我们无需关心底层服务器的管理。
    • Buildah的必要性:向量化通常需要加载模型库(如sentence-transformers, timm)和模型权重文件。这些依赖轻松就能超过Lambda Layer的250MB解压后大小限制。使用容器镜像部署Lambda(上限10GB)是唯一的选择。而Buildah相比Docker,是一个无守护进程的工具,更适合在CI/CD环境中运行,它能构建出符合OCI标准的镜像,且多阶段构建可以创建出极致精简的生产镜像。

这个方案在架构上更复杂,但它直接解决了核心的延迟问题,并将系统解耦,每一部分都可以独立扩展和维护。这是满足业务需求的唯一可行路径。

核心实现概览

1. PostgreSQL CDC 配置

首先,必须在postgresql.conf中启用逻辑复制,并确保wal_level设置为logical。然后通过SQL配置发布。

-- file: setup_pg_cdc.sql

-- 确保用户拥有复制权限
ALTER USER replicator WITH REPLICATION;

-- 为'products'表创建一个发布,只有这个表的变化才会被广播
-- REPLICA IDENTITY FULL 确保UPDATE和DELETE事件包含旧的行数据,这对于处理逻辑至关重要
ALTER TABLE products REPLICA IDENTITY FULL;
CREATE PUBLICATION products_pub FOR TABLE products;

-- 验证发布已创建
SELECT pubname FROM pg_publication;

这里的REPLICA IDENTITY FULL是一个关键的生产实践。没有它,UPDATE事件可能只包含变更的列,而不是完整的行镜像,这会给下游消费逻辑带来极大困难。

2. Pulsar 主题与Schema

为CDC事件流定义一个清晰的Schema是保证数据质量和系统演进的关键。我们使用Avro。

// file: product_change_event.avsc
{
  "type": "record",
  "name": "ProductChangeEvent",
  "namespace": "com.example.events",
  "fields": [
    {
      "name": "before",
      "type": ["null", {
        "type": "record",
        "name": "Product",
        "fields": [
          {"name": "product_id", "type": "long"},
          {"name": "description", "type": ["null", "string"]},
          {"name": "image_path", "type": ["null", "string"]},
          {"name": "last_updated_at", "type": "long", "logicalType": "timestamp-micros"}
        ]
      }],
      "default": null
    },
    {
      "name": "after",
      "type": ["null", "Product"],
      "default": null
    },
    {
      "name": "source",
      "type": {
        "type": "record",
        "name": "Source",
        "fields": [
          {"name": "ts_ms", "type": "long"},
          {"name": "db", "type": "string"},
          {"name": "schema", "type": "string"},
          {"name": "table", "type": "string"}
        ]
      }
    },
    {
      "name": "op",
      "type": "string"
    }
  ]
}

beforeafter字段分别代表变更前后的数据镜像,op字段(c for create, u for update, d for delete)指明了操作类型。

3. Weaviate Schema 定义

在Weaviate中,我们需要创建一个类来存储产品的向量。

# file: weaviate_schema_setup.py
import weaviate
import os

# 配置Weaviate客户端
client = weaviate.Client(
    url=os.environ["WEAVIATE_URL"],
    auth_client_secret=weaviate.AuthApiKey(api_key=os.environ["WEAVIATE_API_KEY"]),
)

class_obj = {
    "class": "Product",
    "description": "Stores product information with text and image vectors.",
    "vectorizer": "none",  # 我们在Lambda中自己生成向量
    "properties": [
        {
            "name": "product_id",
            "dataType": ["int"],
            "description": "The unique identifier of the product.",
        },
        {
            "name": "description",
            "dataType": ["text"],
            "description": "Text description of the product.",
        }
    ],
    "vectorIndexConfig": {
        "distance": "cosine", # 根据模型选择合适的距离度量
    },
    # 定义多个命名向量
    "vectorIndexType": "hnsw",
    "vectors": {
        "text_vector": {
            "vectorIndexConfig": {
                "distance": "cosine",
            }
        },
        "image_vector": {
            "vectorIndexConfig": {
                "distance": "cosine",
            }
        }
    }
}

# 创建集合 (如果不存在)
try:
    client.schema.create_class(class_obj)
    print("Class 'Product' created successfully.")
except weaviate.exceptions.UnexpectedStatusCodeException as e:
    if "already exists" in str(e):
        print("Class 'Product' already exists.")
    else:
        raise e

一个关键点是"vectorizer": "none"。我们不在Weaviate内部进行向量化,因为我们的逻辑(可能涉及多模态模型)更复杂,在Lambda中处理给予我们更大的灵活性。我们还定义了两个命名向量text_vectorimage_vector

4. Buildah 构建 Lambda 容器镜像

这是将所有依赖项打包的关键步骤。

# Containerfile
# Stage 1: Builder with full dependencies
FROM python:3.11-slim as builder

WORKDIR /app

# 安装 Buildah/Podman 可能需要的依赖
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential gcc

# 安装Python依赖到一个临时目录
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir --target=/app/packages -r requirements.txt

# Stage 2: Final image for Lambda
FROM public.ecr.aws/lambda/python:3.11

# 设置工作目录
WORKDIR ${LAMBDA_TASK_ROOT}

# 从builder阶段复制已安装的包
COPY --from=builder /app/packages ./

# 复制Lambda处理器代码
COPY ./lambda_function.py ./

# 设置处理器
CMD [ "lambda_function.handler" ]

requirements.txt可能包含:

pulsar-client[avro]>=3.2.0
weaviate-client>=3.25.0
sentence-transformers>=2.2.2
# pillow, etc. for image processing
boto3

使用Buildah构建和推送:

#!/bin/bash
set -e # Exit immediately if a command exits with a non-zero status.

IMAGE_NAME="realtime-vectorizer"
AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
AWS_REGION="us-east-1"
ECR_REPO="${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/${IMAGE_NAME}"
IMAGE_TAG="latest"

echo "Building container image..."
# 使用'build-using-dockerfile'的缩写 'bud'
# --tag 指定镜像名称和标签
# . 指定构建上下文为当前目录
buildah bud --tag "${ECR_REPO}:${IMAGE_TAG}" .

echo "Logging into ECR..."
aws ecr get-login-password --region ${AWS_REGION} | \
  buildah login --username AWS --password-stdin "${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com"

echo "Pushing image to ECR..."
buildah push "${ECR_REPO}:${IMAGE_TAG}"

这个脚本展示了在CI/CD流程中如何使用buildah来替代docker。它不依赖于一个运行中的守护进程,更轻量、更安全。

5. AWS Lambda 处理器核心代码

这是整个流式处理管道的核心。

# file: lambda_function.py
import os
import json
import logging
import base64
import pulsar
from pulsar.schema import AvroSchema
import weaviate
from sentence_transformers import SentenceTransformer

# --- Configuration ---
# 在生产中,这些应该通过Lambda环境变量或Secrets Manager注入
WEAVIATE_URL = os.environ.get("WEAVIATE_URL")
WEAVIATE_API_KEY = os.environ.get("WEAVIATE_API_KEY")
PULSAR_URL = os.environ.get("PULSAR_URL")
PULSAR_TOKEN = os.environ.get("PULSAR_JWT_TOKEN")

# --- Logging Setup ---
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# --- Global Initialization ---
# 这部分代码在Lambda冷启动时执行一次,后续调用会复用
try:
    logger.info("Initializing Weaviate client...")
    weaviate_client = weaviate.Client(
        url=WEAVIATE_URL,
        auth_client_secret=weaviate.AuthApiKey(api_key=WEAVIATE_API_KEY),
        timeout_config=(5, 15) # connect_timeout, read_timeout
    )
    logger.info("Initializing SentenceTransformer model...")
    # 假设模型被打包在容器镜像中
    text_model = SentenceTransformer('all-MiniLM-L6-v2', cache_folder='/tmp/models')
    logger.info("Initialization complete.")
except Exception as e:
    logger.fatal(f"Fatal error during initialization: {e}", exc_info=True)
    # 抛出异常以防止Lambda服务将此实例标记为健康
    raise e

# Pulsar客户端不适合在全局初始化,因为它维护着TCP连接
# 每次调用建立和关闭是更安全的模式,尽管会增加一点延迟
# 对于高性能场景,可能需要探索更复杂的连接管理策略

# Avro Schema 定义 (与.avsc文件匹配)
# 在真实项目中,这个类应该从一个共享库中导入
class Product(pulsar.schema.Record):
    product_id = pulsar.schema.Long()
    description = pulsar.schema.String(required=False)
    image_path = pulsar.schema.String(required=False)
    last_updated_at = pulsar.schema.Long()

class ProductChangeEvent(pulsar.schema.Record):
    before = pulsar.schema.Record(Product, required=False)
    after = pulsar.schema.Record(Product, required=False)
    # ... other fields

def generate_vectors(product_data):
    """为产品数据生成文本和图像向量。"""
    # 此处为简化示例,仅处理文本
    if product_data and product_data.description:
        try:
            text_vector = text_model.encode(product_data.description).tolist()
            # 图像向量化逻辑会更复杂:
            # 1. 从S3或其他存储下载 image_path
            # 2. 使用图像模型生成向量
            image_vector = [0.0] * 384 # 伪代码,用0向量占位
            return text_vector, image_vector
        except Exception as e:
            logger.error(f"Error generating vectors for product {product_data.product_id}: {e}", exc_info=True)
            return None, None
    return None, None

def handler(event, context):
    """
    Lambda主处理器,由Pulsar事件触发。
    'event'的结构由Lambda的Pulsar事件源映射决定。
    """
    # Pulsar事件通常是base64编码的
    try:
        message_payload = base64.b64decode(event['data'])
        # 使用AvroSchema反序列化
        # 注意:这里我们假设一个简单的反序列化过程。
        # 在生产中,你需要Pulsar的AvroSchema来正确处理
        # Schema演进,可能涉及查询Schema Registry。
        # 为了简单起见,我们在这里直接用json加载Debezium的输出格式。
        cdc_event = json.loads(message_payload.decode('utf-8'))
        payload = cdc_event.get('payload', {})
        op = payload.get('op')

        if op in ('c', 'u', 'r'): # create, update, read (for snapshots)
            data_record = payload.get('after')
            if not data_record:
                logger.warning(f"Received '{op}' event with no 'after' data. Skipping.")
                return {'statusCode': 200}
            
            product_id = data_record['product_id']
            logger.info(f"Processing '{op}' event for product_id: {product_id}")

            # 模拟从反序列化记录中获取数据
            class MockProduct:
                def __init__(self, d): self.__dict__.update(d)
            product_obj = MockProduct(data_record)

            text_vec, image_vec = generate_vectors(product_obj)

            if text_vec is None:
                raise ValueError("Failed to generate vectors.")

            # 使用Weaviate的原子性upsert操作
            weaviate_client.data_object.create(
                data_object={
                    "product_id": product_id,
                    "description": product_obj.description,
                },
                class_name="Product",
                uuid=weaviate.util.generate_uuid5({'product_id': product_id}),
                vectors={
                    "text_vector": text_vec,
                    "image_vector": image_vec,
                }
            )
            logger.info(f"Successfully upserted vectors for product_id: {product_id}")

        elif op == 'd':
            data_record = payload.get('before')
            if not data_record:
                logger.warning("Received 'd' event with no 'before' data. Skipping.")
                return {'statusCode': 200}

            product_id = data_record['product_id']
            logger.info(f"Processing 'd' event for product_id: {product_id}")
            
            uuid_to_delete = weaviate.util.generate_uuid5({'product_id': product_id})
            weaviate_client.data_object.delete(
                uuid=uuid_to_delete,
                class_name="Product",
            )
            logger.info(f"Successfully deleted vectors for product_id: {product_id}")

    except Exception as e:
        logger.error(f"Failed to process Pulsar message: {e}", exc_info=True)
        # 抛出异常,让Lambda服务根据重试策略处理此事件
        # 如果多次重试失败,事件最终会进入配置的DLQ
        raise e

    return {'statusCode': 200}

代码中的关键考量:

  • 初始化分离: 耗时的操作(加载模型、创建客户端)放在handler函数之外,利用Lambda的执行上下文复用。
  • 幂等性: weaviate_client.data_object.create 配合基于product_id生成的确定性UUID,实现了upsert(更新或插入)的效果。即使同一条消息被重复处理,结果也是一致的。
  • 错误处理: 任何处理失败都会抛出异常。Lambda会根据配置进行重试。多次失败后,消息应被发送到Pulsar的死信队列(Dead-Letter Queue, DLQ),以便进行人工干预或离线分析,避免有问题的消息阻塞整个管道。
  • 向量化逻辑: 图像向量化被抽象出来,在真实场景中,这可能涉及从S3下载图片,是一个IO密集型操作,需要仔细管理超时。

架构的局限性与未来迭代

尽管该架构满足了核心需求,但它并非没有缺点,一个成熟的团队必须正视这些局限。

  1. 处理顺序:AWS Lambda和Pulsar的组合不保证消息的严格顺序处理。如果一个产品在短时间内被更新两次,对应的两个Lambda实例可能并发执行,后一个更新可能先于前一个更新写入Weaviate。对于我们的场景,这通常可以接受(最终会达到一致),但对于需要严格顺序的业务(如金融交易),则需要引入基于product_id的分区消费策略(Pulsar的Key_Shared订阅模式)来解决。

  2. 事务性保证:从PostgreSQL的COMMIT到Weaviate的写入完成,整个流程不是一个原子事务。如果Lambda函数在写入Weaviate后、确认消息前崩溃,消息可能会被重新处理,导致重复写入(尽管我们的设计是幂等的)。更糟糕的是,如果CDC或Pulsar出现问题,数据可能会在管道中丢失。这意味着系统只能保证最终一致性,而非强一致性。

  3. 数据回填:此架构完美地处理了增量更新。但如何处理存量的数百万条产品数据?这需要一个独立的、一次性的批处理任务,直接读取PostgreSQL,生成向量并批量导入Weaviate。这个回填过程的设计和执行同样充满挑战,需要考虑对数据库的压力控制和导入过程中的错误处理。

  4. 成本监控:Serverless虽然成本效益高,但也可能失控。一次错误的数据库大批量更新可能会触发数百万次Lambda调用,产生意料之外的账单。必须配置严格的预算告警和对Pulsar主题消费速率的监控。

未来的优化路径可能包括将Pulsar Functions作为Lambda的替代方案,以减少跨云网络开销;或者引入一个中间的流处理引擎(如Flink)来执行更复杂的状态转换和聚合,然后再写入Weaviate。


  目录