基于 Pulsar C# 与 Cilium 构建服务于 Kubeflow 的实时特征流处理架构


定义问题:从批处理到实时推理的架构鸿沟

项目背景很明确:我们需要将核心推荐系统的模型推理从每日批处理迁移到实时在线服务。当前的架构依赖 T+1 的 Spark 作业生成特征,这导致用户行为无法在24小时内影响推荐结果,业务增长已触及瓶颈。新的目标是,用户行为事件必须在100毫秒内转化为可用的特征,并被在线推理服务消费。整个数据管道的 p99 延迟必须控制在50毫秒以内,且系统需具备水平扩展能力以应对峰值十万级 QPS 的事件流。

这是一个典型的、对延迟和吞吐量有严苛要求的 MLOps 场景。问题不在于“做什么”,而在于“如何构建”一个稳定、高性能且在 Kubernetes 环境下易于运维的特征工程管道。

业界最常见的方案自然是第一个进入视线的。

  • 消息队列: Apache Kafka。毋庸置疑的王者,生态系统成熟,社区庞大。
  • 流处理: Flink 或纯 Python 服务 (如 Faust)。Flink 功能强大,但对于我们相对简单的“事件转换”场景来说,可能过于重。纯 Python 服务开发速度快,与 ML 生态无缝衔接。
  • 服务网格: Istio。提供服务间的流量管理、安全性和可观测性,是 Kubernetes 服务治理的事实标准之一。

这个方案看似无懈可击,但深入评估后,我们发现了几个与我们长期目标相悖的摩擦点:

  1. Kafka 的运维复杂度: 在自建 Kubernetes 环境中,管理 Zookeeper (尽管 KRaft 模式正在普及) 依然是一个负担。更关键的是,在流量剧增导致需要对 Topic 进行分区扩容时,Kafka 的分区再平衡(Rebalancing)过程对消费者组的“Stop-the-World”影响是生产环境中的一个已知痛点,可能导致数据处理延迟的瞬间飙升。
  2. Python 的性能瓶颈: 虽然 Python 在 ML 领域占主导地位,但对于需要极致低延迟的 CPU 密集型特征转换任务,全局解释器锁 (GIL) 的存在始终是一个绕不开的话题。即使使用多进程模型,跨进程通信的开销和内存占用也会成为新的瓶颈。
  3. Istio 的性能损耗: Istio 的 Sidecar 模型通过在每个 Pod 中注入一个代理容器来拦截所有流量。这种模式虽然功能强大,但也带来了不可忽视的性能开销:增加了额外的网络跳数,每个请求的延迟都会增加几个毫秒;同时,每个 Pod 都需要为 Envoy 代理分配额外的 CPU 和内存资源,在大规模部署时这是一笔巨大的成本。

对于一个追求极致低延迟的数据平面来说,方案A中每个组件引入的“微小”开销,累加起来足以让我们无法达成50ms的SLO目标。

方案B:为性能与运维而生的备选路径 (Pulsar + C#/.NET + Cilium)

我们决定探索一条不那么“主流”但可能更适合我们特定需求的路径。

  • 消息队列: Apache Pulsar。

    • 架构优势: Pulsar 的计算(Broker)与存储(BookKeeper)分离的架构是其核心优势。Broker 是无状态的,可以快速扩缩容。数据以分片(Segment)形式存储在 BookKeeper 中,添加分区不会像 Kafka 那样触发大规模的数据迁移和消费者停顿。这对我们需要频繁调整吞吐能力的场景至关重要。
    • 内置功能: 内置的多租户和分层存储功能非常吸引人。分层存储可以将旧数据无缝卸载到对象存储(如 S3),这对于模型需要回溯历史特征进行再训练的场景来说,极大地简化了数据生命周期管理,无需再维护一套独立的 ETL 流程将数据从消息队列导入数据湖。
  • 流处理: C# / .NET。

    • 性能: 这是一个在 ML 领域反直觉的选择,但却是经过深思熟虑的。现代 .NET (6/7/8) 的性能已经非常出色。JIT 编译器、高效的垃圾回收器以及对 Span<T>ValueTask 等底层优化的支持,使其在网络 IO 和计算密集型任务上能达到与 Go 或 Java 相当的性能水平,远超 Python。对于我们的特征转换逻辑——涉及反序列化、数值计算和字符串处理——.NET 是一个性能强劲的选项。
    • 团队技能: 我们的核心平台团队有深厚的 .NET 背景,这意味着我们可以更快地构建高质量、可维护的服务,并应用成熟的 DevOps 实践。
  • 网络与安全: Cilium。

    • eBPF 的革命: Cilium 基于 eBPF 技术,将网络、可观测性和安全能力直接植入 Linux 内核。与 Istio 的 Sidecar 模型不同,Cilium 没有用户空间的代理,数据包路径更短,延迟更低。对于我们的数据密集型管道,这种“零代理”模式带来的性能提升是实实在在的。
    • 身份感知安全: Cilium 的网络策略是基于服务身份(如 Kubernetes Service Account)而非 IP 地址。这使得安全策略的定义更具语义化且更安全,不受 Pod IP 变化的影响。

最终决策与架构概览

权衡之下,我们选择了方案 B。决策的核心逻辑是:在数据平面上,对每一个毫秒的延迟和每一分钱的资源成本都进行优化。 Pulsar 解决了 Kafka 在弹性伸缩和数据回溯方面的运维痛点;.NET 提供了满足低延迟要求的高性能计算能力;Cilium 则以最低的性能损耗提供了必要的网络安全与隔离。

最终的架构如下所示:

graph TD
    subgraph "客户端 / 事件源"
        A[Web/Mobile Apps]
    end

    subgraph "数据管道 (Pulsar on Kubernetes)"
        A -- gRPC/HTTP --> B(API Gateway)
        B -- "Raw Event (JSON/Protobuf)" --> C{pulsar://.../tenant/namespace/raw-events}
        C -- "Exclusive Subscription" --> D[C# Feature Engineering Service]
        D -- "Enriched Feature (Avro)" --> E{pulsar://.../tenant/namespace/enriched-features}
    end

    subgraph "ML 推理 (Kubeflow on Kubernetes)"
        F[KFServing InferenceService]
        E -- "Shared Subscription" --> F
    end

    subgraph "网络层 (Cilium CNI)"
        G((Linux Kernel with eBPF))
        D <--> G
        F <--> G
        C <--> G
        E <--> G
    end

    style D fill:#5b4282,stroke:#333,stroke-width:2px,color:#fff
    style F fill:#3c78d8,stroke:#333,stroke-width:2px,color:#fff

核心实现细节

1. 数据契约:使用 Avro 定义特征 Schema

我们选择 Avro 作为内部消息格式,因为它支持 Schema 演进,这对于需要频繁迭代特征的 ML 系统至关重要。Pulsar 内置了 Schema 注册表,可以很好地支持这一点。

enriched-feature.avsc:

{
    "namespace": "com.mycompany.features",
    "type": "record",
    "name": "UserInteractionFeature",
    "fields": [
        {"name": "userId", "type": "string"},
        {"name": "itemId", "type": "string"},
        {"name": "timestamp", "type": "long", "logicalType": "timestamp-millis"},
        {"name": "eventType", "type": "string"},
        {"name": "userRecentClicks_1h", "type": "int"},
        {"name": "itemPopularity_24h", "type": "double"},
        {"name": "isFirstTimeInteraction", "type": "boolean"},
        {"name": "sessionFeatures", "type": {
            "type": "map",
            "values": "float"
        }}
    ]
}

2. C# 特征工程服务

这是管道的核心。服务消费原始事件,可能会调用外部服务(如 Redis)进行状态扩充,然后计算出新特征,最后将结构化的 Avro 消息发送到下一个 Pulsar Topic。

Dockerfile:

# Use the official .NET 8 SDK image for building
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src

# Copy csproj and restore dependencies
COPY *.csproj .
RUN dotnet restore

# Copy the remaining source code and build the application
COPY . .
RUN dotnet publish -c Release -o /app/publish --no-restore

# Use the official .NET 8 runtime image for the final stage
FROM mcr.microsoft.com/dotnet/aspnet:8.0
WORKDIR /app
COPY --from=build /app/publish .
ENTRYPOINT ["dotnet", "FeatureProcessor.dll"]

Pulsar 客户端与处理逻辑 (FeatureProcessingWorker.cs):

using DotPulsar;
using DotPulsar.Abstractions;
using DotPulsar.Extensions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Text.Json; // Assuming raw event is JSON

// Placeholder for our Avro-generated class
public record UserInteractionFeature { /* ... fields ... */ }
public record RawEvent { public string UserId { get; set; } /* ... */ }

public class FeatureProcessingWorker : BackgroundService
{
    private readonly ILogger<FeatureProcessingWorker> _logger;
    private readonly IPulsarClient _pulsarClient;

    // Topics are injected via configuration for better management
    private const string InputTopic = "persistent://public/default/raw-events";
    private const string OutputTopic = "persistent://public/default/enriched-features";
    private const string SubscriptionName = "feature-engineering-subscription";

    public FeatureProcessingWorker(ILogger<FeatureProcessingWorker> logger, IPulsarClient pulsarClient)
    {
        _logger = logger;
        _pulsarClient = pulsarClient;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Define consumer with an exclusive subscription for ordered processing per partition
        // Failover subscription type could also be used for high availability.
        await using var consumer = _pulsarClient.NewConsumer(Schema.String)
            .Topic(InputTopic)
            .SubscriptionName(SubscriptionName)
            .SubscriptionType(SubscriptionType.Exclusive) 
            .Create();
            
        // Producer for the enriched features using Avro schema
        // The schema is automatically registered by the client
        await using var producer = _pulsarClient.NewProducer(Schema.Avro<UserInteractionFeature>())
            .Topic(OutputTopic)
            .Create();

        _logger.LogInformation("Worker started. Listening on topic {Topic}", InputTopic);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // Wait for the next message
                var message = await consumer.Receive(stoppingToken);
                
                // 1. De-serialization and validation
                var rawEvent = JsonSerializer.Deserialize<RawEvent>(message.Value());
                if (rawEvent is null) 
                {
                    _logger.LogWarning("Failed to deserialize message. MessageId: {MessageId}. Acknowledging to skip.", message.MessageId);
                    await consumer.Acknowledge(message, stoppingToken);
                    continue;
                }

                // 2. Core Feature Engineering Logic
                // This is where real business logic lives. It might involve:
                // - Looking up data in a fast cache like Redis or an in-memory store.
                // - Performing calculations.
                // - Aggregating windowed data.
                var enrichedFeature = await ProcessEvent(rawEvent, stoppingToken);

                // 3. Produce to output topic
                var metadata = new MessageMetadata { Key = enrichedFeature.UserId };
                var messageId = await producer.Send(metadata, enrichedFeature, stoppingToken);

                // 4. Acknowledge the original message only after successful processing and producing.
                // This ensures at-least-once processing semantics.
                await consumer.Acknowledge(message, stoppingToken);

                _logger.LogInformation("Processed event for UserId {UserId}, produced new message {MessageId}", enrichedFeature.UserId, messageId);
            }
            catch (OperationCanceledException)
            {
                // Expected when the service is shutting down
                _logger.LogInformation("Worker is stopping.");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "An unhandled exception occurred during message processing. The worker will attempt to reconnect.");
                // In a real scenario, we might use a dead-letter queue.
                // The consumer's automatic reconnection logic will handle Pulsar connection issues.
                await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken); 
            }
        }
    }

    private async Task<UserInteractionFeature> ProcessEvent(RawEvent rawEvent, CancellationToken token)
    {
        // Simulating async work like fetching data from another service/cache
        await Task.Delay(5, token); // Simulate 5ms processing time

        // Example logic
        var feature = new UserInteractionFeature
        {
            UserId = rawEvent.UserId,
            // ... more complex feature calculation
            UserRecentClicks_1h = 10, // Fetched from a cache
            ItemPopularity_24h = 0.85, // Fetched from a cache
            IsFirstTimeInteraction = false,
        };
        return feature;
    }
}

3. Kubernetes 部署与 Cilium 网络策略

部署服务到 Kubernetes 是标准操作,但关键在于如何使用 Cilium 来保护它。

deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: feature-processor
  namespace: feature-pipeline
spec:
  replicas: 3
  selector:
    matchLabels:
      app: feature-processor
  template:
    metadata:
      labels:
        app: feature-processor
        # This identity label is crucial for Cilium policies
        team: data-science 
    spec:
      containers:
      - name: processor
        image: my-registry/feature-processor:1.0.0
        # Resource requests/limits are critical for production
        resources:
          requests:
            cpu: "500m"
            memory: "512Mi"
          limits:
            cpu: "1"
            memory: "1Gi"
        env:
        - name: PULSAR_SERVICE_URL
          value: "pulsar://pulsar-proxy.pulsar.svc.cluster.local:6650"

cilium-network-policy.yaml:

这是体现 Cilium 优势的地方。我们定义了一个策略,只允许 feature-processor Pods 访问 Pulsar Proxy,并只允许来自 Kubeflow 命名空间的 Pods 访问它(假设 Kubeflow 推理服务是消费者,虽然在我们的模型里是Pulsar->KF)。我们通过标签而不是IP来定义规则。

apiVersion: "cilium.io/v2"
kind: CiliumNetworkPolicy
metadata:
  name: feature-processor-policy
  namespace: feature-pipeline
spec:
  endpointSelector:
    matchLabels:
      app: feature-processor
  
  # Ingress: Allow traffic only from Pulsar brokers/proxies.
  # This is a simplified example. In a real Pulsar setup, brokers and proxies have specific labels.
  ingress:
  - fromEndpoints:
    - matchLabels:
        # Assuming pulsar proxy pods have this label
        app: pulsar-proxy
    toPorts:
    - ports:
      # If the service had an API, we would open its port here.
      # For a pure consumer/producer, ingress might not be needed from other apps.
      # But health checks from kubelet are needed.
      - port: "80"
        protocol: TCP

  # Egress: The most critical part.
  # Allow traffic only to Pulsar and potentially a Redis cache.
  egress:
  - toEndpoints:
    # Rule 1: Allow communication with Pulsar brokers
    - matchLabels:
        "k8s:io.kubernetes.pod.namespace": pulsar
        "k8s:app.kubernetes.io/name": pulsar
    toPorts:
    - ports:
      - port: "6650" # Pulsar binary protocol
        protocol: TCP
      - port: "8080" # Pulsar HTTP admin/lookup
        protocol: TCP
  # Rule 2: Allow DNS resolution from kube-dns
  - toEndpoints:
    - matchLabels:
        "k8s:io.kubernetes.pod.namespace": kube-system
        "k8s:k8s-app": kube-dns
    toPorts:
    - ports:
      - port: "53"
        protocol: UDP
      rules:
        dns:
        - matchPattern: "*"

这个策略实现了零信任网络的一个基本原则:默认拒绝所有流量,只明确允许必要的通信路径。这一切都在内核层面通过eBPF高效完成,没有任何Sidecar的开销。

架构的局限性与未来展望

这套架构并非银弹。最显著的局限性在于,我们选择了在 C# 生态中进行特征工程,这意味着我们无法像在 Python 中那样方便地使用 scikit-learn, pandasnumpy 等库。任何复杂的特征逻辑都需要我们自己实现或寻找 .NET 等效库,这无疑增加了开发成本。对于需要复杂预处理步骤(如文本向量化)的模型,这种技术选型可能不适用。

其次,虽然 Cilium 性能优越,但 eBPF 本身是一个较新的技术领域,对团队的技能要求更高。排查内核层面的网络问题比排查 Envoy 代理的日志要困难,需要对底层网络和 Linux 内核有更深入的理解。

未来的迭代方向是明确的。首先,可以引入一个真正的 Feature Store 解决方案(如 Feast),将我们的 C# 服务作为其数据源之一。这能提供特征的版本化、发现和离线/在线一致性等高级功能。其次,可以探索使用 Pulsar Functions 来承载更简单的、无状态的特征转换逻辑,进一步减少维护一个完整微服务的开销。对于某些计算密集的 UDF(User-Defined Function),甚至可以探索将其编译为 WebAssembly 模块,由 .NET 服务动态加载执行,以实现更好的性能和安全隔离。


  目录