我们需要处理一个棘手的时序数据采集场景:数据源是百万级的物联网设备,它们并非持续上报,而是在特定时间窗口(例如每小时的头五分钟)内集中爆发式地提交数据。峰值 QPS 可达成千上万,但其余 90% 的时间里,系统几乎处于空闲状态。为一个短暂的高峰期而长期维持一个庞大的、高规格的微服务集群,在成本上是完全无法接受的。
初步的技术评估很快排除了几个常见方案。
传统的单体或常规微服务架构,需要按照峰值流量进行容量规划。这意味着在绝大部分时间里,服务器资源都在闲置,造成巨大的成本浪费。虽然可以配置 HPA (Horizontal Pod Autoscaler) 实现一定程度的弹性,但其伸缩速度和从零到一的启动能力,远不能满足我们对极致成本效益的要求。
另一个方向是纯粹的 FaaS (Function as a Service) 平台,如 AWS Lambda 或 Google Cloud Functions。它们天然具备按需付费和零伸缩的能力,看似完美匹配。但在真实项目中,这种方案的弊端也很明显:
- 供应商锁定:业务逻辑与特定云厂商的 FaaS SDK 深度绑定,迁移成本极高。
- “泥球”风险:时序数据采集的逻辑远非“接收-存储”这么简单。它涉及到数据校验、格式转换、标签规范化、指标聚合等一系列复杂的业务规则。将这些逻辑全部塞进一个函数体,很快就会演变成一个难以测试和维护的“大泥球”。
- 冷启动:对于某些对延迟敏感的场景,FaaS 的冷启动延迟可能成为一个不确定因素。
我们的目标是找到一个平衡点:既要获得 Serverless 的成本效益和弹性,又要保持架构的清晰、可测试性以及平台无关性。最终,我们将技术栈锁定在 Knative
+ Clean Architecture
的组合上。
- Knative: 部署在任何标准的 Kubernetes 集群之上,提供事件驱动的 Serverless 工作负载能力,包括从零到一的快速扩容和缩容至零。它解决了资源利用率和成本问题,同时避免了供应商锁定。
- Clean Architecture: 一种强大的架构思想,通过分层和依赖倒置原则,将核心业务逻辑与外部框架(如 Web 框架、数据库)彻底解耦。这保证了我们的核心采集逻辑是纯粹的、可独立测试的,并且可以轻松地从 Ktor 迁移到其他任何交付机制。
语言和框架的选择则是基于生产效率和稳定性的考量:
- Scala: 其强大的类型系统和函数式编程范式,非常适合处理复杂的数据转换和校验流程,能在编译期发现大量潜在错误。
- Ktor: 一个基于 Kotlin 协程的轻量级异步 Web 框架。它的资源占用小、启动速度快,非常适合在 Serverless 环境中作为 API 入口点。在 JVM 生态中,Scala 与 Kotlin 可以很好地协同工作。
架构决策:分层与依赖关系
在动手编码前,我们首先用 Clean Architecture 的思想明确了系统的分层结构和依赖关系。依赖箭头永远由外向内,确保核心业务逻辑的纯粹性。
graph TD subgraph Frameworks & Drivers [外部框架与驱动] A[Ktor Server] --> B D[InfluxDB Client] --> C E[Knative Service YAML] end subgraph Interface Adapters [接口适配器] B[IngestionController] --> F C[TimeSeriesRepositoryImpl] --> G end subgraph Use Cases [应用业务规则] F[IngestDataUseCase] --> G F --> H end subgraph Entities [企业业务规则] H[Metric / DataPoint] G[TimeSeriesRepository Port] end style Entities fill:#f9f,stroke:#333,stroke-width:2px style Use Cases fill:#ccf,stroke:#333,stroke-width:2px
- Entities (实体层): 系统的核心,定义了最关键的业务对象,如
Metric
,DataPoint
等。它们是纯粹的数据结构,不包含任何外部依赖。 - Use Cases (用例层): 封装了具体的应用业务逻辑,例如“采集时序数据”这个用例。它会引用实体,并定义业务流程。同时,它定义了所需的数据持久化接口(Port),如
TimeSeriesRepository
,但并不关心其具体实现。 - Interface Adapters (接口适配器层): 这一层是转换器。
IngestionController
将来自 Ktor 的 HTTP 请求转换为用例层能理解的输入。TimeSeriesRepositoryImpl
则是对TimeSeriesRepository
接口的具体实现,负责与数据库交互。 - Frameworks & Drivers (框架与驱动层): 最外层,包含了 Ktor 服务器、数据库客户端、Knative 部署配置等所有具体的技术实现和基础设施。
这种结构的核心优势在于,最内两层(Entities 和 Use Cases)构成了我们系统的业务核心,它们完全独立于任何外部技术,可以被独立编译、测试和复用。
核心实现:代码即架构
下面我们将逐步展示各个层次的关键代码实现。
1. Entities: 定义核心数据模型
这是最纯粹的一层,只包含业务对象的定义。我们使用 Scala 的 case class 来实现不可变的数据模型。
src/main/scala/com/example/domain/entities/TimeSeriesData.scala
package com.example.domain.entities
import java.time.Instant
/**
* 代表一个数据点
* @param time 时间戳
* @param value 值
*/
case class DataPoint(time: Instant, value: Double)
/**
* 代表一个完整的时序指标
* @param name 指标名称, e.g., "cpu.temperature"
* @param tags 标签,用于数据筛选和聚合。这是高基数问题的根源。
* e.g., Map("host" -> "server-a", "region" -> "us-west-1")
* @param point 数据点
*/
case class Metric(name: String, tags: Map[String, String], point: DataPoint)
// DTO (Data Transfer Object) for request parsing
// 注意:DTO 属于接口适配器层,但常与实体一起定义以便于转换。
// 在严格的 Clean Architecture 中,它应该在适配器层。
// 这里为了简化项目结构,我们放在一起。
case class IngestRequest(
name: String,
tags: Map[String, String],
timestamp: Long, // Use epoch milliseconds for simplicity in JSON
value: Double
)
这里的 Metric
定义是关键。tags
字段是导致时序数据库出现“高基数 (high cardinality)”问题的核心。例如,如果一个 tag 的值是用户 ID 或请求 ID,那么这个 tag 的可能取值(基数)就会变得非常庞大,给 TSDB 的索引带来巨大压力。
2. Use Cases: 封装业务逻辑与抽象依赖
用例层定义了业务操作流程,并声明了它所依赖的接口(Port)。
src/main/scala/com/example/domain/ports/TimeSeriesRepository.scala
package com.example.domain.ports
import com.example.domain.entities.Metric
import scala.concurrent.Future
/**
* 数据持久化接口 (Port)
* 定义了用例层需要的数据存储能力,但不关心如何实现。
* 使用 Future 来处理异步操作。
*/
trait TimeSeriesRepository {
def save(metric: Metric): Future[Unit]
}
src/main/scala/com/example/application/usecases/IngestDataUseCase.scala
package com.example.application.usecases
import com.example.domain.entities.{DataPoint, Metric}
import com.example.domain.ports.TimeSeriesRepository
import java.time.Instant
import scala.concurrent.{ExecutionContext, Future}
/**
* 采集时序数据的用例
*
* @param repository 数据仓库的实现,通过依赖注入传入
* @param ec ExecutionContext for Future operations
*/
class IngestDataUseCase(repository: TimeSeriesRepository)(implicit ec: ExecutionContext) {
// 定义业务校验规则
private val MAX_TAGS = 10
private val VALID_NAME_REGEX = "^[a-zA-Z0-9_.]+$".r
// 应用级别的业务逻辑都封装在此
def execute(name: String, tags: Map[String, String], timestamp: Long, value: Double): Future[Either[IngestionError, Unit]] = {
// 1. 业务校验
val validationResult = for {
_ <- validateMetricName(name)
_ <- validateTags(tags)
_ <- validateTimestamp(timestamp)
} yield ()
validationResult match {
case Left(error) => Future.successful(Left(error))
case Right(_) =>
// 2. 规范化处理 (对抗高基数问题)
val normalizedTags = normalizeTags(tags)
// 3. 构建领域对象
val metric = Metric(
name = name,
tags = normalizedTags,
point = DataPoint(time = Instant.ofEpochMilli(timestamp), value = value)
)
// 4. 通过接口进行持久化
repository.save(metric).map(Right(_)).recover {
case ex: Exception => Left(RepositoryError(ex.getMessage))
}
}
}
private def validateMetricName(name: String): Either[IngestionError, Unit] = {
if (name.trim.isEmpty || !VALID_NAME_REGEX.matches(name)) {
Left(InvalidMetricName("Metric name contains invalid characters or is empty."))
} else {
Right(())
}
}
private def validateTags(tags: Map[String, String]): Either[IngestionError, Unit] = {
if (tags.size > MAX_TAGS) {
Left(TooManyTags(s"Number of tags ${tags.size} exceeds the limit of $MAX_TAGS."))
} else if (tags.keys.exists(_.trim.isEmpty) || tags.values.exists(_.trim.isEmpty)) {
Left(InvalidTag("Tag keys or values cannot be empty."))
} else {
Right(())
}
}
private def validateTimestamp(timestamp: Long): Either[IngestionError, Unit] = {
val now = Instant.now().toEpochMilli
val tenMinutesInMillis = 10 * 60 * 1000
// 数据点时间不能离当前时间太远
if (Math.abs(now - timestamp) > tenMinutesInMillis) {
Left(InvalidTimestamp("Timestamp is too far in the past or future."))
} else {
Right(())
}
}
/**
* 标签规范化是处理高基数问题的关键手段之一。
* 在真实项目中,这里的逻辑会更复杂,例如:
* - 移除基数过高的标签 (如 user_id, request_id)
* - 将某些标签的值进行归一化 (如 http_status_code 200, 201, 204 -> 2xx)
* - 限制标签值的长度
*/
private def normalizeTags(tags: Map[String, String]): Map[String, String] = {
tags.map { case (key, value) => key.toLowerCase.trim -> value.trim }
.filterNot { case (key, _) => key == "session_id" } // 举例:过滤掉高基数标签
}
}
// 定义用例可能产生的业务错误
sealed trait IngestionError
case class InvalidMetricName(message: String) extends IngestionError
case class TooManyTags(message: String) extends IngestionError
case class InvalidTag(message: String) extends IngestionError
case class InvalidTimestamp(message: String) extends IngestionError
case class RepositoryError(message: String) extends IngestionError
这个 IngestDataUseCase
是整个应用的核心。它包含了校验、规范化和持久化编排,但完全不依赖任何 Web 框架或数据库的具体实现。所有依赖都通过 TimeSeriesRepository
这个抽象接口注入。这一点至关重要,因为它使得我们可以对这部分核心逻辑进行独立的单元测试,而无需启动一个 Web 服务器或连接真实的数据库。
3. Interface Adapters & Frameworks: 连接外部世界
Repository 实现
这是对 TimeSeriesRepository
接口的具体实现。在真实项目中,这里会是 InfluxDB, VictoriaMetrics 或 Prometheus 的客户端。为了示例的可运行性,我们先用一个基于内存的伪实现。
src/main/scala/com/example/infrastructure/repositories/InMemoryTimeSeriesRepository.scala
package com.example.infrastructure.repositories
import com.example.domain.entities.Metric
import com.example.domain.ports.TimeSeriesRepository
import org.slf4j.LoggerFactory
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
/**
* 一个用于演示和测试的内存实现
*/
class InMemoryTimeSeriesRepository(implicit ec: ExecutionContext) extends TimeSeriesRepository {
private val logger = LoggerFactory.getLogger(getClass)
private val storage = mutable.ArrayBuffer[Metric]()
override def save(metric: Metric): Future[Unit] = Future {
// 在真实项目中,这里会是调用数据库客户端的异步IO操作
// logger.info(s"Persisting metric: $metric")
synchronized {
storage.append(metric)
}
// 模拟数据库写入延迟
// Thread.sleep(10)
logger.debug(s"Successfully saved metric. Total count: ${storage.size}")
}
}
Ktor Controller 和主应用
这是最外层,负责接收 HTTP 请求,并将其委托给用例层处理。
src/main/scala/com/example/infrastructure/server/Application.scala
package com.example.infrastructure.server
import com.example.application.usecases.{IngestDataUseCase, IngestionError, RepositoryError}
import com.example.domain.entities.IngestRequest
import com.example.infrastructure.repositories.InMemoryTimeSeriesRepository
import io.ktor.serialization.gson.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.server.plugins.contentnegotiation.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.http.HttpStatusCode
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}
import scala.compat.java8.FutureConverters.*
object Application {
// 生产环境中应该使用更精细配置的 ExecutionContext
implicit val ec: ExecutionContext = ExecutionContext.global
// --- 依赖注入 ---
// 在真实项目中会使用 DI 框架如 Macwire
val timeSeriesRepository = new InMemoryTimeSeriesRepository()
val ingestDataUseCase = new IngestDataUseCase(timeSeriesRepository)
@JvmStatic
fun main(args: Array<String>) {
embeddedServer(Netty, port = 8080, module = Application::module).start(wait = true)
}
}
// Ktor Module
fun Application.module() {
val logger = LoggerFactory.getLogger("Application")
install(ContentNegotiation) {
gson {
setPrettyPrinting()
}
}
routing {
get("/health") {
call.respond(mapOf("status" to "UP"))
}
post("/ingest") {
val request = call.receive<IngestRequest>()
// 调用 UseCase, Future[Either] -> CompletableFuture -> Ktor suspend
val futureResult = Application.ingestDataUseCase.execute(
request.name,
request.tags,
request.timestamp,
request.value
)
// 桥接 Scala Future 和 Kotlin Coroutines
val result = futureResult.toJava.await()
result match {
case Right(_) =>
call.respond(HttpStatusCode.Accepted, mapOf("status" to "ok"))
case Left(error) =>
val (statusCode, errorMessage) = mapErrorToResponse(error)
logger.warn(s"Ingestion failed: $errorMessage")
call.respond(statusCode, mapOf("error" to errorMessage))
}
}
}
}
private fun mapErrorToResponse(error: IngestionError): (HttpStatusCode, String) = {
error match {
case e: RepositoryError => (HttpStatusCode.InternalServerError, s"Storage error: ${e.message}")
case _ => (HttpStatusCode.BadRequest, error.toString)
}
}
注意这里我们如何将 Scala 的 Future
通过 toJava.await()
桥接到 Ktor 的协程上下文。这使得异步的业务逻辑可以无缝地集成到异步的 Web 框架中。Controller 的职责非常轻薄:解析请求、调用用例、根据结果返回响应。
4. Knative 部署配置
最后一步是将我们的应用容器化并部署到 Knative。
Dockerfile
FROM eclipse-temurin:11-jre
WORKDIR /app
# 我们使用 sbt-assembly 插件打包成一个 fat jar
COPY target/scala-2.13/knative-clean-arch-assembly-0.1.0-SNAPSHOT.jar app.jar
# Knative 期望应用监听 PORT 环境变量指定的端口
CMD ["java", "-jar", "app.jar"]
service.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: timeseries-ingester
spec:
template:
metadata:
annotations:
# 关键的 Knative 伸缩配置
autoscaling.knative.dev/class: "kpa.autoscaling.knative.dev"
# 每个 Pod 实例的目标并发请求数。当超过此值时,Knative 会扩容。
autoscaling.knative.dev/metric: "concurrency"
autoscaling.knative.dev/target: "100"
# Pod 在缩容至零之前的闲置时间。
autoscaling.knative.dev/scale-down-delay: "5m"
spec:
containers:
- image: your-docker-registry/timeseries-ingester:latest # 替换为你的镜像地址
ports:
- containerPort: 8080 # 容器内部监听的端口
env:
- name: PORT
value: "8080"
resources:
requests:
cpu: "250m"
memory: "512Mi"
limits:
cpu: "1"
memory: "1Gi"
这份 Knative Service
定义是整个 Serverless 行为的核心。autoscaling.knative.dev/target: "100"
告诉 Knative Autscaler (KPA) 保持每个服务实例(Pod)平均处理 100 个并发请求。当流量激增,并发数超过这个阈值,Knative 会在几秒钟内快速创建新的 Pod 来分担负载。当流量消失,所有 Pod 会在 scale-down-delay
定义的闲置时间(这里是 5 分钟)后被自动销毁,真正实现“缩容至零”,不产生任何闲置成本。
遗留问题与未来迭代路径
这个架构虽然解决了核心的成本和维护性问题,但并非银弹。在生产环境中,仍有一些局限性和需要进一步考虑的优化点。
首先,Knative 的冷启动问题依然存在。对于从零启动的第一个请求,其延迟会包括镜像拉取、Pod 调度和 JVM 启动时间。对于我们的时序采集场景,秒级的冷启动延迟通常是可以接受的。但如果业务场景要求毫秒级响应,那么可以配置 autoscaling.knative.dev/min-scale: "1"
来保持至少一个热实例,但这会牺牲掉“缩容至零”的成本优势。
其次,当前的 InMemoryTimeSeriesRepository
只是一个占位符。替换为真实的 TSDB 客户端后,数据库连接池的管理在 Serverless 环境下是一个需要仔细权衡的问题。每个 Pod 实例都维护一个独立的连接池可能会迅速耗尽数据库的总连接数。需要采用合理的连接池配置(较小的最大连接数)或考虑使用像 PgBouncer 这样的外部连接池代理。
最后,用例层中的高基数处理逻辑目前还比较简单。一个更健壮的系统可能需要引入一个可配置的规则引擎,根据指标名称动态地决定哪些标签需要被过滤或聚合,甚至引入基于流处理的预聚合层来进一步降低入库数据的基数。Clean Architecture 在此再次展现了它的优势:这些复杂的业务逻辑变更都可以被限制在 Use Cases 层,而无需触动外部的 Web 服务或部署配置。