基于 Pulsar, TimescaleDB 与 OpenSearch 构建事件驱动的混合存储特征管道


我们的机器学习模型推理服务遇到了一个棘手的性能瓶颈。它需要实时访问两种截然不同的特征数据:一种是基于时间窗口的用户行为聚合特征(例如,“过去15分钟内用户的点击次数”),另一种是基于内容的复杂文本与向量化特征(例如,“搜索与用户历史画像最相关的商品”)。最初的架构试图用单一的 PostgreSQL 实例来满足这两种查询需求,结果是在高并发下,复杂的 JOIN 和全文搜索查询严重拖慢了时间窗口函数的计算,反之亦然。

这是一个典型的查询模式冲突问题。时间序列查询需要高效的数据分区、索引和专为时序优化的函数;而搜索查询则需要倒排索引、相关性评分和向量相似度计算。强行将它们糅合在同一个存储引擎中,最终只会得到一个在两个方面都表现平庸的系统。我们需要重新设计整个特征管道,从数据注入到存储,再到最终的查询服务。

架构决策:从同步请求到事件驱动,从单一存储到混合存储

在真实项目中,任何架构决策都是一系列权衡。我们评估了两种核心方案。

方案A:增强型同步请求-响应模型

这个方案试图在现有架构上进行改良。模型服务通过 RPC 直接调用一个特征服务。该服务收到请求后,实时从多个数据源(可能是经过优化的 PostgreSQL、一个独立的搜索引擎)拉取原始数据,动态计算特征,然后返回结果。

  • 优点: 架构相对简单,逻辑内聚。对于某些特征,可以保证数据绝对最新。
  • 缺点:
    1. 高延迟: 每次请求都需要实时计算,无法满足我们对 p99 延迟 < 50ms 的要求。
    2. 计算资源浪费: 同样的时间窗口聚合可能会被重复计算成千上万次。
    3. 系统强耦合: 推理服务与底层数据源紧密耦合,任何数据源的抖动都会直接冲击在线服务。

方案B:事件驱动的混合存储管道 (CQRS 变体)

这个方案彻底改变了数据流。我们将特征的计算与查询分离。系统中的原始行为事件(如点击、浏览、下单)被投递到一个消息队列中。一个独立的消费者服务订阅这些事件,实时地预计算特征,并将它们写入为不同查询模式优化的多个存储系统中。查询服务则直接从这些预计算好的、优化的存储中读取数据。

  • 优点:
    1. 低延迟查询: 查询时无需复杂计算,直接读取预计算结果。
    • 系统解耦: 通过消息队列将数据生产者与消费者解耦,提高了系统的弹性和可扩展性。
    • 专用存储: 可以为每种查询模式选择最合适的存储引擎,发挥其最大效能。
  • 缺点:
    1. 架构复杂性增加: 引入了消息队列、多个数据存储和消费者服务。
    2. 数据一致性: 写入多个系统时需要处理最终一致性问题。
    3. 数据延迟: 查询到的特征数据总会比实时事件有微小延迟(通常在毫秒到秒级)。

对于我们的场景,推理服务的低延迟是首要目标。因此,我们最终选择了 方案B。这引出了一系列技术选型决策:

  1. 消息队列: 我们需要一个支持持久化、分区、以及独立消费确认机制的系统。Apache Pulsar 成为了首选。它的分层存储(BookKeeper + Tiered Storage)和 Topic 级别策略控制非常适合我们这种多业务线、多数据等级的场景。相比 Kafka,Pulsar 的原生多租户和 Geo-replication 特性为未来的平台化演进留下了空间。

  2. 时间序列存储: 对于时间窗口聚合查询,TimescaleDB 是一个理想选择。它作为 PostgreSQL 的插件,让我们能在享受 PG 生态的同时,获得自动化的时序表分区(Hypertables)、高效的压缩以及强大的时序分析函数(如 time_bucket, gap_fill)。

  3. 搜索与复杂查询存储: 对于文本、JSON 和向量特征的搜索,OpenSearch (一个 Elasticsearch 的开源分支) 是不二之选。其强大的倒排索引和聚合能力,以及对向量搜索的支持,完美匹配我们的需求。

  4. 消费与查询服务: 我们需要一个轻量级、高性能的异步框架来构建消费者和 API 服务。Ktor 基于 Kotlin Coroutines,提供了出色的并发性能和简洁的 DSL,非常适合构建 IO 密集型的网络应用。

  5. 部署与运维: 整个系统将运行在 Kubernetes 上。为了实现声明式的、可追溯的部署,我们采用 GitOps 模式,使用 Argo CD 来同步 Git 仓库中的配置与集群状态。

整体架构如下:

graph TD
    subgraph "事件源 (Event Sources)"
        AppServer[应用服务]
        DataPipeline[离线数据管道]
    end

    subgraph "消息队列 (Message Queue)"
        Pulsar(Pulsar Topic: user_events)
    end

    subgraph "特征处理与写入服务 (Ktor)"
        id1(Feature Ingestion Service)
    end

    subgraph "混合存储 (Hybrid Storage)"
        TimescaleDB[(TimescaleDB
Hypertables)] OpenSearch[(OpenSearch
Indexes)] end subgraph "特征查询服务 (Ktor)" id2(Feature Query Service API) end subgraph "消费方 (Consumers)" MLModel[机器学习模型] end AppServer -- "原始行为事件" --> Pulsar DataPipeline -- "批量事件" --> Pulsar Pulsar -- "订阅事件" --> id1 id1 -- "写入时序特征" --> TimescaleDB id1 -- "写入搜索特征" --> OpenSearch MLModel -- "HTTP/gRPC 请求" --> id2 id2 -- "查询时序聚合" --> TimescaleDB id2 -- "查询复杂特征" --> OpenSearch

核心实现:Ktor 特征注入服务

这是整个管道的核心。它负责消费 Pulsar 的消息,解析事件,然后将数据分别写入 TimescaleDB 和 OpenSearch。

1. 项目设置与依赖

我们使用 Gradle 构建 Ktor 项目。关键依赖包括 Ktor 客户端/服务端库、Pulsar 客户端、Kotlinx Serialization、Exposed (Kotlin SQL 框架) 和 OpenSearch Java 客户端。

build.gradle.kts:

// ... plugins and repositories
dependencies {
    // Ktor Core
    implementation("io.ktor:ktor-server-core-jvm:$ktor_version")
    implementation("io.ktor:ktor-server-netty-jvm:$ktor_version")
    implementation("io.ktor:ktor-server-content-negotiation:$ktor_version")
    implementation("io.ktor:ktor-serialization-kotlinx-json:$ktor_version")

    // Pulsar Client
    implementation("org.apache.pulsar:pulsar-client:$pulsar_version")

    // Database: Exposed for TimescaleDB
    implementation("org.jetbrains.exposed:exposed-core:$exposed_version")
    implementation("org.jetbrains.exposed:exposed-dao:$exposed_version")
    implementation("org.jetbrains.exposed:exposed-jdbc:$exposed_version")
    implementation("org.postgresql:postgresql:42.5.0") // PG driver

    // OpenSearch Client
    implementation("org.opensearch.client:opensearch-java:$opensearch_version")
    implementation("com.fasterxml.jackson.core:jackson-databind:2.13.3") // Required by OpenSearch client

    // Logging
    implementation("ch.qos.logback:logback-classic:$logback_version")
    
    // ... test dependencies
}

2. Pulsar 消费者实现

我们需要一个健壮的、可容错的消费者。这里的关键是利用 Kotlin Coroutines 来实现高并发处理,同时确保正确的消息确认 (ack) 逻辑。

import io.ktor.server.config.*
import kotlinx.coroutines.*
import kotlinx.serialization.json.Json
import org.apache.pulsar.client.api.*
import org.slf4j.LoggerFactory
import java.io.Closeable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// 数据模型,与事件JSON结构对应
@kotlinx.serialization.Serializable
data class UserEvent(
    val userId: String,
    val eventType: String,
    val timestamp: Long,
    val properties: Map<String, String>
)

// 数据写入服务的接口
interface FeatureWriter {
    suspend fun write(event: UserEvent)
}

class PulsarEventConsumer(
    private val config: ApplicationConfig,
    private val featureWriter: FeatureWriter
) : Closeable {

    private val logger = LoggerFactory.getLogger(javaClass)
    private val pulsarClient: PulsarClient
    private val consumer: Consumer<ByteArray>

    // 使用一个固定大小的 Coroutine Dispatcher 来限制并发处理的消息数
    private val processingDispatcher = Executors.newFixedThreadPool(
        config.property("pulsar.processing.concurrency").getString().toInt()
    ).asCoroutineDispatcher()
    
    private val consumerScope = CoroutineScope(SupervisorJob() + processingDispatcher)

    init {
        val serviceUrl = config.property("pulsar.serviceUrl").getString()
        val topicName = config.property("pulsar.topicName").getString()
        val subscriptionName = config.property("pulsar.subscriptionName").getString()

        pulsarClient = PulsarClient.builder()
            .serviceUrl(serviceUrl)
            .build()

        consumer = pulsarClient.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .subscriptionType(SubscriptionType.Shared) // 多个 Pod 实例可以共享消费
            .ackTimeout(1, TimeUnit.MINUTES) // 适当增加ack超时,防止复杂处理导致重试
            .deadLetterPolicy(
                DeadLetterPolicy.builder()
                    .maxRedeliverCount(5) // 最多重试5次
                    .deadLetterTopic("${topicName}-dlq") // 发送到死信队列
                    .build()
            )
            .subscribe()
        
        logger.info("Pulsar consumer started for topic '$topicName' with subscription '$subscriptionName'")
    }

    fun start() {
        consumerScope.launch {
            while (isActive) {
                try {
                    // 异步接收消息
                    val message = consumer.receiveAsync().await()
                    
                    // 在独立的Job中处理,避免一个消息的处理失败阻塞整个循环
                    launch {
                        processMessage(message)
                    }
                } catch (e: PulsarClientException.AlreadyClosedException) {
                    logger.warn("Consumer is closed. Exiting receive loop.")
                    break
                } catch (e: Exception) {
                    logger.error("Error receiving message from Pulsar. Retrying...", e)
                    delay(1000) // 避免快速失败循环
                }
            }
        }
    }

    private suspend fun processMessage(message: Message<ByteArray>) {
        try {
            val event = Json.decodeFromString<UserEvent>(String(message.data))
            
            // 核心逻辑:调用写入服务
            featureWriter.write(event)

            // 处理成功,确认消息
            consumer.acknowledgeAsync(message.messageId).await()
            logger.debug("Successfully processed and acknowledged message: ${message.messageId}")

        } catch (e: Exception) {
            // 这里的坑:任何处理异常都必须捕获,否则协程会失败,可能导致消费停止
            logger.error("Failed to process message ${message.messageId}. It will be negatively acknowledged.", e)
            // 发送否定确认,Pulsar会根据策略重发或发送到DLQ
            consumer.negativeAcknowledge(message.messageId)
        }
    }

    override fun close() {
        logger.info("Closing Pulsar consumer...")
        consumerScope.cancel()
        consumer.close()
        pulsarClient.close()
        processingDispatcher.close()
        logger.info("Pulsar consumer closed.")
    }
}

// 使用 CompletableFuture.await() 扩展函数
suspend fun <T> java.util.concurrent.CompletableFuture<T>.await(): T =
    suspendCancellableCoroutine { cont ->
        whenComplete { result, exception ->
            if (exception == null) {
                cont.resume(result) {}
            } else {
                cont.resumeWithException(exception)
            }
        }
    }

这个消费者有几个关键点:

  • 并发控制: 使用固定大小的 CoroutineDispatcher (processingDispatcher) 来防止过多的消息同时处理,避免压垮下游数据库。
  • 容错: 设置了 DeadLetterPolicy。如果一个消息处理失败超过5次,它会被自动发送到死信队列,避免有毒消息阻塞整个管道。
  • 生命周期管理: 实现了 Closeable 接口,确保在应用关闭时能优雅地释放资源。
  • 异步化: receiveAsyncacknowledgeAsync 配合 await() 扩展函数,无缝地将 Pulsar 的 CompletableFuture API 融入协程世界。

3. 混合存储写入器

FeatureWriter 的实现需要同时写入 TimescaleDB 和 OpenSearch。这是一个挑战,因为它们不是一个原子操作。在真实项目中,这里可能需要引入 Saga 模式或至少保证幂等性来处理部分失败。

import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction
import org.opensearch.client.opensearch.OpenSearchAsyncClient
import org.opensearch.client.opensearch.core.IndexRequest
import java.time.Instant
import java.time.ZoneOffset

class HybridStorageFeatureWriter(
    private val db: Database,
    private val osClient: OpenSearchAsyncClient
) : FeatureWriter {

    private val logger = LoggerFactory.getLogger(javaClass)
    private val indexName = "user-features"

    // Exposed Table Definition for TimescaleDB
    object UserEventsTable : Table("user_events") {
        val timestamp = long("time").index()
        val userId = varchar("user_id", 128)
        val eventType = varchar("event_type", 64)
        val properties = text("properties") // Store as JSONB in production
    }

    init {
        // 在生产环境中,表结构应该由迁移工具管理 (e.g., Flyway, Liquibase)
        // 这里只是为了演示
        TransactionManager.manager.defaultIsolationLevel = java.sql.Connection.TRANSACTION_SERIALIZABLE
        transaction(db) {
            SchemaUtils.create(UserEventsTable)
            // 最关键的一步:将普通表转换为超表 (Hypertable)
            exec("SELECT create_hypertable('user_events', 'time', if_not_exists => TRUE, chunk_time_interval => 86400000);") // 1 day interval
        }
    }

    override suspend fun write(event: UserEvent) {
        // 使用 withContext 确保数据库和OS客户端在正确的IO线程池中执行
        withContext(Dispatchers.IO) {
            // 写入操作不是原子的,需要考虑补偿逻辑
            // 1. 写入 OpenSearch
            writeToOpenSearch(event)
            // 2. 写入 TimescaleDB
            writeToTimescaleDB(event)
        }
    }

    private suspend fun writeToOpenSearch(event: UserEvent) {
        val featureDoc = mapOf(
            "user_id" to event.userId,
            "event_type" to event.eventType,
            "@timestamp" to Instant.ofEpochMilli(event.timestamp).toString(),
            "properties" to event.properties,
            // 假设我们在这里生成了一个向量特征
            "feature_vector" to generateDummyVector() 
        )

        val request = IndexRequest.Builder<Map<String, Any>>()
            .index(indexName)
            .id("${event.userId}_${event.timestamp}") // 构建幂等ID
            .document(featureDoc)
            .build()
        
        try {
            val response = osClient.index(request).await()
            logger.debug("Indexed doc ${response.id()} to OpenSearch. Result: ${response.result()}")
        } catch (e: Exception) {
            logger.error("Failed to index document to OpenSearch for user ${event.userId}", e)
            // 这里的坑:写入OpenSearch失败,但可能已经写入TimescaleDB,或反之。
            // 生产系统需要一个补偿任务或重试机制来修复这种不一致。
            throw e // 向上抛出异常,让消费者 NACK 消息
        }
    }

    private suspend fun writeToTimescaleDB(event: UserEvent) {
        newSuspendedTransaction(db = db) {
            UserEventsTable.insert {
                it[timestamp] = event.timestamp
                it[userId] = event.userId
                it[eventType] = event.eventType
                it[properties] = Json.encodeToString(event.properties)
            }
        }
    }
    
    private fun generateDummyVector(): FloatArray = FloatArray(128) { Math.random().toFloat() }
}

这里的关键权衡:

  • TimescaleDB 设置: create_hypertable 是 TimescaleDB 的核心。它会自动根据 time 列将大表在物理上切分成小块(chunks),使得时间范围查询和数据维护(如删除旧数据)极为高效。
  • OpenSearch 映射: (未在代码中展示) 在生产中,你需要为 user-features索引定义一个明确的 mapping,特别是将 feature_vector 字段类型设置为 knn_vector 以启用高效的向量搜索。
  • 数据一致性: 代码中明确指出了写入非原子的风险。一个常见的错误是忽略这种部分失败,导致数据源不一致。我们的策略是,任何一步失败都向上抛出异常,让 Pulsar 消费者 negativeAcknowledge 消息,触发重试。结合 OpenSearch 文档的幂等ID,重试可以安全地覆盖旧数据。

部署:基于 Argo CD 的 GitOps 流程

代码写完后,我们需要可靠地将其部署到 Kubernetes。我们不直接使用 kubectl apply,而是将所有 Kubernetes 清单文件(Deployment, Service, ConfigMap 等)存储在一个 Git 仓库中。

Git 仓库结构示例:

/apps
  /feature-ingestion-service
    /base
      kustomization.yaml
      deployment.yaml
      service.yaml
      configmap.yaml
    /overlays
      /staging
        kustomization.yaml
        configmap-patch.yaml
        deployment-patch.yaml
      /production
        kustomization.yaml
        configmap-patch.yaml
        deployment-patch.yaml

configmap.yaml (base):

apiVersion: v1
kind: ConfigMap
metadata:
  name: feature-ingestion-config
data:
  # 使用 application.conf 格式
  application.conf: |
    pulsar {
      serviceUrl = "pulsar://pulsar.default.svc.cluster.local:6650"
      topicName = "persistent://public/default/user_events"
      subscriptionName = "feature-ingestion-subscription"
      processing.concurrency = 16
    }
    database {
      url = "jdbc:postgresql://timescaledb.default.svc.cluster.local:5432/features"
      user = "user"
      # 密码等敏感信息应使用 Kubernetes Secrets
    }
    opensearch {
      host = "opensearch-cluster-master.default.svc.cluster.local"
      port = 9200
    }

deployment-patch.yaml (production overlay):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: feature-ingestion-service
spec:
  replicas: 5 # 生产环境增加副本数
  template:
    spec:
      containers:
      - name: service
        resources:
          requests:
            cpu: "1"
            memory: "2Gi"
          limits:
            cpu: "2"
            memory: "2Gi"

Argo CD Application 定义:
然后,我们在 Argo CD 中创建一个 Application CRD,指向这个 Git 仓库。

apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: feature-ingestion-prod
  namespace: argocd
spec:
  project: default
  source:
    repoURL: 'https://github.com/your-org/infra-repo.git'
    path: apps/feature-ingestion-service/overlays/production
    targetRevision: HEAD
  destination:
    server: 'https://kubernetes.default.svc'
    namespace: production
  syncPolicy:
    automated:
      prune: true
      selfHeal: true
    syncOptions:
    - CreateNamespace=true

这个流程的好处是:

  1. 版本控制: 基础设施的每一次变更都有 Git commit 记录,可追溯、可审计。
  2. 自动化与一致性: Argo CD 会持续监控 Git 仓库。一旦有新的 commit push 到 HEAD,它会自动将变更同步到集群,确保集群状态与 Git 中定义的期望状态一致。
  3. 安全性: 开发人员无需直接访问 Kubernetes API Server,所有变更通过 Pull Request 进行审查。

局限性与未来迭代路径

这个架构解决了我们最初的查询性能问题,但它并非银弹。

  1. 数据一致性挑战: 虽然重试和幂等性写入缓解了部分问题,但在极端情况下(例如,一个服务长时间宕机后恢复),仍然可能出现数据不一致。一个更健壮的方案是引入一个协调服务或使用 Debezium 从 TimescaleDB 进行 CDC,将数据同步到 OpenSearch,从而建立单一数据源真相。

  2. 特征回填 (Backfilling): 这个事件驱动的管道非常适合处理实时数据,但对于历史数据的回填(例如,上线一个新特征,需要用过去一年的数据计算它)并不高效。通常需要一个并行的、基于批处理的路径(如 Spark Job)来直接读写数据存储。

  3. Schema 演进:UserEvent 的结构发生变化时,需要小心地管理 Pulsar Topic 的 Schema、数据库表的迁移和 OpenSearch mapping 的更新,以确保向后兼容。

未来的优化方向可能包括:为 Ktor 查询服务增加一个缓存层(如 Redis)来处理热点用户特征的访问;引入 KEDA (Kubernetes-based Event-Driven Autoscaling) 来根据 Pulsar Topic 的积压消息数自动伸缩消费者 Pod 的数量,从而更高效地应对流量高峰。


  目录