团队的AI模型迭代对实时特征的需求越来越迫切,原有的T+1批量ETL流程已经成为瓶颈。我们需要一个能够接收实时用户行为事件,处理后写入特征存储,并能在毫秒级延迟内被线上模型查询的管道。这个任务落到了我们平台工程团队。痛点很明确:我们需要一个高性能、低延迟的特征写入服务。
技术痛点与初步构想
最初的构想很简单:一个微服务接收HTTP请求,将特征数据写入数据库。但问题马上来了:写入哪个数据库?
- 事务性与审计需求: 我们需要一个可靠的地方记录每一条原始特征的写入日志,用于审计、问题追溯和模型再训练。这要求强事务性,关系型数据库如PostgreSQL是天然的选择。团队对JPA/Hibernate技术栈非常熟悉,这能保证开发效率。
- 高性能查询需求: 线上模型进行推理时,需要在一个请求的几十毫秒内拉取一个用户或项目的数十甚至上百个特征。这些查询通常是宽表、聚合类的分析型查询。让PostgreSQL同时承担这种高并发的OLAP查询,显然会把它压垮。
这就导向了一个典型的双写(Dual-Write)场景:同一份数据,或其不同形态,需要写入两个不同的存储系统。一个用于事务和记录(OLTP),一个用于分析和查询(OLAP)。
我们的技术选型决策很快就清晰了:
- 应用框架: Micronaut。它的预编译(AOT)特性带来了极快的启动速度和更低的内存占用,非常适合部署在资源受限的容器化环境中,这在云原生和AI平台降本增效的背景下至关重要。
- OLTP存储: PostgreSQL,通过JPA/Hibernate进行访问。这是团队的舒适区,稳定可靠。
- OLAP存储: ClickHouse。其列式存储和强大的聚合性能,是为我们这种场景量身定做的。
- 运行环境: Kubernetes,并已部署了Linkerd作为服务网格。这意味着我们不需要在应用代码中处理mTLS、重试、分布式追踪的上下文传递等问题,可以更专注于业务逻辑。服务网格的存在,是这次架构选型的一个重要前提。
核心挑战在于,如何优雅地实现这个从Micronaut服务到PostgreSQL和ClickHouse的双写管道,并确保其在生产环境中的可维护性和可观测性。
架构与流程设计
我们将构建一个feature-ingestion-service
。它暴露一个RESTful API端点,接收特征数据。内部逻辑需要在一个事务性上下文中,先将一条操作记录写入PostgreSQL,然后将处理后的特征宽表数据写入ClickHouse。
sequenceDiagram participant Client as 外部事件源 participant Ingress as K8s Ingress participant ServiceMesh as 服务网格代理 (Sidecar) participant Service as feature-ingestion-service participant PostgreSQL participant ClickHouse Client->>Ingress: POST /api/v1/features Ingress->>ServiceMesh: 转发请求 ServiceMesh->>Service: 注入追踪头并转发 activate Service Service->>PostgreSQL: 开始事务 Service->>PostgreSQL: INSERT INTO feature_audit_log (...) Service->>ClickHouse: INSERT INTO feature_wide_table (...) alt 数据写入成功 PostgreSQL-->>Service: OK ClickHouse-->>Service: OK Service->>PostgreSQL: 提交事务 Service-->>ServiceMesh: HTTP 202 Accepted ServiceMesh-->>Ingress: 响应 Ingress-->>Client: 响应 else ClickHouse写入失败 ClickHouse-->>Service: Exception Service->>PostgreSQL: 回滚事务 Service-->>ServiceMesh: HTTP 500 Internal Server Error ServiceMesh-->>Ingress: 响应 Ingress-->>Client: 响应 end deactivate Service
这个流程的关键在于事务边界。我们将PostgreSQL的写入操作作为主干,包裹在本地事务中。ClickHouse的写入操作也在此事务内执行。如果ClickHouse写入失败,整个操作会回滚,PostgreSQL的审计日志也不会被持久化。这保证了“至少”审计日志和特征数据是一致的。但反过来,如果ClickHouse写入成功,而PostgreSQL的事务提交失败(虽然概率极低),就会产生数据不一致。对于我们的场景,特征数据允许有极短时间的不一致和最终一致性,这个风险在当前阶段是可以接受的。
步骤化实现
1. 项目依赖与配置
首先是build.gradle.kts
文件,我们需要引入Micronaut、JPA、PostgreSQL驱动和ClickHouse的JDBC驱动。
// build.gradle.kts
plugins {
// ... Micronaut standard plugins
id("io.micronaut.application") version "4.2.1"
id("io.micronaut.aot") version "4.2.1"
id("com.google.devtools.ksp") version "1.9.22-1.0.17"
}
dependencies {
// Micronaut core
implementation("io.micronaut:micronaut-http-client")
implementation("io.micronaut.serde:micronaut-serde-jackson")
// Database & JPA
implementation("io.micronaut.data:micronaut-data-hibernate-jpa")
implementation("io.micronaut.sql:micronaut-jdbc-hikari")
runtimeOnly("org.postgresql:postgresql")
// ClickHouse JDBC Driver
// ClickHouse的驱动需要单独引入,它不通过JPA管理
implementation("com.clickhouse:clickhouse-jdbc:0.5.0")
// Logging and other utilities
runtimeOnly("ch.qos.logback:logback-classic")
// ... other dependencies
}
接下来是application.yml
的配置。这是核心,我们需要配置两个数据源。
# application.yml
micronaut:
application:
name: feature-ingestion-service
server:
port: 8080
# 启用AOT优化,在生产环境中至关重要
aot:
enabled: true
# ... more AOT configs
# JPA和PostgreSQL的数据源配置
datasources:
default:
url: jdbc:postgresql://${POSTGRES_HOST:localhost}:${POSTGRES_PORT:5432}/${POSTGRES_DB:features}
driverClassName: org.postgresql.Driver
username: ${POSTGRES_USER:user}
password: ${POSTGRES_PASSWORD:password}
schema-generate: CREATE_DROP # In dev, use 'UPDATE' or 'NONE' in prod
dialect: POSTGRES
jpa:
default:
properties:
hibernate:
hbm2ddl:
auto: update
show_sql: true
# ClickHouse的自定义配置,我们不把它作为Micronaut的'datasource'
# 而是作为独立的配置项,方便在代码中注入和管理
clickhouse:
url: jdbc:clickhouse://${CLICKHOUSE_HOST:localhost}:${CLICKHOUSE_PORT:8123}/${CLICKHOUSE_DB:features}
username: ${CLICKHOUSE_USER:default}
password: ${CLICKHOUSE_PASSWORD:}
# ClickHouse JDBC的一些重要参数
properties:
socket_timeout: 30000
# 启用数据压缩可以显著降低网络IO
compress: 1
# 对于批量写入,设置合理的批大小
batch_size: 10000
2. 数据模型定义
我们需要为PostgreSQL定义一个JPA实体,用于审计日志。
// src/main/java/com/example/feature/store/AuditLog.java
package com.example.feature.store;
import jakarta.persistence.*;
import java.time.Instant;
import io.micronaut.data.annotation.DateCreated;
@Entity
@Table(name = "feature_audit_log")
public class AuditLog {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false, updatable = false)
private String entityId;
@Column(nullable = false, updatable = false)
private String featureName;
// 使用TEXT或JSONB类型存储原始事件体,便于追溯
@Column(columnDefinition = "TEXT", nullable = false)
private String rawPayload;
@DateCreated
@Column(nullable = false, updatable = false)
private Instant createdAt;
// Getters and Setters ...
}
同时,我们需要在ClickHouse中创建对应的宽表。DDL如下:
-- ClickHouse DDL
CREATE TABLE features.user_features (
`user_id` String,
`event_timestamp` DateTime64(3, 'UTC'),
`feature_a_value` Float64,
`feature_b_count_7d` UInt32,
`feature_c_latest_category` String,
`ingestion_time` DateTime DEFAULT now()
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_timestamp)
ORDER BY (user_id, event_timestamp);
这里的ENGINE = MergeTree()
是ClickHouse性能的关键。我们通过user_id
和event_timestamp
排序,这能极大地加速针对特定用户的时序特征查询。
3. 双写核心服务
这是整个系统的核心逻辑。我们创建一个FeatureIngestionService
,它注入JPA的Repository和我们自己管理的ClickHouse连接。
首先是ClickHouse的连接管理器。在真实项目中,直接用JDBC DriverManager
不是一个好主意,因为它没有连接池。这里我们为了简化示例,直接创建连接。生产环境中,应该使用一个支持ClickHouse的连接池库,如HikariCP(配置为非标准JDBC驱动)或Ural-CrDP。
// src/main/java/com/example/feature/clickhouse/ClickHouseClient.java
package com.example.feature.clickhouse;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import jakarta.inject.Singleton;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import com.clickhouse.jdbc.ClickHouseDataSource;
import java.util.Properties;
@Factory
public class ClickHouseClientFactory {
private final ClickHouseConfig config;
public ClickHouseClientFactory(ClickHouseConfig config) {
this.config = config;
}
@Bean(preDestroy = "close") // 虽然没实现close,但这是好的实践
@Singleton
public ClickHouseDataSource clickHouseDataSource() throws SQLException {
// 使用ClickHouse官方推荐的DataSource方式
var props = new Properties();
props.putAll(config.getProperties());
return new ClickHouseDataSource(config.getUrl(), props);
}
}
// src/main/java/com/example/feature/clickhouse/ClickHouseConfig.java
// 用于注入application.yml中的配置
package com.example.feature.clickhouse;
import io.micronaut.context.annotation.ConfigurationProperties;
import java.util.Map;
@ConfigurationProperties("clickhouse")
public interface ClickHouseConfig {
String getUrl();
Map<String, String> getProperties();
}
现在是核心的双写服务实现。
// src/main/java/com/example/feature/store/FeatureIngestionService.java
package com.example.feature.store;
import com.example.feature.dto.FeatureIngestionRequest;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micronaut.transaction.annotation.Transactional;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.clickhouse.jdbc.ClickHouseDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
@Singleton
public class FeatureIngestionService {
private static final Logger LOG = LoggerFactory.getLogger(FeatureIngestionService.class);
private final AuditLogRepository auditLogRepository;
private final ClickHouseDataSource clickHouseDataSource;
private final ObjectMapper objectMapper;
// ClickHouse的写入SQL模板
private static final String CLICKHOUSE_INSERT_SQL =
"INSERT INTO user_features (user_id, event_timestamp, feature_a_value, feature_b_count_7d, feature_c_latest_category) VALUES (?, ?, ?, ?, ?)";
public FeatureIngestionService(AuditLogRepository auditLogRepository,
ClickHouseDataSource clickHouseDataSource,
ObjectMapper objectMapper) {
this.auditLogRepository = auditLogRepository;
this.clickHouseDataSource = clickHouseDataSource;
this.objectMapper = objectMapper;
}
/**
* 核心双写逻辑。
* 使用Micronaut的@Transactional注解,确保对PostgreSQL的写入是事务性的。
* ClickHouse的写入在此事务内执行。如果ClickHouse写入失败,将抛出异常,
* 导致整个方法事务回滚。
*/
@Transactional
public void ingest(FeatureIngestionRequest request) {
// 步骤 1: 将原始请求序列化,用于审计日志
String rawPayload;
try {
rawPayload = objectMapper.writeValueAsString(request);
} catch (JsonProcessingException e) {
// 在真实项目中,这种序列化失败是严重的系统问题
throw new IngestionProcessingException("Failed to serialize request payload", e);
}
// 步骤 2: 创建并保存审计日志到PostgreSQL
AuditLog log = new AuditLog();
log.setEntityId(request.getUserId());
log.setFeatureName("user_features_batch"); // 假设是批量特征
log.setRawPayload(rawPayload);
auditLogRepository.save(log);
LOG.debug("Audit log saved for entityId: {}", request.getUserId());
// 步骤 3: 写入数据到ClickHouse
try (Connection connection = clickHouseDataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(CLICKHOUSE_INSERT_SQL)) {
ps.setString(1, request.getUserId());
ps.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(request.getEventTimestamp())));
ps.setDouble(3, request.getFeatureAValue());
ps.setInt(4, request.getFeatureBCount7d());
ps.setString(5, request.getFeatureCLatestCategory());
ps.executeUpdate();
LOG.info("Feature data inserted into ClickHouse for userId: {}", request.getUserId());
} catch (SQLException e) {
// 这是关键的错误处理点
// 抛出一个运行时异常,这将触发@Transactional的回滚
LOG.error("Failed to insert feature data into ClickHouse for userId: {}. Triggering transaction rollback.", request.getUserId(), e);
throw new IngestionPersistenceException("ClickHouse write failed", e);
}
// 如果代码执行到这里,说明两个写入操作在逻辑上都已“准备好”
// Micronaut的事务管理器将在方法退出时提交PostgreSQL的事务
}
// 自定义异常类,便于区分错误来源
public static class IngestionProcessingException extends RuntimeException {
public IngestionProcessingException(String message, Throwable cause) { super(message, cause); }
}
public static class IngestionPersistenceException extends RuntimeException {
public IngestionPersistenceException(String message, Throwable cause) { super(message, cause); }
}
}
// 定义JPA Repository接口
// src/main/java/com/example/feature/store/AuditLogRepository.java
package com.example.feature.store;
import io.micronaut.data.jdbc.annotation.JdbcRepository;
import io.micronaut.data.model.query.builder.sql.Dialect;
import io.micronaut.data.repository.CrudRepository;
@JdbcRepository(dialect = Dialect.POSTGRES)
public interface AuditLogRepository extends CrudRepository<AuditLog, Long> {
}
4. API 端点
最后,我们创建一个简单的Controller来暴露服务。
// src/main/java/com/example/feature/api/FeatureController.java
package com.example.feature.api;
import com.example.feature.dto.FeatureIngestionRequest;
import com.example.feature.store.FeatureIngestionService;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import jakarta.validation.Valid;
@Controller("/api/v1/features")
public class FeatureController {
private final FeatureIngestionService ingestionService;
public FeatureController(FeatureIngestionService ingestionService) {
this.ingestionService = ingestionService;
}
@Post
public HttpResponse<Void> ingestFeature(@Body @Valid FeatureIngestionRequest request) {
ingestionService.ingest(request);
// 使用HTTP 202 Accepted,因为写入是异步的,客户端不必等待数据立即可查
return HttpResponse.accepted();
}
}
// DTO类
// src/main/java/com/example/feature/dto/FeatureIngestionRequest.java
// ... 省略了带有getter, setter和validation注解的简单Java Bean ...
服务网格下的可观测性
一个常见的错误是在应用代码里加入过多的可观测性逻辑。在我们的架构中,服务网gao(Linkerd)自动处理了以下问题:
分布式追踪: 当请求进入
feature-ingestion-service
时,Linkerd的代理(sidecar)会检查入口请求是否包含追踪头(如b3
或traceparent
)。如果没有,它会生成一个新的trace ID。然后,当我们的服务通过JDBC调用PostgreSQL和ClickHouse时,只要这些数据库的JDBC驱动支持并且环境中配置了OpenTelemetry等agent,追踪上下文就会被自动传播。最终,我们可以在Jaeger或Zipkin中看到一条完整的链路:API Gateway -> feature-ingestion-service -> JDBC(PostgreSQL) -> JDBC(ClickHouse)
。这对于排查哪个环节耗时最长至关重要,而我们一行代码都不用写。黄金指标: Linkerd自动为我们的服务暴露了请求成功率、请求速率和延迟(RPS)等黄金指标。我们可以在Grafana仪表盘上监控服务的健康状况,设置告警,而无需在代码中引入任何Prometheus客户端库。
mTLS: 服务之间的所有通信,例如从上游服务到
feature-ingestion-service
的调用,都由服务网格自动加密,保证了传输安全。
这种关注点分离让我们能够专注于核心的、复杂的双写业务逻辑,将网络层面的问题交给基础设施。
方案的局限性与未来迭代
当前方案并非完美,它是一个在特定约束下的务实选择。
首先,双写的一致性问题并未被彻底解决。我们依赖于“如果ClickHouse失败,就回滚所有操作”的策略,这能防止脏数据写入审计库。但它无法处理PostgreSQL提交失败的边缘情况。一个更健壮的方案是引入消息队列(如Kafka)作为中间缓冲。服务只负责将事件原子地写入Kafka,然后由两个独立的消费者分别写入PostgreSQL和ClickHouse。这种模式(通常被称为Change Data Capture或事件溯源的变体)能提供更强的解耦和可靠性,但架构复杂度也相应增加。
其次,ClickHouse的写入性能。当前的同步JDBC写入方式,在高并发下可能会成为瓶颈,阻塞HTTP请求线程。对于需要极高吞吐量的场景,应该考虑异步写入,或者将数据批量写入Kafka,再通过ClickHouse的Kafka引擎或专用的ETL工具(如Vector)进行消费。这能将写入延迟和服务响应延迟解耦。
最后,错误处理与重试。服务网格可以处理网络层面的瞬时故障重试,但对于应用层面的错误(如ClickHouse因为数据格式问题拒绝写入),需要应用自身具备更精细的死信队列(DLQ)机制。目前的设计中,失败的请求会直接返回500错误,需要调用方负责重试,这在某些场景下可能不够理想。未来的版本可以增加一个错误处理器,将处理失败的消息发送到DLQ,供后续分析和重补。