我们的机器学习模型推理服务遇到了一个棘手的性能瓶颈。它需要实时访问两种截然不同的特征数据:一种是基于时间窗口的用户行为聚合特征(例如,“过去15分钟内用户的点击次数”),另一种是基于内容的复杂文本与向量化特征(例如,“搜索与用户历史画像最相关的商品”)。最初的架构试图用单一的 PostgreSQL 实例来满足这两种查询需求,结果是在高并发下,复杂的 JOIN
和全文搜索查询严重拖慢了时间窗口函数的计算,反之亦然。
这是一个典型的查询模式冲突问题。时间序列查询需要高效的数据分区、索引和专为时序优化的函数;而搜索查询则需要倒排索引、相关性评分和向量相似度计算。强行将它们糅合在同一个存储引擎中,最终只会得到一个在两个方面都表现平庸的系统。我们需要重新设计整个特征管道,从数据注入到存储,再到最终的查询服务。
架构决策:从同步请求到事件驱动,从单一存储到混合存储
在真实项目中,任何架构决策都是一系列权衡。我们评估了两种核心方案。
方案A:增强型同步请求-响应模型
这个方案试图在现有架构上进行改良。模型服务通过 RPC 直接调用一个特征服务。该服务收到请求后,实时从多个数据源(可能是经过优化的 PostgreSQL、一个独立的搜索引擎)拉取原始数据,动态计算特征,然后返回结果。
- 优点: 架构相对简单,逻辑内聚。对于某些特征,可以保证数据绝对最新。
- 缺点:
- 高延迟: 每次请求都需要实时计算,无法满足我们对 p99 延迟 < 50ms 的要求。
- 计算资源浪费: 同样的时间窗口聚合可能会被重复计算成千上万次。
- 系统强耦合: 推理服务与底层数据源紧密耦合,任何数据源的抖动都会直接冲击在线服务。
方案B:事件驱动的混合存储管道 (CQRS 变体)
这个方案彻底改变了数据流。我们将特征的计算与查询分离。系统中的原始行为事件(如点击、浏览、下单)被投递到一个消息队列中。一个独立的消费者服务订阅这些事件,实时地预计算特征,并将它们写入为不同查询模式优化的多个存储系统中。查询服务则直接从这些预计算好的、优化的存储中读取数据。
- 优点:
- 低延迟查询: 查询时无需复杂计算,直接读取预计算结果。
- 系统解耦: 通过消息队列将数据生产者与消费者解耦,提高了系统的弹性和可扩展性。
- 专用存储: 可以为每种查询模式选择最合适的存储引擎,发挥其最大效能。
- 缺点:
- 架构复杂性增加: 引入了消息队列、多个数据存储和消费者服务。
- 数据一致性: 写入多个系统时需要处理最终一致性问题。
- 数据延迟: 查询到的特征数据总会比实时事件有微小延迟(通常在毫秒到秒级)。
对于我们的场景,推理服务的低延迟是首要目标。因此,我们最终选择了 方案B。这引出了一系列技术选型决策:
消息队列: 我们需要一个支持持久化、分区、以及独立消费确认机制的系统。Apache Pulsar 成为了首选。它的分层存储(BookKeeper + Tiered Storage)和 Topic 级别策略控制非常适合我们这种多业务线、多数据等级的场景。相比 Kafka,Pulsar 的原生多租户和 Geo-replication 特性为未来的平台化演进留下了空间。
时间序列存储: 对于时间窗口聚合查询,TimescaleDB 是一个理想选择。它作为 PostgreSQL 的插件,让我们能在享受 PG 生态的同时,获得自动化的时序表分区(Hypertables)、高效的压缩以及强大的时序分析函数(如
time_bucket
,gap_fill
)。搜索与复杂查询存储: 对于文本、JSON 和向量特征的搜索,OpenSearch (一个 Elasticsearch 的开源分支) 是不二之选。其强大的倒排索引和聚合能力,以及对向量搜索的支持,完美匹配我们的需求。
消费与查询服务: 我们需要一个轻量级、高性能的异步框架来构建消费者和 API 服务。Ktor 基于 Kotlin Coroutines,提供了出色的并发性能和简洁的 DSL,非常适合构建 IO 密集型的网络应用。
部署与运维: 整个系统将运行在 Kubernetes 上。为了实现声明式的、可追溯的部署,我们采用 GitOps 模式,使用 Argo CD 来同步 Git 仓库中的配置与集群状态。
整体架构如下:
graph TD subgraph "事件源 (Event Sources)" AppServer[应用服务] DataPipeline[离线数据管道] end subgraph "消息队列 (Message Queue)" Pulsar(Pulsar Topic: user_events) end subgraph "特征处理与写入服务 (Ktor)" id1(Feature Ingestion Service) end subgraph "混合存储 (Hybrid Storage)" TimescaleDB[(TimescaleDB
Hypertables)] OpenSearch[(OpenSearch
Indexes)] end subgraph "特征查询服务 (Ktor)" id2(Feature Query Service API) end subgraph "消费方 (Consumers)" MLModel[机器学习模型] end AppServer -- "原始行为事件" --> Pulsar DataPipeline -- "批量事件" --> Pulsar Pulsar -- "订阅事件" --> id1 id1 -- "写入时序特征" --> TimescaleDB id1 -- "写入搜索特征" --> OpenSearch MLModel -- "HTTP/gRPC 请求" --> id2 id2 -- "查询时序聚合" --> TimescaleDB id2 -- "查询复杂特征" --> OpenSearch
核心实现:Ktor 特征注入服务
这是整个管道的核心。它负责消费 Pulsar 的消息,解析事件,然后将数据分别写入 TimescaleDB 和 OpenSearch。
1. 项目设置与依赖
我们使用 Gradle 构建 Ktor 项目。关键依赖包括 Ktor 客户端/服务端库、Pulsar 客户端、Kotlinx Serialization、Exposed (Kotlin SQL 框架) 和 OpenSearch Java 客户端。
build.gradle.kts
:
// ... plugins and repositories
dependencies {
// Ktor Core
implementation("io.ktor:ktor-server-core-jvm:$ktor_version")
implementation("io.ktor:ktor-server-netty-jvm:$ktor_version")
implementation("io.ktor:ktor-server-content-negotiation:$ktor_version")
implementation("io.ktor:ktor-serialization-kotlinx-json:$ktor_version")
// Pulsar Client
implementation("org.apache.pulsar:pulsar-client:$pulsar_version")
// Database: Exposed for TimescaleDB
implementation("org.jetbrains.exposed:exposed-core:$exposed_version")
implementation("org.jetbrains.exposed:exposed-dao:$exposed_version")
implementation("org.jetbrains.exposed:exposed-jdbc:$exposed_version")
implementation("org.postgresql:postgresql:42.5.0") // PG driver
// OpenSearch Client
implementation("org.opensearch.client:opensearch-java:$opensearch_version")
implementation("com.fasterxml.jackson.core:jackson-databind:2.13.3") // Required by OpenSearch client
// Logging
implementation("ch.qos.logback:logback-classic:$logback_version")
// ... test dependencies
}
2. Pulsar 消费者实现
我们需要一个健壮的、可容错的消费者。这里的关键是利用 Kotlin Coroutines 来实现高并发处理,同时确保正确的消息确认 (ack) 逻辑。
import io.ktor.server.config.*
import kotlinx.coroutines.*
import kotlinx.serialization.json.Json
import org.apache.pulsar.client.api.*
import org.slf4j.LoggerFactory
import java.io.Closeable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
// 数据模型,与事件JSON结构对应
@kotlinx.serialization.Serializable
data class UserEvent(
val userId: String,
val eventType: String,
val timestamp: Long,
val properties: Map<String, String>
)
// 数据写入服务的接口
interface FeatureWriter {
suspend fun write(event: UserEvent)
}
class PulsarEventConsumer(
private val config: ApplicationConfig,
private val featureWriter: FeatureWriter
) : Closeable {
private val logger = LoggerFactory.getLogger(javaClass)
private val pulsarClient: PulsarClient
private val consumer: Consumer<ByteArray>
// 使用一个固定大小的 Coroutine Dispatcher 来限制并发处理的消息数
private val processingDispatcher = Executors.newFixedThreadPool(
config.property("pulsar.processing.concurrency").getString().toInt()
).asCoroutineDispatcher()
private val consumerScope = CoroutineScope(SupervisorJob() + processingDispatcher)
init {
val serviceUrl = config.property("pulsar.serviceUrl").getString()
val topicName = config.property("pulsar.topicName").getString()
val subscriptionName = config.property("pulsar.subscriptionName").getString()
pulsarClient = PulsarClient.builder()
.serviceUrl(serviceUrl)
.build()
consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared) // 多个 Pod 实例可以共享消费
.ackTimeout(1, TimeUnit.MINUTES) // 适当增加ack超时,防止复杂处理导致重试
.deadLetterPolicy(
DeadLetterPolicy.builder()
.maxRedeliverCount(5) // 最多重试5次
.deadLetterTopic("${topicName}-dlq") // 发送到死信队列
.build()
)
.subscribe()
logger.info("Pulsar consumer started for topic '$topicName' with subscription '$subscriptionName'")
}
fun start() {
consumerScope.launch {
while (isActive) {
try {
// 异步接收消息
val message = consumer.receiveAsync().await()
// 在独立的Job中处理,避免一个消息的处理失败阻塞整个循环
launch {
processMessage(message)
}
} catch (e: PulsarClientException.AlreadyClosedException) {
logger.warn("Consumer is closed. Exiting receive loop.")
break
} catch (e: Exception) {
logger.error("Error receiving message from Pulsar. Retrying...", e)
delay(1000) // 避免快速失败循环
}
}
}
}
private suspend fun processMessage(message: Message<ByteArray>) {
try {
val event = Json.decodeFromString<UserEvent>(String(message.data))
// 核心逻辑:调用写入服务
featureWriter.write(event)
// 处理成功,确认消息
consumer.acknowledgeAsync(message.messageId).await()
logger.debug("Successfully processed and acknowledged message: ${message.messageId}")
} catch (e: Exception) {
// 这里的坑:任何处理异常都必须捕获,否则协程会失败,可能导致消费停止
logger.error("Failed to process message ${message.messageId}. It will be negatively acknowledged.", e)
// 发送否定确认,Pulsar会根据策略重发或发送到DLQ
consumer.negativeAcknowledge(message.messageId)
}
}
override fun close() {
logger.info("Closing Pulsar consumer...")
consumerScope.cancel()
consumer.close()
pulsarClient.close()
processingDispatcher.close()
logger.info("Pulsar consumer closed.")
}
}
// 使用 CompletableFuture.await() 扩展函数
suspend fun <T> java.util.concurrent.CompletableFuture<T>.await(): T =
suspendCancellableCoroutine { cont ->
whenComplete { result, exception ->
if (exception == null) {
cont.resume(result) {}
} else {
cont.resumeWithException(exception)
}
}
}
这个消费者有几个关键点:
- 并发控制: 使用固定大小的
CoroutineDispatcher
(processingDispatcher
) 来防止过多的消息同时处理,避免压垮下游数据库。 - 容错: 设置了
DeadLetterPolicy
。如果一个消息处理失败超过5次,它会被自动发送到死信队列,避免有毒消息阻塞整个管道。 - 生命周期管理: 实现了
Closeable
接口,确保在应用关闭时能优雅地释放资源。 - 异步化:
receiveAsync
和acknowledgeAsync
配合await()
扩展函数,无缝地将 Pulsar 的CompletableFuture
API 融入协程世界。
3. 混合存储写入器
FeatureWriter
的实现需要同时写入 TimescaleDB 和 OpenSearch。这是一个挑战,因为它们不是一个原子操作。在真实项目中,这里可能需要引入 Saga 模式或至少保证幂等性来处理部分失败。
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction
import org.opensearch.client.opensearch.OpenSearchAsyncClient
import org.opensearch.client.opensearch.core.IndexRequest
import java.time.Instant
import java.time.ZoneOffset
class HybridStorageFeatureWriter(
private val db: Database,
private val osClient: OpenSearchAsyncClient
) : FeatureWriter {
private val logger = LoggerFactory.getLogger(javaClass)
private val indexName = "user-features"
// Exposed Table Definition for TimescaleDB
object UserEventsTable : Table("user_events") {
val timestamp = long("time").index()
val userId = varchar("user_id", 128)
val eventType = varchar("event_type", 64)
val properties = text("properties") // Store as JSONB in production
}
init {
// 在生产环境中,表结构应该由迁移工具管理 (e.g., Flyway, Liquibase)
// 这里只是为了演示
TransactionManager.manager.defaultIsolationLevel = java.sql.Connection.TRANSACTION_SERIALIZABLE
transaction(db) {
SchemaUtils.create(UserEventsTable)
// 最关键的一步:将普通表转换为超表 (Hypertable)
exec("SELECT create_hypertable('user_events', 'time', if_not_exists => TRUE, chunk_time_interval => 86400000);") // 1 day interval
}
}
override suspend fun write(event: UserEvent) {
// 使用 withContext 确保数据库和OS客户端在正确的IO线程池中执行
withContext(Dispatchers.IO) {
// 写入操作不是原子的,需要考虑补偿逻辑
// 1. 写入 OpenSearch
writeToOpenSearch(event)
// 2. 写入 TimescaleDB
writeToTimescaleDB(event)
}
}
private suspend fun writeToOpenSearch(event: UserEvent) {
val featureDoc = mapOf(
"user_id" to event.userId,
"event_type" to event.eventType,
"@timestamp" to Instant.ofEpochMilli(event.timestamp).toString(),
"properties" to event.properties,
// 假设我们在这里生成了一个向量特征
"feature_vector" to generateDummyVector()
)
val request = IndexRequest.Builder<Map<String, Any>>()
.index(indexName)
.id("${event.userId}_${event.timestamp}") // 构建幂等ID
.document(featureDoc)
.build()
try {
val response = osClient.index(request).await()
logger.debug("Indexed doc ${response.id()} to OpenSearch. Result: ${response.result()}")
} catch (e: Exception) {
logger.error("Failed to index document to OpenSearch for user ${event.userId}", e)
// 这里的坑:写入OpenSearch失败,但可能已经写入TimescaleDB,或反之。
// 生产系统需要一个补偿任务或重试机制来修复这种不一致。
throw e // 向上抛出异常,让消费者 NACK 消息
}
}
private suspend fun writeToTimescaleDB(event: UserEvent) {
newSuspendedTransaction(db = db) {
UserEventsTable.insert {
it[timestamp] = event.timestamp
it[userId] = event.userId
it[eventType] = event.eventType
it[properties] = Json.encodeToString(event.properties)
}
}
}
private fun generateDummyVector(): FloatArray = FloatArray(128) { Math.random().toFloat() }
}
这里的关键权衡:
- TimescaleDB 设置:
create_hypertable
是 TimescaleDB 的核心。它会自动根据time
列将大表在物理上切分成小块(chunks),使得时间范围查询和数据维护(如删除旧数据)极为高效。 - OpenSearch 映射: (未在代码中展示) 在生产中,你需要为
user-features
索引定义一个明确的 mapping,特别是将feature_vector
字段类型设置为knn_vector
以启用高效的向量搜索。 - 数据一致性: 代码中明确指出了写入非原子的风险。一个常见的错误是忽略这种部分失败,导致数据源不一致。我们的策略是,任何一步失败都向上抛出异常,让 Pulsar 消费者
negativeAcknowledge
消息,触发重试。结合 OpenSearch 文档的幂等ID,重试可以安全地覆盖旧数据。
部署:基于 Argo CD 的 GitOps 流程
代码写完后,我们需要可靠地将其部署到 Kubernetes。我们不直接使用 kubectl apply
,而是将所有 Kubernetes 清单文件(Deployment, Service, ConfigMap 等)存储在一个 Git 仓库中。
Git 仓库结构示例:
/apps
/feature-ingestion-service
/base
kustomization.yaml
deployment.yaml
service.yaml
configmap.yaml
/overlays
/staging
kustomization.yaml
configmap-patch.yaml
deployment-patch.yaml
/production
kustomization.yaml
configmap-patch.yaml
deployment-patch.yaml
configmap.yaml
(base):
apiVersion: v1
kind: ConfigMap
metadata:
name: feature-ingestion-config
data:
# 使用 application.conf 格式
application.conf: |
pulsar {
serviceUrl = "pulsar://pulsar.default.svc.cluster.local:6650"
topicName = "persistent://public/default/user_events"
subscriptionName = "feature-ingestion-subscription"
processing.concurrency = 16
}
database {
url = "jdbc:postgresql://timescaledb.default.svc.cluster.local:5432/features"
user = "user"
# 密码等敏感信息应使用 Kubernetes Secrets
}
opensearch {
host = "opensearch-cluster-master.default.svc.cluster.local"
port = 9200
}
deployment-patch.yaml
(production overlay):
apiVersion: apps/v1
kind: Deployment
metadata:
name: feature-ingestion-service
spec:
replicas: 5 # 生产环境增加副本数
template:
spec:
containers:
- name: service
resources:
requests:
cpu: "1"
memory: "2Gi"
limits:
cpu: "2"
memory: "2Gi"
Argo CD Application 定义:
然后,我们在 Argo CD 中创建一个 Application
CRD,指向这个 Git 仓库。
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
name: feature-ingestion-prod
namespace: argocd
spec:
project: default
source:
repoURL: 'https://github.com/your-org/infra-repo.git'
path: apps/feature-ingestion-service/overlays/production
targetRevision: HEAD
destination:
server: 'https://kubernetes.default.svc'
namespace: production
syncPolicy:
automated:
prune: true
selfHeal: true
syncOptions:
- CreateNamespace=true
这个流程的好处是:
- 版本控制: 基础设施的每一次变更都有 Git commit 记录,可追溯、可审计。
- 自动化与一致性: Argo CD 会持续监控 Git 仓库。一旦有新的 commit push 到
HEAD
,它会自动将变更同步到集群,确保集群状态与 Git 中定义的期望状态一致。 - 安全性: 开发人员无需直接访问 Kubernetes API Server,所有变更通过 Pull Request 进行审查。
局限性与未来迭代路径
这个架构解决了我们最初的查询性能问题,但它并非银弹。
数据一致性挑战: 虽然重试和幂等性写入缓解了部分问题,但在极端情况下(例如,一个服务长时间宕机后恢复),仍然可能出现数据不一致。一个更健壮的方案是引入一个协调服务或使用 Debezium 从 TimescaleDB 进行 CDC,将数据同步到 OpenSearch,从而建立单一数据源真相。
特征回填 (Backfilling): 这个事件驱动的管道非常适合处理实时数据,但对于历史数据的回填(例如,上线一个新特征,需要用过去一年的数据计算它)并不高效。通常需要一个并行的、基于批处理的路径(如 Spark Job)来直接读写数据存储。
Schema 演进: 当
UserEvent
的结构发生变化时,需要小心地管理 Pulsar Topic 的 Schema、数据库表的迁移和 OpenSearch mapping 的更新,以确保向后兼容。
未来的优化方向可能包括:为 Ktor 查询服务增加一个缓存层(如 Redis)来处理热点用户特征的访问;引入 KEDA (Kubernetes-based Event-Driven Autoscaling) 来根据 Pulsar Topic 的积压消息数自动伸缩消费者 Pod 的数量,从而更高效地应对流量高峰。