基于 TypeScript gRPC 与 ScyllaDB 构建服务于 Python NumPy 的高性能特征存储架构


处理实时机器学习特征的挑战在于其双重性:一方面需要能够支撑海量事件写入的吞吐能力,另一方面则要求在模型进行在线推理时,能够以个位数毫秒的延迟提供最新的特征向量。当团队的技术栈呈现异构化时——例如,核心API团队精通TypeScript与Node.js生态,而算法团队则完全依赖Python与NumPy——这个挑战就演变成了复杂的架构权衡问题。

我们的目标是构建一个系统,它不仅要满足上述严苛的性能指标,还必须无缝地桥接TypeScript和Python这两个世界,让各自的优势得以最大化发挥。

定义复杂技术问题:异构技术栈下的实时特征困境

一个典型的场景是:用户行为事件(点击、浏览、购买)以每秒数十万次的频率涌入系统,我们需要基于这些事件实时计算出用户的特征向量,例如“最近5分钟点击次数”、“过去1小时内购买商品均价”等。

  • 写入路径: 高并发、高吞吐的事件流摄入。
  • 读取路径: 对特定实体(如用户ID)的特征向量进行低延迟(p99 < 10ms)的点查询。
  • 计算核心: 涉及时间窗口、滚动聚合等数值计算,这是NumPy的绝对强项。
  • 服务接口: 必须融入现有的TypeScript微服务体系,由Node.js提供API网关。

方案A:纯Python技术栈的优劣分析

最直接的方案是完全拥抱Python生态。使用像FastAPI或gRPC-python这样的框架构建服务,内部直接调用NumPy进行计算,并使用Python驱动连接ScyllaDB。

优势:

  1. 开发同质化: 无需跨语言协作,数据在内存中无需序列化/反序列化即可在服务层和计算层之间流转。
  2. 生态成熟: Python在数据科学和数值计算领域的库支持无与伦比,开发效率高。
  3. 部署简单: 单一技术栈意味着更简单的CI/CD流水线和容器化策略。

劣势:

  1. I/O瓶颈: 尽管有asyncio,但Python的全局解释器锁(GIL)在处理大规模并发I/O密集型任务时,相比Node.js的事件循环模型仍然存在理论上的劣势。对于一个需要同时处理成千上万个长连接的API网关来说,这可能成为性能天花板。
  2. 团队壁垒: 这意味着要求TypeScript团队学习并维护一个Python服务,或者将API网关的职责也交给Python团队,这都可能违背组织架构和团队专业分工的初衷。

方案B:纯TypeScript技术栈的优劣分析

另一个极端是尝试在Node.js生态中完成所有事情。使用node-scylladb驱动,并寻找JavaScript的数值计算库(如ndarraynumjs)来替代NumPy。

优势:

  1. 极致I/O性能: Node.js的非阻塞事件循环模型非常适合构建高并发的API网关和数据摄入服务。
  2. 技术栈统一: 完美融入现有微服务体系,便于维护和监控。

劣势:

  1. 计算能力短板: 这是该方案的致命缺陷。JavaScript的数值计算生态系统远不及Python成熟和高效。无论是库的功能丰富度、社区支持,还是底层计算性能(尤其是在涉及复杂矩阵运算时),都与NumPy/SciPy相去甚远。强行使用JS库实现复杂的特征工程,无异于用锤子拧螺丝,不仅开发效率低下,运行性能也堪忧。

最终选择:基于gRPC的TypeScript-Python混合架构

权衡之后,我们决定采用一种“集两家之长”的混合(Polyglot)架构。这种架构的核心思想是让每个组件都做它最擅长的事情。

  • TypeScript (Node.js) 服务: 作为系统的入口和API网关。它负责处理所有外部请求、协议转换、身份验证,以及与ScyllaDB进行简单的键值存取。它发挥了Node.js在高并发I/O上的全部优势。
  • Python (NumPy) 服务: 作为后端的专用计算引擎。它不直接对外暴露,而是通过高性能的RPC协议接收来自TypeScript服务的计算任务。所有复杂的数值计算、特征衍生都在这里完成。
  • ScyllaDB: 作为统一的存储后端,为两个服务提供高吞吐、低延迟的数据持久化。
  • gRPC & Protocol Buffers: 作为连接TypeScript和Python两个世界的桥梁。它提供了强类型的服务定义、高效的二进制序列化,以及跨语言的代码生成能力,是构建高性能多语言微服务的黄金标准。

最酷的是,这种架构不仅解决了技术栈的冲突,还通过明确的职责分离,让系统变得异常清晰和可扩展。

graph TD
    subgraph "客户端"
        A[Client/Upstream Service]
    end

    subgraph "TypeScript API网关 (Node.js)"
        B(gRPC/HTTP Server)
        C{Request Routing}
        D[ScyllaDB Driver]
        E[gRPC Client]
    end

    subgraph "Python 计算服务"
        F(gRPC Server)
        G[NumPy Feature Logic]
        H[ScyllaDB Driver]
    end

    subgraph "数据存储层"
        I[(ScyllaDB Cluster)]
    end

    A -- "REST/gRPC Request" --> B
    B --> C
    C -- "简单读写 (e.g., GetRawEvent)" --> D
    C -- "复杂计算 (e.g., GetFeatureVector)" --> E
    D -- "CQL" --> I
    E -- "gRPC Call" --> F
    F --> G
    G -- "计算" --> H
    H -- "CQL (读写特征)" --> I

核心实现概览

1. ScyllaDB 数据模型设计

为了支撑快速读写,我们的表结构设计必须围绕查询模式进行。

// 使用一个专用的 keyspace
CREATE KEYSPACE IF NOT EXISTS feature_store
WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3 };

USE feature_store;

-- 存储原始事件流,用于后续的特征计算
-- entity_id 作为分区键,保证同一个实体的数据落在同一个节点上
-- event_time 作为聚簇键,保证事件按时间倒序排列,便于查询最近的N个事件
CREATE TABLE IF NOT EXISTS raw_events (
    entity_id text,
    event_time timestamp,
    event_type text,
    payload map<text, text>,
    PRIMARY KEY (entity_id, event_time)
) WITH CLUSTERING ORDER BY (event_time DESC);

-- 存储计算好的特征向量
-- 同样以 entity_id 作为分区键,实现快速点查
-- features 是一个冻结的 UDT,这能提升性能,因为它被当作一个不可变的blob处理
CREATE TYPE IF NOT EXISTS feature_value (
    value_double double,
    value_string text,
    value_bytes blob
);

CREATE TABLE IF NOT EXISTS feature_vectors (
    entity_id text PRIMARY KEY,
    features map<text, frozen<feature_value>>,
    last_updated timestamp
);

2. Protocol Buffers & gRPC 服务定义

feature_store.proto 文件是整个系统的契约,它定义了服务、方法和数据结构。

syntax = "proto3";

package featurestore;

import "google/protobuf/timestamp.proto";
import "google/protobuf/struct.proto";

// 定义特征服务
service FeatureStoreService {
  // 摄入一个原始事件
  rpc IngestEvent (IngestEventRequest) returns (IngestEventResponse);

  // 获取一个实体的特征向量
  rpc GetFeatureVector (GetFeatureVectorRequest) returns (GetFeatureVectorResponse);
}

// 事件摄入请求
message IngestEventRequest {
  string entity_id = 1;
  google.protobuf.Timestamp event_time = 2;
  string event_type = 3;
  map<string, string> payload = 4;
}

message IngestEventResponse {
  bool success = 1;
  string message = 2;
}

// 特征向量获取请求
message GetFeatureVectorRequest {
  string entity_id = 1;
  // 请求需要哪些特征
  repeated string feature_names = 2;
}

// 特征值,使用 oneof 优雅地处理不同类型
// 这是个很棒的实践,避免了为每种类型创建一个字段
message FeatureValue {
    oneof value {
        double double_value = 1;
        string string_value = 2;
        int64 int64_value = 3;
        bool bool_value = 4;
        // 对于 NumPy array 这种复杂结构,序列化成 bytes 是最高效的方式
        bytes bytes_value = 5;
    }
}

// 特征向量获取响应
message GetFeatureVectorResponse {
  string entity_id = 1;
  map<string, FeatureValue> features = 2;
  google.protobuf.Timestamp last_updated = 3;
}

3. Python gRPC 计算服务实现

这是系统的“大脑”,负责所有繁重的计算任务。

# feature_service.py
import grpc
import logging
from concurrent import futures
import numpy as np
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import SimpleStatement

# 导入生成的 gRPC 代码
import feature_store_pb2
import feature_store_pb2_grpc

# --- 配置 ---
# 在真实项目中,这些应该来自配置文件或环境变量
SCYLLA_HOSTS = ['127.0.0.1']
SCYLLA_KEYSPACE = 'feature_store'
SERVER_PORT = 50051
LOG_LEVEL = logging.INFO

# --- 日志设置 ---
logging.basicConfig(level=LOG_LEVEL, format='%(asctime)s - %(levelname)s - %(message)s')

class FeatureStoreServiceImpl(feature_store_pb2_grpc.FeatureStoreServiceServicer):
    """
    gRPC 服务的具体实现
    """
    def __init__(self, scylla_session):
        self.session = scylla_session
        # 预编译 CQL 语句是 ScyllaDB/Cassandra 的最佳实践
        self.get_recent_events_stmt = self.session.prepare(
            "SELECT payload FROM raw_events WHERE entity_id = ? LIMIT ?"
        )
        self.update_features_stmt = self.session.prepare(
            "UPDATE feature_vectors SET features = features + ?, last_updated = toTimestamp(now()) WHERE entity_id = ?"
        )
        self.get_features_stmt = self.session.prepare(
            "SELECT features, last_updated FROM feature_vectors WHERE entity_id = ?"
        )


    def _calculate_avg_purchase_amount(self, entity_id: str) -> float:
        """
        一个特征计算的例子:计算最近10次购买的平均金额
        """
        try:
            # 这里的业务逻辑是核心,NumPy 在此大放异彩
            rows = self.session.execute(self.get_recent_events_stmt, (entity_id, 100)) # 多取一些以过滤
            purchase_events = [
                float(row.payload.get('amount', '0.0')) 
                for row in rows if row.payload.get('type') == 'purchase'
            ]
            
            if not purchase_events:
                return 0.0
            
            # 使用 NumPy 进行高效计算
            amounts = np.array(purchase_events[:10]) # 只取最近10次购买
            avg_amount = np.mean(amounts)
            return float(avg_amount)
        except Exception as e:
            logging.error(f"Failed to calculate avg_purchase_amount for {entity_id}: {e}")
            return 0.0 # 在生产中需要更精细的错误处理和默认值策略

    def GetFeatureVector(self, request, context):
        """
        处理获取特征向量的请求
        """
        entity_id = request.entity_id
        logging.info(f"Received GetFeatureVector request for entity: {entity_id}")
        
        # 这是一个路由逻辑,根据请求的 feature_names 来决定执行哪些计算
        # 这彻底改变了工作方式,我们可以动态添加新的特征计算函数,而无需修改接口
        calculated_features = {}
        if "avg_purchase_amount_10" in request.feature_names:
            avg_amount = self._calculate_avg_purchase_amount(entity_id)
            calculated_features["avg_purchase_amount_10"] = feature_store_pb2.FeatureValue(double_value=avg_amount)

        # 可以在这里从 ScyllaDB 读取已经预计算好的特征
        # ...
        
        # 将新计算的特征更新回 ScyllaDB (可选,取决于业务是实时算还是读缓存)
        # self.session.execute(self.update_features_stmt, (calculated_features, entity_id))
        
        response = feature_store_pb2.GetFeatureVectorResponse(
            entity_id=entity_id,
            features=calculated_features
        )
        return response

    def IngestEvent(self, request, context):
        # 在这个架构中,事件摄入通常由 TypeScript 网关直接写入 ScyllaDB
        # Python 服务主要负责按需计算,所以这里可以只是一个空实现或用于触发异步计算
        logging.warning("IngestEvent called on Python service, but it's not its primary responsibility.")
        return feature_store_pb2.IngestEventResponse(success=True, message="Handled by computation service if needed.")


def serve():
    """
    启动 gRPC 服务器
    """
    # 初始化 ScyllaDB 连接
    # 在生产环境中,需要配置负载均衡策略、重试策略等
    cluster = Cluster(SCYLLA_HOSTS)
    session = cluster.connect(SCYLLA_KEYSPACE)
    logging.info(f"Connected to ScyllaDB cluster at {SCYLLA_HOSTS}")

    # 创建 gRPC 服务器
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    feature_store_pb2_grpc.add_FeatureStoreServiceServicer_to_server(
        FeatureStoreServiceImpl(session), server
    )
    server.add_insecure_port(f'[::]:{SERVER_PORT}')
    logging.info(f"Starting Python gRPC server on port {SERVER_PORT}")
    server.start()
    server.wait_for_termination()

if __name__ == '__main__':
    serve()

4. TypeScript gRPC API 网关实现

这是系统的“门面”,负责与外部世界打交道。

// index.ts
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
import { promisify } from 'util';
import { Cluster, auth, types } from 'cassandra-driver';
import { v4 as uuidv4 } from 'uuid';

// --- 配置 ---
const PROTO_PATH = './feature_store.proto';
const PYTHON_SERVICE_ADDR = 'localhost:50051';
const SERVER_PORT = 50050;
const SCYLLA_HOSTS = ['127.0.0.1'];
const SCYLLA_KEYSPACE = 'feature_store';

// --- 日志 ---
// 使用 pino 或 winston 等成熟的日志库
const logger = {
    info: (msg: string) => console.log(`[INFO] ${msg}`),
    error: (msg: string, err?: any) => console.error(`[ERROR] ${msg}`, err),
};

// --- 加载 Proto 和 gRPC 客户端 ---
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true,
});
const featureStoreProto = grpc.loadPackageDefinition(packageDefinition).featurestore as any;
const pythonServiceClient = new featureStoreProto.FeatureStoreService(
    PYTHON_SERVICE_ADDR,
    grpc.credentials.createInsecure()
);

// 将回调风格的 gRPC 调用转换为 Promise,这是现代 Node.js 开发的关键
const getFeatureVectorAsync = promisify(pythonServiceClient.GetFeatureVector).bind(pythonServiceClient);

// --- ScyllaDB 客户端 ---
const scyllaClient = new Cluster({
    contactPoints: SCYLLA_HOSTS,
    localDataCenter: 'datacenter1', // 根据你的配置修改
    keyspace: SCYLLA_KEYSPACE,
    // 生产环境需要配置健全的重试和负载均衡策略
    policies: {
        // ...
    },
});

// --- gRPC 服务实现 (TypeScript 部分) ---
const featureStoreServerImpl = {
    // 事件摄入由 TS 服务直接处理,发挥 Node.js 的 I/O 优势
    ingestEvent: async (call: grpc.ServerUnaryCall<any, any>, callback: grpc.sendUnaryMessage<any>) => {
        const req = call.request;
        logger.info(`Ingesting event for entity: ${req.entity_id}`);
        try {
            const query = 'INSERT INTO raw_events (entity_id, event_time, event_type, payload) VALUES (?, ?, ?, ?)';
            const params = [req.entity_id, req.event_time.toDate(), req.event_type, req.payload];
            await scyllaClient.execute(query, params, { prepare: true });
            callback(null, { success: true, message: 'Event ingested' });
        } catch (err) {
            logger.error('Failed to ingest event to ScyllaDB', err);
            callback({
                code: grpc.status.INTERNAL,
                message: 'Failed to write to database.',
            });
        }
    },

    // 获取特征向量的请求被转发到 Python 服务
    getFeatureVector: async (call: grpc.ServerUnaryCall<any, any>, callback: grpc.sendUnaryMessage<any>) => {
        const req = call.request;
        logger.info(`Forwarding GetFeatureVector request for entity: ${req.entity_id}`);
        try {
            // 这里的调用是整个架构的核心:TypeScript 服务作为代理,将计算任务委托给 Python
            const response = await getFeatureVectorAsync(req);
            callback(null, response);
        } catch (err: any) {
            logger.error('Error calling Python computation service', err);
            callback({
                code: grpc.status.INTERNAL,
                message: `Error from upstream computation service: ${err.details}`,
            });
        }
    },
};

// --- 启动服务器 ---
function main() {
    const server = new grpc.Server();
    server.addService(featureStoreProto.FeatureStoreService.service, featureStoreServerImpl);
    
    server.bindAsync(`0.0.0.0:${SERVER_PORT}`, grpc.ServerCredentials.createInsecure(), (err, port) => {
        if (err) {
            logger.error(`Server failed to start on port ${SERVER_PORT}`, err);
            return;
        }
        logger.info(`TypeScript gRPC Gateway started on port ${port}`);
        server.start();
    });

    process.on('SIGTERM', () => {
        logger.info('Shutting down server...');
        server.tryShutdown(() => {
            scyllaClient.shutdown();
        });
    });
}

main();

架构的扩展性与局限性

这个架构最迷人的地方在于它的扩展性。当需要添加新的特征计算逻辑时,我们只需要在Python服务中增加一个新的计算函数,并更新Protobuf契约(如果需要新的数据类型),TypeScript网关的代码几乎不需要改动。我们可以独立地扩展Python计算集群的规模以应对更复杂的计算任务,也可以独立扩展TypeScript网关的实例数以应对更高的API并发。

然而,这种架构并非没有代价。其主要的局限性在于引入了更高的运维复杂性。我们现在需要维护两个不同技术栈的服务,包括它们的构建、部署、监控和日志聚合。TypeScript和Python服务之间的gRPC调用引入了网络延迟,尽管在现代数据中心网络中这通常是亚毫秒级的,但对于某些极端低延迟的场景,这仍然是一个需要考虑的因素。此外,跨语言的调试和问题定位也比单体应用更具挑战性。这个架构的价值,只有在系统规模、团队结构和性能要求达到一定程度时,才能真正体现出来。


  目录