构建基于 Scala 与 Knative 的时序数据 Serverless 采集网关


我们需要处理一个棘手的时序数据采集场景:数据源是百万级的物联网设备,它们并非持续上报,而是在特定时间窗口(例如每小时的头五分钟)内集中爆发式地提交数据。峰值 QPS 可达成千上万,但其余 90% 的时间里,系统几乎处于空闲状态。为一个短暂的高峰期而长期维持一个庞大的、高规格的微服务集群,在成本上是完全无法接受的。

初步的技术评估很快排除了几个常见方案。

传统的单体或常规微服务架构,需要按照峰值流量进行容量规划。这意味着在绝大部分时间里,服务器资源都在闲置,造成巨大的成本浪费。虽然可以配置 HPA (Horizontal Pod Autoscaler) 实现一定程度的弹性,但其伸缩速度和从零到一的启动能力,远不能满足我们对极致成本效益的要求。

另一个方向是纯粹的 FaaS (Function as a Service) 平台,如 AWS Lambda 或 Google Cloud Functions。它们天然具备按需付费和零伸缩的能力,看似完美匹配。但在真实项目中,这种方案的弊端也很明显:

  1. 供应商锁定:业务逻辑与特定云厂商的 FaaS SDK 深度绑定,迁移成本极高。
  2. “泥球”风险:时序数据采集的逻辑远非“接收-存储”这么简单。它涉及到数据校验、格式转换、标签规范化、指标聚合等一系列复杂的业务规则。将这些逻辑全部塞进一个函数体,很快就会演变成一个难以测试和维护的“大泥球”。
  3. 冷启动:对于某些对延迟敏感的场景,FaaS 的冷启动延迟可能成为一个不确定因素。

我们的目标是找到一个平衡点:既要获得 Serverless 的成本效益和弹性,又要保持架构的清晰、可测试性以及平台无关性。最终,我们将技术栈锁定在 Knative + Clean Architecture 的组合上。

  • Knative: 部署在任何标准的 Kubernetes 集群之上,提供事件驱动的 Serverless 工作负载能力,包括从零到一的快速扩容和缩容至零。它解决了资源利用率和成本问题,同时避免了供应商锁定。
  • Clean Architecture: 一种强大的架构思想,通过分层和依赖倒置原则,将核心业务逻辑与外部框架(如 Web 框架、数据库)彻底解耦。这保证了我们的核心采集逻辑是纯粹的、可独立测试的,并且可以轻松地从 Ktor 迁移到其他任何交付机制。

语言和框架的选择则是基于生产效率和稳定性的考量:

  • Scala: 其强大的类型系统和函数式编程范式,非常适合处理复杂的数据转换和校验流程,能在编译期发现大量潜在错误。
  • Ktor: 一个基于 Kotlin 协程的轻量级异步 Web 框架。它的资源占用小、启动速度快,非常适合在 Serverless 环境中作为 API 入口点。在 JVM 生态中,Scala 与 Kotlin 可以很好地协同工作。

架构决策:分层与依赖关系

在动手编码前,我们首先用 Clean Architecture 的思想明确了系统的分层结构和依赖关系。依赖箭头永远由外向内,确保核心业务逻辑的纯粹性。

graph TD
    subgraph Frameworks & Drivers [外部框架与驱动]
        A[Ktor Server] --> B
        D[InfluxDB Client] --> C
        E[Knative Service YAML]
    end

    subgraph Interface Adapters [接口适配器]
        B[IngestionController] --> F
        C[TimeSeriesRepositoryImpl] --> G
    end

    subgraph Use Cases [应用业务规则]
        F[IngestDataUseCase] --> G
        F --> H
    end

    subgraph Entities [企业业务规则]
        H[Metric / DataPoint]
        G[TimeSeriesRepository Port]
    end

    style Entities fill:#f9f,stroke:#333,stroke-width:2px
    style Use Cases fill:#ccf,stroke:#333,stroke-width:2px
  • Entities (实体层): 系统的核心,定义了最关键的业务对象,如 Metric, DataPoint 等。它们是纯粹的数据结构,不包含任何外部依赖。
  • Use Cases (用例层): 封装了具体的应用业务逻辑,例如“采集时序数据”这个用例。它会引用实体,并定义业务流程。同时,它定义了所需的数据持久化接口(Port),如 TimeSeriesRepository,但并不关心其具体实现。
  • Interface Adapters (接口适配器层): 这一层是转换器。IngestionController 将来自 Ktor 的 HTTP 请求转换为用例层能理解的输入。TimeSeriesRepositoryImpl 则是对 TimeSeriesRepository 接口的具体实现,负责与数据库交互。
  • Frameworks & Drivers (框架与驱动层): 最外层,包含了 Ktor 服务器、数据库客户端、Knative 部署配置等所有具体的技术实现和基础设施。

这种结构的核心优势在于,最内两层(Entities 和 Use Cases)构成了我们系统的业务核心,它们完全独立于任何外部技术,可以被独立编译、测试和复用。

核心实现:代码即架构

下面我们将逐步展示各个层次的关键代码实现。

1. Entities: 定义核心数据模型

这是最纯粹的一层,只包含业务对象的定义。我们使用 Scala 的 case class 来实现不可变的数据模型。

src/main/scala/com/example/domain/entities/TimeSeriesData.scala

package com.example.domain.entities

import java.time.Instant

/**
 * 代表一个数据点
 * @param time 时间戳
 * @param value 值
 */
case class DataPoint(time: Instant, value: Double)

/**
 * 代表一个完整的时序指标
 * @param name 指标名称, e.g., "cpu.temperature"
 * @param tags 标签,用于数据筛选和聚合。这是高基数问题的根源。
 *             e.g., Map("host" -> "server-a", "region" -> "us-west-1")
 * @param point 数据点
 */
case class Metric(name: String, tags: Map[String, String], point: DataPoint)

// DTO (Data Transfer Object) for request parsing
// 注意:DTO 属于接口适配器层,但常与实体一起定义以便于转换。
// 在严格的 Clean Architecture 中,它应该在适配器层。
// 这里为了简化项目结构,我们放在一起。
case class IngestRequest(
  name: String,
  tags: Map[String, String],
  timestamp: Long, // Use epoch milliseconds for simplicity in JSON
  value: Double
)

这里的 Metric 定义是关键。tags 字段是导致时序数据库出现“高基数 (high cardinality)”问题的核心。例如,如果一个 tag 的值是用户 ID 或请求 ID,那么这个 tag 的可能取值(基数)就会变得非常庞大,给 TSDB 的索引带来巨大压力。

2. Use Cases: 封装业务逻辑与抽象依赖

用例层定义了业务操作流程,并声明了它所依赖的接口(Port)。

src/main/scala/com/example/domain/ports/TimeSeriesRepository.scala

package com.example.domain.ports

import com.example.domain.entities.Metric
import scala.concurrent.Future

/**
 * 数据持久化接口 (Port)
 * 定义了用例层需要的数据存储能力,但不关心如何实现。
 * 使用 Future 来处理异步操作。
 */
trait TimeSeriesRepository {
  def save(metric: Metric): Future[Unit]
}

src/main/scala/com/example/application/usecases/IngestDataUseCase.scala

package com.example.application.usecases

import com.example.domain.entities.{DataPoint, Metric}
import com.example.domain.ports.TimeSeriesRepository

import java.time.Instant
import scala.concurrent.{ExecutionContext, Future}

/**
 * 采集时序数据的用例
 *
 * @param repository 数据仓库的实现,通过依赖注入传入
 * @param ec ExecutionContext for Future operations
 */
class IngestDataUseCase(repository: TimeSeriesRepository)(implicit ec: ExecutionContext) {

  // 定义业务校验规则
  private val MAX_TAGS = 10
  private val VALID_NAME_REGEX = "^[a-zA-Z0-9_.]+$".r

  // 应用级别的业务逻辑都封装在此
  def execute(name: String, tags: Map[String, String], timestamp: Long, value: Double): Future[Either[IngestionError, Unit]] = {
    
    // 1. 业务校验
    val validationResult = for {
      _ <- validateMetricName(name)
      _ <- validateTags(tags)
      _ <- validateTimestamp(timestamp)
    } yield ()

    validationResult match {
      case Left(error) => Future.successful(Left(error))
      case Right(_) =>
        // 2. 规范化处理 (对抗高基数问题)
        val normalizedTags = normalizeTags(tags)

        // 3. 构建领域对象
        val metric = Metric(
          name = name,
          tags = normalizedTags,
          point = DataPoint(time = Instant.ofEpochMilli(timestamp), value = value)
        )
        
        // 4. 通过接口进行持久化
        repository.save(metric).map(Right(_)).recover {
          case ex: Exception => Left(RepositoryError(ex.getMessage))
        }
    }
  }

  private def validateMetricName(name: String): Either[IngestionError, Unit] = {
    if (name.trim.isEmpty || !VALID_NAME_REGEX.matches(name)) {
      Left(InvalidMetricName("Metric name contains invalid characters or is empty."))
    } else {
      Right(())
    }
  }

  private def validateTags(tags: Map[String, String]): Either[IngestionError, Unit] = {
    if (tags.size > MAX_TAGS) {
      Left(TooManyTags(s"Number of tags ${tags.size} exceeds the limit of $MAX_TAGS."))
    } else if (tags.keys.exists(_.trim.isEmpty) || tags.values.exists(_.trim.isEmpty)) {
      Left(InvalidTag("Tag keys or values cannot be empty."))
    } else {
      Right(())
    }
  }
  
  private def validateTimestamp(timestamp: Long): Either[IngestionError, Unit] = {
    val now = Instant.now().toEpochMilli
    val tenMinutesInMillis = 10 * 60 * 1000
    // 数据点时间不能离当前时间太远
    if (Math.abs(now - timestamp) > tenMinutesInMillis) {
      Left(InvalidTimestamp("Timestamp is too far in the past or future."))
    } else {
      Right(())
    }
  }

  /**
   * 标签规范化是处理高基数问题的关键手段之一。
   * 在真实项目中,这里的逻辑会更复杂,例如:
   * - 移除基数过高的标签 (如 user_id, request_id)
   * - 将某些标签的值进行归一化 (如 http_status_code 200, 201, 204 -> 2xx)
   * - 限制标签值的长度
   */
  private def normalizeTags(tags: Map[String, String]): Map[String, String] = {
    tags.map { case (key, value) => key.toLowerCase.trim -> value.trim }
        .filterNot { case (key, _) => key == "session_id" } // 举例:过滤掉高基数标签
  }
}

// 定义用例可能产生的业务错误
sealed trait IngestionError
case class InvalidMetricName(message: String) extends IngestionError
case class TooManyTags(message: String) extends IngestionError
case class InvalidTag(message: String) extends IngestionError
case class InvalidTimestamp(message: String) extends IngestionError
case class RepositoryError(message: String) extends IngestionError

这个 IngestDataUseCase 是整个应用的核心。它包含了校验、规范化和持久化编排,但完全不依赖任何 Web 框架或数据库的具体实现。所有依赖都通过 TimeSeriesRepository 这个抽象接口注入。这一点至关重要,因为它使得我们可以对这部分核心逻辑进行独立的单元测试,而无需启动一个 Web 服务器或连接真实的数据库。

3. Interface Adapters & Frameworks: 连接外部世界

Repository 实现

这是对 TimeSeriesRepository 接口的具体实现。在真实项目中,这里会是 InfluxDB, VictoriaMetrics 或 Prometheus 的客户端。为了示例的可运行性,我们先用一个基于内存的伪实现。

src/main/scala/com/example/infrastructure/repositories/InMemoryTimeSeriesRepository.scala

package com.example.infrastructure.repositories

import com.example.domain.entities.Metric
import com.example.domain.ports.TimeSeriesRepository
import org.slf4j.LoggerFactory

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

/**
 * 一个用于演示和测试的内存实现
 */
class InMemoryTimeSeriesRepository(implicit ec: ExecutionContext) extends TimeSeriesRepository {
  private val logger = LoggerFactory.getLogger(getClass)
  private val storage = mutable.ArrayBuffer[Metric]()

  override def save(metric: Metric): Future[Unit] = Future {
    // 在真实项目中,这里会是调用数据库客户端的异步IO操作
    // logger.info(s"Persisting metric: $metric")
    synchronized {
      storage.append(metric)
    }
    // 模拟数据库写入延迟
    // Thread.sleep(10)
    logger.debug(s"Successfully saved metric. Total count: ${storage.size}")
  }
}
Ktor Controller 和主应用

这是最外层,负责接收 HTTP 请求,并将其委托给用例层处理。

src/main/scala/com/example/infrastructure/server/Application.scala

package com.example.infrastructure.server

import com.example.application.usecases.{IngestDataUseCase, IngestionError, RepositoryError}
import com.example.domain.entities.IngestRequest
import com.example.infrastructure.repositories.InMemoryTimeSeriesRepository
import io.ktor.serialization.gson.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.ktor.server.plugins.contentnegotiation.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.http.HttpStatusCode
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}
import scala.compat.java8.FutureConverters.*

object Application {
  // 生产环境中应该使用更精细配置的 ExecutionContext
  implicit val ec: ExecutionContext = ExecutionContext.global

  // --- 依赖注入 ---
  // 在真实项目中会使用 DI 框架如 Macwire
  val timeSeriesRepository = new InMemoryTimeSeriesRepository()
  val ingestDataUseCase = new IngestDataUseCase(timeSeriesRepository)

  @JvmStatic
  fun main(args: Array<String>) {
    embeddedServer(Netty, port = 8080, module = Application::module).start(wait = true)
  }
}

// Ktor Module
fun Application.module() {
  val logger = LoggerFactory.getLogger("Application")
  
  install(ContentNegotiation) {
    gson {
      setPrettyPrinting()
    }
  }

  routing {
    get("/health") {
      call.respond(mapOf("status" to "UP"))
    }
    
    post("/ingest") {
      val request = call.receive<IngestRequest>()
      
      // 调用 UseCase, Future[Either] -> CompletableFuture -> Ktor suspend
      val futureResult = Application.ingestDataUseCase.execute(
        request.name,
        request.tags,
        request.timestamp,
        request.value
      )

      // 桥接 Scala Future 和 Kotlin Coroutines
      val result = futureResult.toJava.await()

      result match {
        case Right(_) =>
          call.respond(HttpStatusCode.Accepted, mapOf("status" to "ok"))
        case Left(error) =>
          val (statusCode, errorMessage) = mapErrorToResponse(error)
          logger.warn(s"Ingestion failed: $errorMessage")
          call.respond(statusCode, mapOf("error" to errorMessage))
      }
    }
  }
}

private fun mapErrorToResponse(error: IngestionError): (HttpStatusCode, String) = {
  error match {
    case e: RepositoryError => (HttpStatusCode.InternalServerError, s"Storage error: ${e.message}")
    case _ => (HttpStatusCode.BadRequest, error.toString)
  }
}

注意这里我们如何将 Scala 的 Future 通过 toJava.await() 桥接到 Ktor 的协程上下文。这使得异步的业务逻辑可以无缝地集成到异步的 Web 框架中。Controller 的职责非常轻薄:解析请求、调用用例、根据结果返回响应。

4. Knative 部署配置

最后一步是将我们的应用容器化并部署到 Knative。

Dockerfile

FROM eclipse-temurin:11-jre

WORKDIR /app

# 我们使用 sbt-assembly 插件打包成一个 fat jar
COPY target/scala-2.13/knative-clean-arch-assembly-0.1.0-SNAPSHOT.jar app.jar

# Knative 期望应用监听 PORT 环境变量指定的端口
CMD ["java", "-jar", "app.jar"]

service.yaml

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: timeseries-ingester
spec:
  template:
    metadata:
      annotations:
        # 关键的 Knative 伸缩配置
        autoscaling.knative.dev/class: "kpa.autoscaling.knative.dev"
        # 每个 Pod 实例的目标并发请求数。当超过此值时,Knative 会扩容。
        autoscaling.knative.dev/metric: "concurrency"
        autoscaling.knative.dev/target: "100"
        # Pod 在缩容至零之前的闲置时间。
        autoscaling.knative.dev/scale-down-delay: "5m"
    spec:
      containers:
        - image: your-docker-registry/timeseries-ingester:latest # 替换为你的镜像地址
          ports:
            - containerPort: 8080 # 容器内部监听的端口
          env:
            - name: PORT
              value: "8080"
          resources:
            requests:
              cpu: "250m"
              memory: "512Mi"
            limits:
              cpu: "1"
              memory: "1Gi"

这份 Knative Service 定义是整个 Serverless 行为的核心。autoscaling.knative.dev/target: "100" 告诉 Knative Autscaler (KPA) 保持每个服务实例(Pod)平均处理 100 个并发请求。当流量激增,并发数超过这个阈值,Knative 会在几秒钟内快速创建新的 Pod 来分担负载。当流量消失,所有 Pod 会在 scale-down-delay 定义的闲置时间(这里是 5 分钟)后被自动销毁,真正实现“缩容至零”,不产生任何闲置成本。

遗留问题与未来迭代路径

这个架构虽然解决了核心的成本和维护性问题,但并非银弹。在生产环境中,仍有一些局限性和需要进一步考虑的优化点。

首先,Knative 的冷启动问题依然存在。对于从零启动的第一个请求,其延迟会包括镜像拉取、Pod 调度和 JVM 启动时间。对于我们的时序采集场景,秒级的冷启动延迟通常是可以接受的。但如果业务场景要求毫秒级响应,那么可以配置 autoscaling.knative.dev/min-scale: "1" 来保持至少一个热实例,但这会牺牲掉“缩容至零”的成本优势。

其次,当前的 InMemoryTimeSeriesRepository 只是一个占位符。替换为真实的 TSDB 客户端后,数据库连接池的管理在 Serverless 环境下是一个需要仔细权衡的问题。每个 Pod 实例都维护一个独立的连接池可能会迅速耗尽数据库的总连接数。需要采用合理的连接池配置(较小的最大连接数)或考虑使用像 PgBouncer 这样的外部连接池代理。

最后,用例层中的高基数处理逻辑目前还比较简单。一个更健壮的系统可能需要引入一个可配置的规则引擎,根据指标名称动态地决定哪些标签需要被过滤或聚合,甚至引入基于流处理的预聚合层来进一步降低入库数据的基数。Clean Architecture 在此再次展现了它的优势:这些复杂的业务逻辑变更都可以被限制在 Use Cases 层,而无需触动外部的 Web 服务或部署配置。


  目录