业务需求很明确:当核心PostgreSQL
数据库中的产品信息(包括文本描述和关联图片元数据)发生任何变更时,必须在100毫秒内更新其在向量搜索引擎中的表示,以供推荐系统和语义搜索使用。这个延迟指标是硬性的,直接影响用户体验。
方案A:可预测的批处理 ETL
第一个进入脑海的方案是传统的批处理。我们可以设置一个定时任务,例如每分钟执行一次,拉取过去一分钟内PostgreSQL
中所有被修改过的products
记录。
-- 伪代码:定时任务执行的查询
SELECT
product_id,
description,
image_path,
last_updated_at
FROM
products
WHERE
last_updated_at > :last_execution_time;
接着,一个独立的计算服务(可能是一个EC2实例或Fargate任务)会批量处理这些记录:
- 为每条记录的
description
生成文本向量。 - 根据
image_path
加载图片,生成图像向量。 - 将文本向量和图像向量合并或分别存入
Weaviate
。
优点:
- 实现简单:逻辑清晰,易于开发和调试。
- 吞吐量优化:批量数据库读取和批量写入
Weaviate
通常比逐条处理效率更高。 - 成本可控:计算资源在任务执行时才被充分利用。
缺点:
- 延迟不可接受:最坏情况下,延迟是整个批处理周期(例如1分钟)加上处理时间。这完全违背了100毫含秒的硬性指标。
- 数据库压力:定时的大规模轮询查询会对主库产生周期性的压力冲击,尤其是在写入高峰期。
- 资源浪费:在两次任务间隔期间,计算资源处于闲置状态,而一旦任务启动,又可能需要大量资源来应对峰值。
在真实项目中,这种方案对于非实时场景是完全可行的,但对于我们当前面临的低延迟要求,它在设计阶段就已经被否决。
方案B:事件驱动的流式架构
为了满足亚秒级延迟,唯一的选择是走向事件驱动。当数据在源头发生变化时,我们必须立即捕获并处理它,而不是等待轮询。
这个架构的核心思想是利用数据库的变更数据捕获(Change Data Capture, CDC)能力。
graph TD A[PostgreSQL] -- Logical Replication --> B{Debezium/CDC Connector} B -- CDC Events --> C[Apache Pulsar Topic: pg.products.changes] C -- Message Trigger --> D[AWS Lambda Function] D -- Process & Vectorize --> E[Weaviate] D -- DLQ --> F[Pulsar Topic: vectorization_dlq] subgraph "Container Image Build" G[Dockerfile/Containerfile] -- buildah bud --> H[Optimized Container Image] H -- buildah push --> I[Amazon ECR] end I -- Deployed to --> D
架构流程:
- PostgreSQL 开启逻辑复制。任何对
products
表的INSERT
,UPDATE
,DELETE
操作都会生成一条WAL(Write-Ahead Log)记录。 - 一个CDC工具(例如Debezium,可以作为Pulsar的Source Connector运行)监听这些WAL记录,并将它们转换为结构化的JSON或Avro格式的事件。
- 这些事件被实时发布到Apache Pulsar的一个特定主题中,例如
pg.products.changes
。 - 该Pulsar主题配置为AWS Lambda的事件源。每当有新消息到达,就会触发一个Lambda函数实例。
- Lambda函数负责消费事件,解析出变更前后的数据,调用模型服务生成向量,然后将向量写入Weaviate。
- 为了打包复杂的依赖(如大型机器学习模型),Lambda函数本身将作为一个容器镜像来部署。我们使用Buildah来构建一个轻量级、高度优化的镜像,并推送到ECR。
技术选型决策:
为什么是Pulsar?
- 分层存储:Pulsar可以将旧消息透明地卸载到S3等廉价对象存储中。这意味着我们可以将CDC事件流永久保留,用于未来的数据回溯、模型重新训练或系统重建,而无需担心昂贵的存储成本。这对于一个务实的生产系统来说是巨大的优势。
- **函数式计算集成 (Pulsar Functions)**:虽然我们选择了Lambda,但Pulsar原生支持Pulsar Functions,这为未来将计算逻辑移到Pulsar集群内部以降低网络延迟和成本提供了可能。
- 强大的多租户和隔离性:在公司层面,Pulsar可以轻松地为不同业务线划分命名空间和租户,确保资源隔离。
为什么是AWS Lambda + Buildah?
- 成本效益:对于事件驱动的稀疏或突发流量,Lambda的按调用付费模型远比维护一个24/7运行的消费者服务要经济。
- 可扩展性:Lambda可以根据Pulsar主题的消息速率自动扩展,我们无需关心底层服务器的管理。
- Buildah的必要性:向量化通常需要加载模型库(如
sentence-transformers
,timm
)和模型权重文件。这些依赖轻松就能超过Lambda Layer的250MB解压后大小限制。使用容器镜像部署Lambda(上限10GB)是唯一的选择。而Buildah
相比Docker
,是一个无守护进程的工具,更适合在CI/CD环境中运行,它能构建出符合OCI标准的镜像,且多阶段构建可以创建出极致精简的生产镜像。
这个方案在架构上更复杂,但它直接解决了核心的延迟问题,并将系统解耦,每一部分都可以独立扩展和维护。这是满足业务需求的唯一可行路径。
核心实现概览
1. PostgreSQL CDC 配置
首先,必须在postgresql.conf
中启用逻辑复制,并确保wal_level
设置为logical
。然后通过SQL配置发布。
-- file: setup_pg_cdc.sql
-- 确保用户拥有复制权限
ALTER USER replicator WITH REPLICATION;
-- 为'products'表创建一个发布,只有这个表的变化才会被广播
-- REPLICA IDENTITY FULL 确保UPDATE和DELETE事件包含旧的行数据,这对于处理逻辑至关重要
ALTER TABLE products REPLICA IDENTITY FULL;
CREATE PUBLICATION products_pub FOR TABLE products;
-- 验证发布已创建
SELECT pubname FROM pg_publication;
这里的REPLICA IDENTITY FULL
是一个关键的生产实践。没有它,UPDATE
事件可能只包含变更的列,而不是完整的行镜像,这会给下游消费逻辑带来极大困难。
2. Pulsar 主题与Schema
为CDC事件流定义一个清晰的Schema是保证数据质量和系统演进的关键。我们使用Avro。
// file: product_change_event.avsc
{
"type": "record",
"name": "ProductChangeEvent",
"namespace": "com.example.events",
"fields": [
{
"name": "before",
"type": ["null", {
"type": "record",
"name": "Product",
"fields": [
{"name": "product_id", "type": "long"},
{"name": "description", "type": ["null", "string"]},
{"name": "image_path", "type": ["null", "string"]},
{"name": "last_updated_at", "type": "long", "logicalType": "timestamp-micros"}
]
}],
"default": null
},
{
"name": "after",
"type": ["null", "Product"],
"default": null
},
{
"name": "source",
"type": {
"type": "record",
"name": "Source",
"fields": [
{"name": "ts_ms", "type": "long"},
{"name": "db", "type": "string"},
{"name": "schema", "type": "string"},
{"name": "table", "type": "string"}
]
}
},
{
"name": "op",
"type": "string"
}
]
}
before
和after
字段分别代表变更前后的数据镜像,op
字段(c
for create, u
for update, d
for delete)指明了操作类型。
3. Weaviate Schema 定义
在Weaviate中,我们需要创建一个类来存储产品的向量。
# file: weaviate_schema_setup.py
import weaviate
import os
# 配置Weaviate客户端
client = weaviate.Client(
url=os.environ["WEAVIATE_URL"],
auth_client_secret=weaviate.AuthApiKey(api_key=os.environ["WEAVIATE_API_KEY"]),
)
class_obj = {
"class": "Product",
"description": "Stores product information with text and image vectors.",
"vectorizer": "none", # 我们在Lambda中自己生成向量
"properties": [
{
"name": "product_id",
"dataType": ["int"],
"description": "The unique identifier of the product.",
},
{
"name": "description",
"dataType": ["text"],
"description": "Text description of the product.",
}
],
"vectorIndexConfig": {
"distance": "cosine", # 根据模型选择合适的距离度量
},
# 定义多个命名向量
"vectorIndexType": "hnsw",
"vectors": {
"text_vector": {
"vectorIndexConfig": {
"distance": "cosine",
}
},
"image_vector": {
"vectorIndexConfig": {
"distance": "cosine",
}
}
}
}
# 创建集合 (如果不存在)
try:
client.schema.create_class(class_obj)
print("Class 'Product' created successfully.")
except weaviate.exceptions.UnexpectedStatusCodeException as e:
if "already exists" in str(e):
print("Class 'Product' already exists.")
else:
raise e
一个关键点是"vectorizer": "none"
。我们不在Weaviate内部进行向量化,因为我们的逻辑(可能涉及多模态模型)更复杂,在Lambda中处理给予我们更大的灵活性。我们还定义了两个命名向量text_vector
和image_vector
。
4. Buildah 构建 Lambda 容器镜像
这是将所有依赖项打包的关键步骤。
# Containerfile
# Stage 1: Builder with full dependencies
FROM python:3.11-slim as builder
WORKDIR /app
# 安装 Buildah/Podman 可能需要的依赖
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential gcc
# 安装Python依赖到一个临时目录
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir --target=/app/packages -r requirements.txt
# Stage 2: Final image for Lambda
FROM public.ecr.aws/lambda/python:3.11
# 设置工作目录
WORKDIR ${LAMBDA_TASK_ROOT}
# 从builder阶段复制已安装的包
COPY /app/packages ./
# 复制Lambda处理器代码
COPY ./lambda_function.py ./
# 设置处理器
CMD [ "lambda_function.handler" ]
requirements.txt
可能包含:
pulsar-client[avro]>=3.2.0
weaviate-client>=3.25.0
sentence-transformers>=2.2.2
# pillow, etc. for image processing
boto3
使用Buildah
构建和推送:
#!/bin/bash
set -e # Exit immediately if a command exits with a non-zero status.
IMAGE_NAME="realtime-vectorizer"
AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
AWS_REGION="us-east-1"
ECR_REPO="${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/${IMAGE_NAME}"
IMAGE_TAG="latest"
echo "Building container image..."
# 使用'build-using-dockerfile'的缩写 'bud'
# --tag 指定镜像名称和标签
# . 指定构建上下文为当前目录
buildah bud --tag "${ECR_REPO}:${IMAGE_TAG}" .
echo "Logging into ECR..."
aws ecr get-login-password --region ${AWS_REGION} | \
buildah login --username AWS --password-stdin "${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com"
echo "Pushing image to ECR..."
buildah push "${ECR_REPO}:${IMAGE_TAG}"
这个脚本展示了在CI/CD流程中如何使用buildah
来替代docker
。它不依赖于一个运行中的守护进程,更轻量、更安全。
5. AWS Lambda 处理器核心代码
这是整个流式处理管道的核心。
# file: lambda_function.py
import os
import json
import logging
import base64
import pulsar
from pulsar.schema import AvroSchema
import weaviate
from sentence_transformers import SentenceTransformer
# --- Configuration ---
# 在生产中,这些应该通过Lambda环境变量或Secrets Manager注入
WEAVIATE_URL = os.environ.get("WEAVIATE_URL")
WEAVIATE_API_KEY = os.environ.get("WEAVIATE_API_KEY")
PULSAR_URL = os.environ.get("PULSAR_URL")
PULSAR_TOKEN = os.environ.get("PULSAR_JWT_TOKEN")
# --- Logging Setup ---
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# --- Global Initialization ---
# 这部分代码在Lambda冷启动时执行一次,后续调用会复用
try:
logger.info("Initializing Weaviate client...")
weaviate_client = weaviate.Client(
url=WEAVIATE_URL,
auth_client_secret=weaviate.AuthApiKey(api_key=WEAVIATE_API_KEY),
timeout_config=(5, 15) # connect_timeout, read_timeout
)
logger.info("Initializing SentenceTransformer model...")
# 假设模型被打包在容器镜像中
text_model = SentenceTransformer('all-MiniLM-L6-v2', cache_folder='/tmp/models')
logger.info("Initialization complete.")
except Exception as e:
logger.fatal(f"Fatal error during initialization: {e}", exc_info=True)
# 抛出异常以防止Lambda服务将此实例标记为健康
raise e
# Pulsar客户端不适合在全局初始化,因为它维护着TCP连接
# 每次调用建立和关闭是更安全的模式,尽管会增加一点延迟
# 对于高性能场景,可能需要探索更复杂的连接管理策略
# Avro Schema 定义 (与.avsc文件匹配)
# 在真实项目中,这个类应该从一个共享库中导入
class Product(pulsar.schema.Record):
product_id = pulsar.schema.Long()
description = pulsar.schema.String(required=False)
image_path = pulsar.schema.String(required=False)
last_updated_at = pulsar.schema.Long()
class ProductChangeEvent(pulsar.schema.Record):
before = pulsar.schema.Record(Product, required=False)
after = pulsar.schema.Record(Product, required=False)
# ... other fields
def generate_vectors(product_data):
"""为产品数据生成文本和图像向量。"""
# 此处为简化示例,仅处理文本
if product_data and product_data.description:
try:
text_vector = text_model.encode(product_data.description).tolist()
# 图像向量化逻辑会更复杂:
# 1. 从S3或其他存储下载 image_path
# 2. 使用图像模型生成向量
image_vector = [0.0] * 384 # 伪代码,用0向量占位
return text_vector, image_vector
except Exception as e:
logger.error(f"Error generating vectors for product {product_data.product_id}: {e}", exc_info=True)
return None, None
return None, None
def handler(event, context):
"""
Lambda主处理器,由Pulsar事件触发。
'event'的结构由Lambda的Pulsar事件源映射决定。
"""
# Pulsar事件通常是base64编码的
try:
message_payload = base64.b64decode(event['data'])
# 使用AvroSchema反序列化
# 注意:这里我们假设一个简单的反序列化过程。
# 在生产中,你需要Pulsar的AvroSchema来正确处理
# Schema演进,可能涉及查询Schema Registry。
# 为了简单起见,我们在这里直接用json加载Debezium的输出格式。
cdc_event = json.loads(message_payload.decode('utf-8'))
payload = cdc_event.get('payload', {})
op = payload.get('op')
if op in ('c', 'u', 'r'): # create, update, read (for snapshots)
data_record = payload.get('after')
if not data_record:
logger.warning(f"Received '{op}' event with no 'after' data. Skipping.")
return {'statusCode': 200}
product_id = data_record['product_id']
logger.info(f"Processing '{op}' event for product_id: {product_id}")
# 模拟从反序列化记录中获取数据
class MockProduct:
def __init__(self, d): self.__dict__.update(d)
product_obj = MockProduct(data_record)
text_vec, image_vec = generate_vectors(product_obj)
if text_vec is None:
raise ValueError("Failed to generate vectors.")
# 使用Weaviate的原子性upsert操作
weaviate_client.data_object.create(
data_object={
"product_id": product_id,
"description": product_obj.description,
},
class_name="Product",
uuid=weaviate.util.generate_uuid5({'product_id': product_id}),
vectors={
"text_vector": text_vec,
"image_vector": image_vec,
}
)
logger.info(f"Successfully upserted vectors for product_id: {product_id}")
elif op == 'd':
data_record = payload.get('before')
if not data_record:
logger.warning("Received 'd' event with no 'before' data. Skipping.")
return {'statusCode': 200}
product_id = data_record['product_id']
logger.info(f"Processing 'd' event for product_id: {product_id}")
uuid_to_delete = weaviate.util.generate_uuid5({'product_id': product_id})
weaviate_client.data_object.delete(
uuid=uuid_to_delete,
class_name="Product",
)
logger.info(f"Successfully deleted vectors for product_id: {product_id}")
except Exception as e:
logger.error(f"Failed to process Pulsar message: {e}", exc_info=True)
# 抛出异常,让Lambda服务根据重试策略处理此事件
# 如果多次重试失败,事件最终会进入配置的DLQ
raise e
return {'statusCode': 200}
代码中的关键考量:
- 初始化分离: 耗时的操作(加载模型、创建客户端)放在handler函数之外,利用Lambda的执行上下文复用。
- 幂等性:
weaviate_client.data_object.create
配合基于product_id
生成的确定性UUID,实现了upsert
(更新或插入)的效果。即使同一条消息被重复处理,结果也是一致的。 - 错误处理: 任何处理失败都会抛出异常。Lambda会根据配置进行重试。多次失败后,消息应被发送到Pulsar的死信队列(Dead-Letter Queue, DLQ),以便进行人工干预或离线分析,避免有问题的消息阻塞整个管道。
- 向量化逻辑: 图像向量化被抽象出来,在真实场景中,这可能涉及从S3下载图片,是一个IO密集型操作,需要仔细管理超时。
架构的局限性与未来迭代
尽管该架构满足了核心需求,但它并非没有缺点,一个成熟的团队必须正视这些局限。
处理顺序:AWS Lambda和Pulsar的组合不保证消息的严格顺序处理。如果一个产品在短时间内被更新两次,对应的两个Lambda实例可能并发执行,后一个更新可能先于前一个更新写入Weaviate。对于我们的场景,这通常可以接受(最终会达到一致),但对于需要严格顺序的业务(如金融交易),则需要引入基于
product_id
的分区消费策略(Pulsar的Key_Shared订阅模式)来解决。事务性保证:从PostgreSQL的
COMMIT
到Weaviate的写入完成,整个流程不是一个原子事务。如果Lambda函数在写入Weaviate后、确认消息前崩溃,消息可能会被重新处理,导致重复写入(尽管我们的设计是幂等的)。更糟糕的是,如果CDC或Pulsar出现问题,数据可能会在管道中丢失。这意味着系统只能保证最终一致性,而非强一致性。数据回填:此架构完美地处理了增量更新。但如何处理存量的数百万条产品数据?这需要一个独立的、一次性的批处理任务,直接读取PostgreSQL,生成向量并批量导入Weaviate。这个回填过程的设计和执行同样充满挑战,需要考虑对数据库的压力控制和导入过程中的错误处理。
成本监控:Serverless虽然成本效益高,但也可能失控。一次错误的数据库大批量更新可能会触发数百万次Lambda调用,产生意料之外的账单。必须配置严格的预算告警和对Pulsar主题消费速率的监控。
未来的优化路径可能包括将Pulsar Functions作为Lambda的替代方案,以减少跨云网络开销;或者引入一个中间的流处理引擎(如Flink)来执行更复杂的状态转换和聚合,然后再写入Weaviate。