在AWS上构建服务于TensorFlow模型的实时时序特征存储架构


项目初期,时序预测模型在Jupyter Notebook里表现完美,验证集上的MAE和RMSE指标都无可挑剔。然而,当模型被部署为线上服务后,灾难降临了。线上模型的性能与离线评估结果存在巨大鸿沟,告警邮件塞满了收件箱。问题的根源很快被定位:训练-服务偏斜(Training-Serving Skew)。离线训练时,我们用Pandas对整个数据集进行窗口计算、特征衍生;而在线上服务中,为了满足低延迟要求,实时计算特征的逻辑由另一位工程师在API服务中匆忙实现。两套独立的特征工程代码,即使逻辑上意图一致,也因浮点数精度、边界处理、数据窗口的微小差异,产生了截然不同的特征值,最终导致了模型的线上“失忆”。

这个坑踩得并不冤枉,它是许多机器学习项目从实验走向生产的必经之路。解决方案也相对明确:我们需要一个统一的、中心化的系统来管理特征的生命周期,确保训练和推理使用完全一致的特征数据。这就是构建一个实时时序特征存储(Feature Store)的根本动机。

我们的目标是设计一个架构,它能从实时数据流中摄取原始时序数据,计算衍生特征,然后将这些特征同时写入一个低延迟的在线存储和一个高吞吐的离线存储。TensorFlow训练任务将从离线存储中拉取批量的、历史悠久的特征集;而在线预测服务则从在线存储中毫秒级检索最新的特征向量。整个过程,特征计算逻辑只存在于一个地方。

技术选型上,我们立足于AWS生态系统,因为它提供了一套完整的、解耦的托管服务,能让我们像搭乐高一样构建数据管道,而无需过多关注底层基础设施的运维。

graph TD
    subgraph "数据源 (e.g., IoT设备/交易系统)"
        A[原始时序数据生产者]
    end

    subgraph "AWS 实时管道"
        A -- JSON records --> B(AWS Kinesis Data Streams)
        B -- events --> C{AWS Lambda: Feature Engineering}
        C -- computed features --> D(AWS Kinesis Data Firehose)
        C -- computed features --> E(Amazon DynamoDB - Online Store)
        D -- batch write --> F(Amazon S3 - Offline Store)
    end

    subgraph "模型训练 (离线)"
        F -- historical features --> G[SageMaker Training Job / EC2]
        G -- tf.data.Dataset --> H(TensorFlow模型训练)
        H -- SavedModel --> I(S3 Model Artifacts)
    end

    subgraph "实时预测 (在线)"
        J[API Gateway] -- request --> K{AWS Lambda: Inference}
        K -- get latest features --> E
        K -- load model --> I
        K -- predict() --> L[TensorFlow模型推理]
        L -- prediction --> K
        K -- response --> J
    end

    style C fill:#f9f,stroke:#333,stroke-width:2px
    style E fill:#ccf,stroke:#333,stroke-width:2px
    style F fill:#cff,stroke:#333,stroke-width:2px

这套架构的核心是Lambda函数Feature Engineering,它消费Kinesis流中的原始数据,承担了所有的特征计算逻辑,并将结果兵分两路:一路写入DynamoDB作为在线存储,满足低延迟查询需求;另一路通过Firehose批量写入S3作为离线存储,用于模型训练和分析。

一、数据摄取与特征工程核心实现

假设我们正在处理一个金融交易场景,原始数据流包含每笔交易的价格和数量。

数据生产者 (producer.py)

这是一个模拟脚本,持续向Kinesis流发送原始交易数据。在真实项目中,这可能是你的交易引擎或者撮合系统。

import json
import boto3
import uuid
import random
import time
import os
import logging

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 从环境变量获取配置,这是生产实践
KINESIS_STREAM_NAME = os.environ.get("KINESIS_STREAM_NAME", "raw_trades_stream")
AWS_REGION = os.environ.get("AWS_REGION", "us-east-1")

try:
    kinesis_client = boto3.client('kinesis', region_name=AWS_REGION)
except Exception as e:
    logging.error(f"Failed to create Kinesis client: {e}")
    exit(1)

def send_trade_record(instrument_id: str):
    """
    生成并发送单条交易记录。
    """
    data = {
        'instrument_id': instrument_id,
        'trade_id': str(uuid.uuid4()),
        'price': round(random.uniform(100.0, 105.0), 4),
        'volume': random.randint(1, 100),
        'timestamp': int(time.time() * 1000) # 毫秒级时间戳
    }
    
    payload = json.dumps(data)
    partition_key = instrument_id

    try:
        response = kinesis_client.put_record(
            StreamName=KINESIS_STREAM_NAME,
            Data=payload.encode('utf-8'),
            PartitionKey=partition_key
        )
        logging.info(f"Sent record {data['trade_id']} for {instrument_id}. SequenceNumber: {response['SequenceNumber']}")
    except kinesis_client.exceptions.ResourceNotFoundException:
        logging.error(f"Kinesis stream '{KINESIS_STREAM_NAME}' not found.")
        # 在生产环境中,这里可能需要更复杂的重试或告警逻辑
        time.sleep(5) # 简单等待
    except Exception as e:
        logging.error(f"Error sending record to Kinesis: {e}")

if __name__ == "__main__":
    instruments = ["BTC-USD", "ETH-USD", "SOL-USD"]
    if not KINESIS_STREAM_NAME:
        logging.error("KINESIS_STREAM_NAME environment variable not set.")
        exit(1)

    while True:
        instrument = random.choice(instruments)
        send_trade_record(instrument)
        time.sleep(random.uniform(0.1, 0.5))

特征工程Lambda (feature_engineering_lambda/app.py)

这是整个系统的“心脏”。它必须是无状态的,但又要能计算有状态的窗口特征(如移动平均线)。这里的关键技巧是利用DynamoDB不仅作为在线特征的最终存储,也作为计算过程中的状态存储。

import json
import base64
import os
import boto3
import logging
from decimal import Decimal
from typing import List, Dict, Any

# 日志配置
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# 从环境变量加载配置
DYNAMODB_TABLE_NAME = os.environ.get("DYNAMODB_TABLE_NAME", "timeseries_feature_store")
FIREHOSE_STREAM_NAME = os.environ.get("FIREHOSE_STREAM_NAME", "features_to_s3_stream")
MOVING_AVG_WINDOW = int(os.environ.get("MOVING_AVG_WINDOW", 10))

# 初始化AWS客户端
# Lambda容器重用时,这些客户端可以被复用,提高性能
try:
    dynamodb_resource = boto3.resource('dynamodb')
    firehose_client = boto3.client('firehose')
    feature_table = dynamodb_resource.Table(DYNAMODB_TABLE_NAME)
except Exception as e:
    logger.error(f"Failed to initialize AWS clients: {e}")
    # 初始失败,后续调用会持续出错,这是一个致命错误
    raise e

def calculate_features(instrument_id: str, new_price: Decimal) -> Dict[str, Any]:
    """
    计算新特征。核心逻辑:从DynamoDB读取最近N个价格,计算移动平均。
    这是一个有状态的计算,状态被外部化存储在DynamoDB中。
    """
    # 1. 读取最近的价格历史用于计算
    # 在真实项目中,这里的查询性能至关重要
    try:
        response = feature_table.query(
            KeyConditionExpression=boto3.dynamodb.conditions.Key('instrument_id').eq(instrument_id),
            ScanIndexForward=False, # 按时间戳降序排序
            Limit=MOVING_AVG_WINDOW - 1
        )
        # 从查询结果中提取价格,并转换为Decimal类型
        # 老数据可能不存在'price'字段,需要做健壮性处理
        price_history = [item['price'] for item in response.get('Items', []) if 'price' in item]
    except Exception as e:
        logger.error(f"Error querying DynamoDB for {instrument_id}: {e}")
        # 查询失败,无法计算特征,返回空字典或抛出异常
        return {}

    # 2. 合并新价格并计算SMA
    all_prices = [new_price] + price_history
    
    # 一个常见的错误是,当历史数据不足窗口大小时,如何处理。
    # 这里的策略是,有多少数据就用多少数据计算,而不是等到窗口填满。
    if not all_prices:
        return {} # 无法计算
        
    sma = sum(all_prices) / len(all_prices)

    # 3. 可以在这里添加更多特征计算,如RSI, MACD等
    # ...

    return {
        'sma_10': sma
    }

def handler(event, context):
    """
    Lambda主处理函数。
    """
    records_to_firehose = []

    for record in event['Records']:
        try:
            # Kinesis record data is base64 encoded
            payload_str = base64.b64decode(record['kinesis']['data']).decode('utf-8')
            trade_data = json.loads(payload_str)

            instrument_id = trade_data['instrument_id']
            timestamp = trade_data['timestamp']
            price = Decimal(str(trade_data['price'])) # 使用Decimal保证精度

            # 1. 计算新特征
            features = calculate_features(instrument_id, price)
            if not features:
                logger.warning(f"Failed to calculate features for {instrument_id}, skipping.")
                continue

            # 2. 构造要写入存储的完整记录
            feature_record = {
                'instrument_id': instrument_id,
                'timestamp': timestamp,
                'price': price, # 保存原始价格
                'volume': trade_data['volume'],
                **features # 合并计算出的特征
            }

            # 3. 写入DynamoDB (在线存储)
            # 使用Decimal类型写入DynamoDB
            feature_table.put_item(Item=json.loads(json.dumps(feature_record), parse_float=Decimal))
            
            # 4. 准备写入Firehose (离线存储)
            # Firehose要求数据以\n结尾
            # 这里的 schema (instrument_id, timestamp, price, sma_10) 必须和 S3 上的 Athena/Glue 表 schema 保持一致
            firehose_payload = (json.dumps(feature_record, default=str) + '\n').encode('utf-8')
            records_to_firehose.append({'Data': firehose_payload})

        except json.JSONDecodeError as e:
            logger.error(f"Failed to decode JSON payload: {e}")
            continue # 跳过格式错误的数据
        except Exception as e:
            logger.error(f"Error processing record: {e}", exc_info=True)
            # 在生产环境中,应将失败的记录发送到死信队列(DLQ)
            # 这里我们选择继续处理下一条记录
            continue
    
    # 5. 批量写入Firehose
    if records_to_firehose:
        try:
            firehose_client.put_record_batch(
                DeliveryStreamName=FIREHOSE_STREAM_NAME,
                Records=records_to_firehose
            )
            logger.info(f"Successfully sent {len(records_to_firehose)} records to Firehose.")
        except Exception as e:
            logger.error(f"Failed to send batch to Firehose: {e}")
            # 如果批量发送失败,可能需要重试策略或将整个批次发送到DLQ
            # 这对于保证数据不丢失至关重要
            raise e

    return {'statusCode': 200, 'body': json.dumps('Processing complete')}

这个Lambda函数的设计体现了几个生产级的考量:

  • 配置外部化: 所有配置项(表名,流名,窗口大小)都通过环境变量传入,符合12-Factor App原则。
  • 错误处理: 对JSON解析、AWS API调用等都做了异常捕获。一个健壮的系统必须假设任何外部调用都可能失败。
  • 精度: 使用Decimal类型处理金融数据,避免了浮点数计算的精度陷阱。
  • 状态外部化: Lambda本身是无状态的,但通过读写DynamoDB实现了有状态的窗口计算,这是Serverless架构处理流式计算的常用模式。
  • 批量操作: 对Firehose的写入采用了put_record_batch,这比单条写入更高效,成本也更低。

二、模型训练:消费离线特征

当数据通过Firehose稳定地沉淀到S3后(通常配置为按天或按小时分区),我们就可以启动一个训练任务。这个任务的核心是,它只读取S3上已经计算好的特征,而不再执行任何特征工程逻辑。

import boto3
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np

# 假设S3数据路径为 s3://my-feature-store-bucket/features/YYYY/MM/DD/
S3_BUCKET = "my-feature-store-bucket"
S3_PREFIX = "features/" # 训练脚本会扫描这个前缀下的所有分区

def load_features_from_s3(bucket: str, prefix: str) -> pd.DataFrame:
    """
    从S3加载所有特征数据。
    在真实项目中,数据量巨大时,应使用AWS Glue, Spark on EMR, 或Dask来处理。
    这里为了演示,我们使用pandas简化处理。
    """
    s3 = boto3.client('s3')
    paginator = s3.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
    
    df_list = []
    for page in pages:
        for obj in page.get('Contents', []):
            if obj['Key'].endswith('.json.gz'): # Firehose输出的可能是压缩文件
                obj_path = f"s3://{bucket}/{obj['Key']}"
                df_list.append(pd.read_json(obj_path, lines=True, compression='gzip'))

    if not df_list:
        raise ValueError("No feature data found in S3 path.")
        
    full_df = pd.concat(df_list, ignore_index=True)
    # 按品种和时间排序,这是时序模型训练前的必要步骤
    full_df = full_df.sort_values(by=['instrument_id', 'timestamp']).reset_index(drop=True)
    return full_df

def create_tf_dataset(df: pd.DataFrame, target_instrument: str, sequence_length: int, batch_size: int):
    """
    为特定品种创建TensorFlow时间序列数据集。
    """
    instrument_df = df[df['instrument_id'] == target_instrument].copy()
    
    features = ['price', 'volume', 'sma_10']
    target = 'price' # 预测未来的价格

    # 特征标准化
    scaler = StandardScaler()
    instrument_df[features] = scaler.fit_transform(instrument_df[features])

    # 创建时序样本
    X, y = [], []
    data = instrument_df[features].values
    target_data = instrument_df[target].values

    for i in range(len(data) - sequence_length):
        X.append(data[i:(i + sequence_length)])
        y.append(target_data[i + sequence_length])

    X, y = np.array(X), np.array(y)
    
    # 创建tf.data.Dataset
    dataset = tf.data.Dataset.from_tensor_slices((X, y))
    dataset = dataset.shuffle(buffer_size=1024).batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return dataset, scaler


# --- 主训练流程 ---
# 1. 加载特征
df = load_features_from_s3(S3_BUCKET, S3_PREFIX)

# 2. 为BTC-USD构建数据集
btc_dataset, btc_scaler = create_tf_dataset(df, 'BTC-USD', sequence_length=60, batch_size=32)

# 3. 定义并训练模型 (一个简单的LSTM)
model = tf.keras.models.Sequential([
    tf.keras.layers.LSTM(50, return_sequences=True, input_shape=(60, 3)),
    tf.keras.layers.LSTM(50),
    tf.keras.layers.Dense(1)
])
model.compile(optimizer='adam', loss='mae')

model.fit(btc_dataset, epochs=10)

# 4. 保存模型和Scaler
model.save('btc_price_predictor/1') # TensorFlow Serving格式
import joblib
joblib.dump(btc_scaler, 'btc_price_predictor/scaler.joblib')

# 上传到S3...
# boto3.client('s3').upload_file(...)

这段训练代码的关键点在于load_features_from_s3函数。它直接消费S3上的、由特征工程Lambda生成的“熟数据”,从而在源头上杜绝了训练-服务偏斜。

三、实时预测:消费在线特征

在线预测服务由API Gateway和Lambda组成,目标是实现p99延迟在100ms以内。

# inference_lambda/app.py
import json
import os
import boto3
import numpy as np
import tensorflow as tf
import joblib
from decimal import Decimal

# 初始化 - 在Lambda全局作用域完成,利用执行环境复用
DYNAMODB_TABLE_NAME = os.environ.get("DYNAMODB_TABLE_NAME", "timeseries_feature_store")
SEQUENCE_LENGTH = int(os.environ.get("SEQUENCE_LENGTH", 60))
MODEL_PATH = os.environ.get("MODEL_PATH", "./btc_price_predictor") # 模型文件打包在Lambda层或镜像中

try:
    dynamodb_resource = boto3.resource('dynamodb')
    feature_table = dynamodb_resource.Table(DYNAMODB_TABLE_NAME)
    
    # 加载模型和scaler。这个操作很耗时,所以必须放在handler之外
    model = tf.saved_model.load(os.path.join(MODEL_PATH, "1"))
    scaler = joblib.load(os.path.join(MODEL_PATH, "scaler.joblib"))

except Exception as e:
    # 致命错误,如果模型加载失败,服务无法正常工作
    print(f"FATAL: Failed to load model or scaler: {e}")
    raise e

def handler(event, context):
    try:
        # 从API Gateway获取请求参数
        instrument_id = event['queryStringParameters']['instrument_id']
    except (KeyError, TypeError):
        return {'statusCode': 400, 'body': json.dumps({'error': 'instrument_id query parameter is required'})}
    
    try:
        # 1. 从DynamoDB获取最新的特征序列
        response = feature_table.query(
            KeyConditionExpression=boto3.dynamodb.conditions.Key('instrument_id').eq(instrument_id),
            ScanIndexForward=False,
            Limit=SEQUENCE_LENGTH
        )
        items = response.get('Items', [])
        
        if len(items) < SEQUENCE_LENGTH:
            return {'statusCode': 404, 'body': json.dumps({'error': f'Not enough data to predict for {instrument_id}. Required: {SEQUENCE_LENGTH}, Found: {len(items)}'})}
        
        # 2. 准备模型输入
        # DynamoDB返回的顺序是降序的,模型需要升序
        items.reverse()
        
        # 将Decimal转换为float,并构建特征矩阵
        features_to_scale = np.array(
            [[float(item['price']), float(item['volume']), float(item['sma_10'])] for item in items]
        )
        
        # 3. 使用训练时保存的scaler进行标准化
        # 这里的坑在于:必须使用与训练时完全相同的scaler实例
        scaled_features = scaler.transform(features_to_scale)
        
        # 4. 模型推理
        # 输入需要增加一个batch维度
        input_tensor = tf.constant(np.expand_dims(scaled_features, axis=0), dtype=tf.float32)
        
        # TensorFlow Serving加载的模型需要指定签名
        # 'serving_default' 是默认签名
        prediction = model.signatures['serving_default'](input_tensor)
        # 输出通常是一个字典,需要根据模型定义来提取
        predicted_value_scaled = prediction['dense_1'].numpy()[0][0]
        
        # 5. 反标准化得到真实预测值
        # scaler.inverse_transform需要一个与原始特征维度相同的数组
        dummy_for_inverse = np.zeros((1, scaled_features.shape[1]))
        dummy_for_inverse[0, 0] = predicted_value_scaled # price在第一列
        predicted_price = scaler.inverse_transform(dummy_for_inverse)[0, 0]

        return {
            'statusCode': 200,
            'body': json.dumps({
                'instrument_id': instrument_id,
                'predicted_price': predicted_price,
                'last_timestamp': items[-1]['timestamp']
            })
        }
        
    except Exception as e:
        print(f"Error during inference for {instrument_id}: {e}")
        return {'statusCode': 500, 'body': json.dumps({'error': 'Internal server error during prediction'})}

这个推理Lambda同样贯彻了“只消费,不计算”的原则。它从DynamoDB获取的sma_10特征,与训练时从S3读取的sma_10,均源自同一个特征工程Lambda的同一份计算逻辑,从而彻底解决了训练-服务偏斜问题。

遗留问题与架构的边界

这套架构虽然解决了核心痛点,但并非银弹。它的局限性非常明显:

  1. 特征回填 (Backfilling): 当我们修改或增加一个新的特征计算逻辑时(比如将SMA_10改为SMA_20),这套架构无法自动为历史数据重新计算新特征。需要额外开发一个批处理任务(例如一个Glue Job),读取S3的原始数据,应用新逻辑,并将结果写回S3的一个新分区。这是一个复杂且耗时的过程。
  2. 成本: Kinesis Data Streams和DynamoDB在规模化使用时成本不菲。对于吞吐量极高但对延迟不那么敏感的场景,或许可以考虑使用Kinesis Firehose直接对接S3,然后用批处理任务进行特征工程,但这会牺牲实时性。
  3. 复杂性: 虽然每个组件都是托管服务,但整个系统的链路变长,组件间的依赖关系、权限配置、监控告警变得更加复杂,对团队的DevOps和SRE能力提出了更高要求。
  4. 特征发现与治理: 随着特征数量增多,如何让算法工程师方便地发现、复用和理解这些特征,成了一个新问题。这通常需要引入一个元数据管理层,也就是更成熟的Feature Store(如Feast, Tecton, SageMaker Feature Store)会提供的功能。

我们当前构建的,可以说是一个轻量级的、定制化的Feature Store。它用云原生服务解决了最关键的训练-服务偏斜问题,为模型上线提供了稳定性保障。未来的迭代方向,可能是在这个基础上引入更完善的特征治理,或者在成本与性能之间做出权衡,评估转向更专业的托管Feature Store服务的可行性。


  目录