构建基于Cassandra的软件供应链安全数据平台及其索引优化实践


CI流水线每天产出上万份依赖扫描报告,最初采用的对象存储加日志检索的方案很快就成了瓶颈。当安全团队提出两个核心查询需求时,现有架构彻底无法支撑:“哪些项目正受新爆出的CVE-2023-XXXX漏洞影响?”以及“给我展示Platform团队过去半年的安全态势演进曲线”。这两个需求,一个要求对全量组件进行快速、精确的反向查找,另一个则需要对时间序列数据进行聚合分析。这两种查询模式对于传统关系型数据库或者简单的日志系统来说,都是一场灾难。

问题的本质是数据模型的矛盾。依赖扫描数据天然具备时间序列属性(每次构建都是一个时间点快照),同时又需要支持高度灵活的切片查询(按漏洞、按组件、按许可证、按项目)。在评估了Elasticsearch等方案后,考虑到其运维复杂性和我们场景下相对固定的查询模式,最终将技术栈锁定在了Apache Cassandra。选择它的理由很明确:优异的写吞吐能力、线性的横向扩展性、以及为特定查询模式设计数据模型的能力。这恰好与我们“写入量远大于读取量,且读取模式固定”的场景相契合。

第一步:失败的数据模型与重构

任何Cassandra的实践都始于数据建模。一个常见的错误是,工程师会带着关系型数据库的设计思路来构建表,这注定会失败。我们最初的设计就是一个典型反例:

// 反模式:一个大而全的表,试图满足所有查询
CREATE TABLE IF NOT EXISTS security_scans_bad (
    project_id uuid,
    scan_timestamp timestamp,
    component_name text,
    component_version text,
    cve_id text,
    severity text,
    license text,
    // ... 其他几十个字段
    PRIMARY KEY ((project_id), scan_timestamp, component_name)
) WITH CLUSTERING ORDER BY (scan_timestamp DESC, component_name ASC);

这个模型的PRIMARY KEY定义,以project_id为分区键,scan_timestampcomponent_name为聚类键。它能很好地回答“A项目某次扫描发现了哪些组件?”这类问题。但对于我们的核心需求——“哪些项目使用了log4j-core-2.17.0?”——它完全无能为力。执行这类查询需要对整个集群进行全表扫描,这在生产环境中是绝对禁止的。

Cassandra的设计哲学是“查询驱动建模”,即先定义你的查询语句(CQL),再反向设计能够最高效服务这些查询的表结构。这意味着数据冗余不再是罪恶,而是特性。根据我们的核心需求,至少需要两张表:

  1. 一张表用于查询特定项目的漏洞历史。
  2. 另一张表用于反向查询受特定漏洞影响的所有项目。
graph TD
    A[CI Pipeline] -- Scan Report (JSON) --> B(Ingestion Service);
    B -- Writes (Batch) --> C{Cassandra Cluster};
    C --> D[scans_by_project_and_time];
    C --> E[projects_by_vulnerability];
    G(Query API) -- CQL --> C;
    H[Grafana Dashboard] -- HTTP Request --> G;

基于此,我们重新设计了表结构:

表1:scans_by_project_and_time

这张表的核心目标是获取某个项目的安全扫描历史,并按时间倒序排列。

-- 专为“查询项目历史”而设计的表
CREATE TABLE IF NOT EXISTS scans_by_project_and_time (
    project_id uuid,          // 分区键:定位到具体项目的数据分区
    scan_id timeuuid,         // 聚类键:保证扫描的唯一性,并按时间排序
    scan_timestamp timestamp, // 扫描发生时间,用于前端展示
    report_url text,          // 指向原始报告的链接
    critical_count int,       // 聚合指标,方便快速预览
    high_count int,
    medium_count int,
    low_count int,
    licenses set<text>,       // 项目使用的所有许可证集合
    raw_report blob,          // 存储原始JSON报告,用于追溯
    PRIMARY KEY (project_id, scan_id)
) WITH CLUSTERING ORDER BY (scan_id DESC);
-- `scan_id` 使用 timeuuid 类型,它天然包含了时间戳信息并能保证唯一性,作为聚类键可以实现高效的时间范围查询。
-- 聚合指标(critical_count等)的预计算是关键优化,避免了在查询时再对原始报告进行解析和计算。

表2:projects_by_vulnerability

这张表用于快速响应“哪个项目受此漏洞影响”的核心安全应急需求。

-- 专为“按漏洞反查项目”而设计的表
CREATE TABLE IF NOT EXISTS projects_by_vulnerability (
    vulnerability_id text,      // 分区键:例如 CVE-2021-44228
    project_id uuid,            // 聚类键:受影响的项目
    project_name text,          // 冗余字段,避免二次查询
    component_name text,        // 造成漏洞的组件
    component_version text,     // 组件版本
    severity text,              // 漏洞级别
    first_seen_at timestamp,    // 首次发现时间
    last_seen_at timestamp,     // 最近一次扫描仍然存在的时间
    PRIMARY KEY (vulnerability_id, project_id, component_name)
);
-- 这里的关键是将 `vulnerability_id` 作为分区键。
-- 当一个新的高危漏洞爆发时,我们可以通过 `SELECT project_name FROM projects_by_vulnerability WHERE vulnerability_id = 'CVE-2021-44228'`
-- 在毫秒级别内拉出全公司所有受影响的项目列表,这是之前架构完全无法做到的。

构建高吞吐数据摄入服务

模型设计完成后,下一步是构建一个能处理海量扫描报告并将其写入Cassandra的服务。我们选择使用Go语言,因为它出色的并发性能和静态类型系统非常适合构建这类中间件。

核心代码段展示了如何解析报告并以批处理方式写入上述两张表。在真实项目中,批处理(Batching)至关重要,它能将多次写入操作合并为一次网络往返,极大降低了客户端与协调节点之间的通信开销。

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/gocql/gocql"
)

// ScanReport 模拟从Trivy或类似工具接收的JSON报告结构
type ScanReport struct {
	ProjectID   string `json:"projectId"`
	ProjectName string `json:"projectName"`
	ReportURL   string `json:"reportUrl"`
	Results     []struct {
		Vulnerabilities []struct {
			VulnerabilityID  string `json:"vulnerabilityID"`
			PkgName          string `json:"pkgName"`
			InstalledVersion string `json:"installedVersion"`
			Severity         string `json:"severity"`
		} `json:"vulnerabilities"`
	} `json:"results"`
}

// CassandraService 封装了与数据库交互的逻辑
type CassandraService struct {
	session *gocql.Session
}

// NewCassandraService 初始化并返回一个服务实例
func NewCassandraService(hosts []string, keyspace string) (*CassandraService, error) {
	cluster := gocql.NewCluster(hosts...)
	cluster.Keyspace = keyspace
	cluster.Consistency = gocql.Quorum
	cluster.Timeout = 10 * time.Second

	// 在生产环境中,应配置重试策略、连接池大小等
	// cluster.RetryPolicy = &gocql.ExponentialBackoffRetryPolicy{NumRetries: 3}
	// cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())

	session, err := cluster.CreateSession()
	if err != nil {
		return nil, fmt.Errorf("failed to create cassandra session: %w", err)
	}
	return &CassandraService{session: session}, nil
}

// ProcessAndStoreReport 是核心处理函数
func (s *CassandraService) ProcessAndStoreReport(reportData []byte) error {
	var report ScanReport
	if err := json.Unmarshal(reportData, &report); err != nil {
		log.Printf("ERROR: Failed to unmarshal report JSON: %v", err)
		return err
	}

	projectUUID, err := gocql.ParseUUID(report.ProjectID)
	if err != nil {
		log.Printf("ERROR: Invalid project ID format: %s", report.ProjectID)
		return err
	}

	// 1. 准备数据并创建Batch
	batch := s.session.NewBatch(gocql.LoggedBatch)
	scanID := gocql.TimeUUID()
	scanTimestamp := time.Now().UTC()

	// 聚合统计信息
	var criticalCount, highCount, mediumCount, lowCount int
	vulnerabilities := make(map[string]bool) // 用于去重

	// 2. 迭代报告中的所有漏洞
	for _, result := range report.Results {
		for _, v := range result.Vulnerabilities {
			// 避免在同一个报告中重复处理同一个漏洞
			if _, exists := vulnerabilities[v.VulnerabilityID+v.PkgName]; exists {
				continue
			}
			vulnerabilities[v.VulnerabilityID+v.PkgName] = true

			switch v.Severity {
			case "CRITICAL":
				criticalCount++
			case "HIGH":
				highCount++
			// ...其他级别
			}

			// 2.1 添加到 projects_by_vulnerability 表的写入操作
			// 在真实场景中,我们可能需要先查询是否存在,来决定是INSERT还是UPDATE first_seen/last_seen
			// 但对于写入密集型场景,直接写入(UPSERT)通常性能更高
			batch.Query(`
                INSERT INTO projects_by_vulnerability (vulnerability_id, project_id, project_name, component_name, component_version, severity, last_seen_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)`,
				v.VulnerabilityID, projectUUID, report.ProjectName, v.PkgName, v.InstalledVersion, v.Severity, scanTimestamp)
		}
	}

	// 3. 添加到 scans_by_project_and_time 表的写入操作
	batch.Query(`
        INSERT INTO scans_by_project_and_time (project_id, scan_id, scan_timestamp, report_url, critical_count, high_count, medium_count, low_count, raw_report)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
		projectUUID, scanID, scanTimestamp, report.ReportURL, criticalCount, highCount, mediumCount, lowCount, reportData)

	// 4. 执行Batch写入
	if err := s.session.ExecuteBatch(batch); err != nil {
		log.Printf("ERROR: Failed to execute batch write for project %s: %v", report.ProjectID, err)
		// 这里的错误处理很关键,需要有重试和死信队列机制
		return err
	}

	log.Printf("Successfully processed report for project: %s", report.ProjectName)
	return nil
}

func (s *CassandraService) Close() {
	s.session.Close()
}

这个服务的设计考虑了几个生产实践:

  • 原子性LOGGED BATCH保证了对同一个分区的多次写入是原子性的。虽然跨分区写入不是原子的,但对于我们这个场景(一个报告对应多个表的更新)已经足够。
  • 幂等性:重复处理同一份报告应该产生相同的结果,这对于构建可靠的数据管道至关重要。
  • 错误处理:日志记录了详细的错误信息,在实际系统中,失败的消息会被发送到死信队列以便后续分析和重试。

应对复杂查询:索引优化的进阶

随着平台上线,新的查询需求浮出水面:“查找所有项目中,由log4j相关组件引发的CRITICAL级别漏洞”。这个查询同时涉及component_name(需要模糊匹配)和severity两个过滤条件。在我们现有的表上,这两个字段都不是分区键或聚类键的一部分。

直接在projects_by_vulnerability表上为severity创建Cassandra的二级索引(Secondary Index)是一个诱人的选项,但也是一个常见的陷阱。Cassandra的二级索引在处理高基数(high-cardinality)字段时性能尚可,但对于像severity这样只有少数几个值的低基数字段(CRITICAL, HIGH, MEDIUM, LOW),会导致巨大的热点问题。所有CRITICAL漏洞的索引条目都会集中在少数几个节点上,查询时会给这些节点带来巨大压力。

正确的解决方案仍然是创建一张新的物化视图(Materialized View)或手动维护一张索引表。

方案:手动维护索引表vulnerabilities_by_component_and_severity

CREATE TABLE IF NOT EXISTS vulnerabilities_by_component_and_severity (
    component_prefix text,       // 分区键: 组件名的前缀, 例如 'log4j'
    severity text,               // 聚类键 1: 漏洞级别
    vulnerability_id text,       // 聚类键 2: 具体的漏洞ID
    project_id uuid,             // 聚类键 3: 关联的项目ID
    project_name text,           // 冗余数据
    component_name text,
    component_version text,
    PRIMARY KEY ((component_prefix), severity, vulnerability_id, project_id)
) WITH CLUSTERING ORDER BY (severity DESC, vulnerability_id ASC);

这个设计的精妙之处在于component_prefix分区键。我们不在摄入服务中存储完整的component_name(如log4j-core),而是提取其业务上有意义的前缀(log4j)作为分区键。这样,所有与log4j相关的漏洞,无论其子模块是什么,都会被路由到同一个分区,极大提升了查询效率。

查询语句就变成了:

SELECT project_name, component_name, vulnerability_id
FROM vulnerabilities_by_component_and_severity
WHERE component_prefix = 'log4j' AND severity = 'CRITICAL';

这个查询极其高效,因为它精确地定位到一个分区,然后在分区内部按severity进行有序扫描。

当然,这也为写入服务增加了逻辑复杂度,需要在解析组件名时提取前缀,并向这张新表写入数据。这是典型的用写入复杂性和存储成本换取读取性能的权衡,在Cassandra的世界里,这通常是正确的选择。

连接Grafana实现可视化

Cassandra本身没有官方的一流Grafana数据源插件,直接连接并不方便。最稳定、灵活的方式是构建一个轻量级的查询API服务,作为Grafana和Cassandra之间的桥梁。这个API服务将Grafana的HTTP请求翻译成高效的CQL查询。

// 在之前的 CassandraService 之上构建一个简单的HTTP服务器
// 以下为伪代码示例

func handleVulnerabilityTrend(w http.ResponseWriter, r *http.Request) {
    // 1. 从请求中解析参数,例如项目ID和时间范围
    projectID := r.URL.Query().Get("projectId")
    // ...

    // 2. 调用CassandraService执行CQL查询
    // 查询 scans_by_project_and_time 表获取时间序列数据
    query := "SELECT scan_timestamp, critical_count, high_count FROM scans_by_project_and_time WHERE project_id = ? AND scan_id > maxTimeuuid(?)"
    // ... 执行查询,处理结果 ...

    // 3. 将结果格式化为Grafana的timeseries格式
    /*
    [
        {
            "target": "critical",
            "datapoints": [ [10, 1635724800000], [12, 1635811200000] ]
        },
        {
            "target": "high",
            "datapoints": [ [30, 1635724800000], [28, 1635811200000] ]
        }
    ]
    */

    // 4. 返回JSON响应
}

在Grafana中,我们使用JSON API数据源插件,配置好这个查询API的地址。然后,就可以创建仪表盘面板,通过调用http://query-api/trends?projectId=...来动态绘制每个项目的漏洞趋势图,或者调用/api/vulnerabilities/summary?cve=...来展示受特定漏洞影响的项目列表。这套组合拳最终实现了安全态势的实时、可视化监控。

局限与未来路径

当前这套基于Cassandra的架构,在处理我们预定义的几种核心查询模式时,性能表现极为出色。然而,它的局限性也同样明显。由于高度依赖反范式和为查询定制的表结构,对于任何新的、未预料到的查询需求(即Ad-hoc查询),系统都无法有效支持。例如,如果产品经理突然想知道“使用GPL-3.0许可证且同时存在高危漏洞的Java项目”,我们现有的数据模型就束手无策了,除非再次创建新的索引表。

为了解决这个问题,未来的一个迭代方向是将Cassandra中的数据通过ETL管道定期同步到数据仓库(如ClickHouse或Snowflake)中。Cassandra继续承担高吞吐的实时写入和在线查询职责,而数据仓库则专门用于复杂的离线分析和探索性查询。此外,随着数据量的持续增长,数据生命周期管理(TTL)和冷数据归档到S3等对象存储的策略也必须提上日程,以控制存储成本。


  目录