CI流水线每天产出上万份依赖扫描报告,最初采用的对象存储加日志检索的方案很快就成了瓶颈。当安全团队提出两个核心查询需求时,现有架构彻底无法支撑:“哪些项目正受新爆出的CVE-2023-XXXX
漏洞影响?”以及“给我展示Platform
团队过去半年的安全态势演进曲线”。这两个需求,一个要求对全量组件进行快速、精确的反向查找,另一个则需要对时间序列数据进行聚合分析。这两种查询模式对于传统关系型数据库或者简单的日志系统来说,都是一场灾难。
问题的本质是数据模型的矛盾。依赖扫描数据天然具备时间序列属性(每次构建都是一个时间点快照),同时又需要支持高度灵活的切片查询(按漏洞、按组件、按许可证、按项目)。在评估了Elasticsearch等方案后,考虑到其运维复杂性和我们场景下相对固定的查询模式,最终将技术栈锁定在了Apache Cassandra。选择它的理由很明确:优异的写吞吐能力、线性的横向扩展性、以及为特定查询模式设计数据模型的能力。这恰好与我们“写入量远大于读取量,且读取模式固定”的场景相契合。
第一步:失败的数据模型与重构
任何Cassandra的实践都始于数据建模。一个常见的错误是,工程师会带着关系型数据库的设计思路来构建表,这注定会失败。我们最初的设计就是一个典型反例:
// 反模式:一个大而全的表,试图满足所有查询
CREATE TABLE IF NOT EXISTS security_scans_bad (
project_id uuid,
scan_timestamp timestamp,
component_name text,
component_version text,
cve_id text,
severity text,
license text,
// ... 其他几十个字段
PRIMARY KEY ((project_id), scan_timestamp, component_name)
) WITH CLUSTERING ORDER BY (scan_timestamp DESC, component_name ASC);
这个模型的PRIMARY KEY
定义,以project_id
为分区键,scan_timestamp
和component_name
为聚类键。它能很好地回答“A项目某次扫描发现了哪些组件?”这类问题。但对于我们的核心需求——“哪些项目使用了log4j-core-2.17.0
?”——它完全无能为力。执行这类查询需要对整个集群进行全表扫描,这在生产环境中是绝对禁止的。
Cassandra的设计哲学是“查询驱动建模”,即先定义你的查询语句(CQL),再反向设计能够最高效服务这些查询的表结构。这意味着数据冗余不再是罪恶,而是特性。根据我们的核心需求,至少需要两张表:
- 一张表用于查询特定项目的漏洞历史。
- 另一张表用于反向查询受特定漏洞影响的所有项目。
graph TD A[CI Pipeline] -- Scan Report (JSON) --> B(Ingestion Service); B -- Writes (Batch) --> C{Cassandra Cluster}; C --> D[scans_by_project_and_time]; C --> E[projects_by_vulnerability]; G(Query API) -- CQL --> C; H[Grafana Dashboard] -- HTTP Request --> G;
基于此,我们重新设计了表结构:
表1:scans_by_project_and_time
这张表的核心目标是获取某个项目的安全扫描历史,并按时间倒序排列。
-- 专为“查询项目历史”而设计的表
CREATE TABLE IF NOT EXISTS scans_by_project_and_time (
project_id uuid, // 分区键:定位到具体项目的数据分区
scan_id timeuuid, // 聚类键:保证扫描的唯一性,并按时间排序
scan_timestamp timestamp, // 扫描发生时间,用于前端展示
report_url text, // 指向原始报告的链接
critical_count int, // 聚合指标,方便快速预览
high_count int,
medium_count int,
low_count int,
licenses set<text>, // 项目使用的所有许可证集合
raw_report blob, // 存储原始JSON报告,用于追溯
PRIMARY KEY (project_id, scan_id)
) WITH CLUSTERING ORDER BY (scan_id DESC);
-- `scan_id` 使用 timeuuid 类型,它天然包含了时间戳信息并能保证唯一性,作为聚类键可以实现高效的时间范围查询。
-- 聚合指标(critical_count等)的预计算是关键优化,避免了在查询时再对原始报告进行解析和计算。
表2:projects_by_vulnerability
这张表用于快速响应“哪个项目受此漏洞影响”的核心安全应急需求。
-- 专为“按漏洞反查项目”而设计的表
CREATE TABLE IF NOT EXISTS projects_by_vulnerability (
vulnerability_id text, // 分区键:例如 CVE-2021-44228
project_id uuid, // 聚类键:受影响的项目
project_name text, // 冗余字段,避免二次查询
component_name text, // 造成漏洞的组件
component_version text, // 组件版本
severity text, // 漏洞级别
first_seen_at timestamp, // 首次发现时间
last_seen_at timestamp, // 最近一次扫描仍然存在的时间
PRIMARY KEY (vulnerability_id, project_id, component_name)
);
-- 这里的关键是将 `vulnerability_id` 作为分区键。
-- 当一个新的高危漏洞爆发时,我们可以通过 `SELECT project_name FROM projects_by_vulnerability WHERE vulnerability_id = 'CVE-2021-44228'`
-- 在毫秒级别内拉出全公司所有受影响的项目列表,这是之前架构完全无法做到的。
构建高吞吐数据摄入服务
模型设计完成后,下一步是构建一个能处理海量扫描报告并将其写入Cassandra的服务。我们选择使用Go语言,因为它出色的并发性能和静态类型系统非常适合构建这类中间件。
核心代码段展示了如何解析报告并以批处理方式写入上述两张表。在真实项目中,批处理(Batching)至关重要,它能将多次写入操作合并为一次网络往返,极大降低了客户端与协调节点之间的通信开销。
package main
import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/gocql/gocql"
)
// ScanReport 模拟从Trivy或类似工具接收的JSON报告结构
type ScanReport struct {
ProjectID string `json:"projectId"`
ProjectName string `json:"projectName"`
ReportURL string `json:"reportUrl"`
Results []struct {
Vulnerabilities []struct {
VulnerabilityID string `json:"vulnerabilityID"`
PkgName string `json:"pkgName"`
InstalledVersion string `json:"installedVersion"`
Severity string `json:"severity"`
} `json:"vulnerabilities"`
} `json:"results"`
}
// CassandraService 封装了与数据库交互的逻辑
type CassandraService struct {
session *gocql.Session
}
// NewCassandraService 初始化并返回一个服务实例
func NewCassandraService(hosts []string, keyspace string) (*CassandraService, error) {
cluster := gocql.NewCluster(hosts...)
cluster.Keyspace = keyspace
cluster.Consistency = gocql.Quorum
cluster.Timeout = 10 * time.Second
// 在生产环境中,应配置重试策略、连接池大小等
// cluster.RetryPolicy = &gocql.ExponentialBackoffRetryPolicy{NumRetries: 3}
// cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
session, err := cluster.CreateSession()
if err != nil {
return nil, fmt.Errorf("failed to create cassandra session: %w", err)
}
return &CassandraService{session: session}, nil
}
// ProcessAndStoreReport 是核心处理函数
func (s *CassandraService) ProcessAndStoreReport(reportData []byte) error {
var report ScanReport
if err := json.Unmarshal(reportData, &report); err != nil {
log.Printf("ERROR: Failed to unmarshal report JSON: %v", err)
return err
}
projectUUID, err := gocql.ParseUUID(report.ProjectID)
if err != nil {
log.Printf("ERROR: Invalid project ID format: %s", report.ProjectID)
return err
}
// 1. 准备数据并创建Batch
batch := s.session.NewBatch(gocql.LoggedBatch)
scanID := gocql.TimeUUID()
scanTimestamp := time.Now().UTC()
// 聚合统计信息
var criticalCount, highCount, mediumCount, lowCount int
vulnerabilities := make(map[string]bool) // 用于去重
// 2. 迭代报告中的所有漏洞
for _, result := range report.Results {
for _, v := range result.Vulnerabilities {
// 避免在同一个报告中重复处理同一个漏洞
if _, exists := vulnerabilities[v.VulnerabilityID+v.PkgName]; exists {
continue
}
vulnerabilities[v.VulnerabilityID+v.PkgName] = true
switch v.Severity {
case "CRITICAL":
criticalCount++
case "HIGH":
highCount++
// ...其他级别
}
// 2.1 添加到 projects_by_vulnerability 表的写入操作
// 在真实场景中,我们可能需要先查询是否存在,来决定是INSERT还是UPDATE first_seen/last_seen
// 但对于写入密集型场景,直接写入(UPSERT)通常性能更高
batch.Query(`
INSERT INTO projects_by_vulnerability (vulnerability_id, project_id, project_name, component_name, component_version, severity, last_seen_at)
VALUES (?, ?, ?, ?, ?, ?, ?)`,
v.VulnerabilityID, projectUUID, report.ProjectName, v.PkgName, v.InstalledVersion, v.Severity, scanTimestamp)
}
}
// 3. 添加到 scans_by_project_and_time 表的写入操作
batch.Query(`
INSERT INTO scans_by_project_and_time (project_id, scan_id, scan_timestamp, report_url, critical_count, high_count, medium_count, low_count, raw_report)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
projectUUID, scanID, scanTimestamp, report.ReportURL, criticalCount, highCount, mediumCount, lowCount, reportData)
// 4. 执行Batch写入
if err := s.session.ExecuteBatch(batch); err != nil {
log.Printf("ERROR: Failed to execute batch write for project %s: %v", report.ProjectID, err)
// 这里的错误处理很关键,需要有重试和死信队列机制
return err
}
log.Printf("Successfully processed report for project: %s", report.ProjectName)
return nil
}
func (s *CassandraService) Close() {
s.session.Close()
}
这个服务的设计考虑了几个生产实践:
- 原子性:
LOGGED BATCH
保证了对同一个分区的多次写入是原子性的。虽然跨分区写入不是原子的,但对于我们这个场景(一个报告对应多个表的更新)已经足够。 - 幂等性:重复处理同一份报告应该产生相同的结果,这对于构建可靠的数据管道至关重要。
- 错误处理:日志记录了详细的错误信息,在实际系统中,失败的消息会被发送到死信队列以便后续分析和重试。
应对复杂查询:索引优化的进阶
随着平台上线,新的查询需求浮出水面:“查找所有项目中,由log4j
相关组件引发的CRITICAL
级别漏洞”。这个查询同时涉及component_name
(需要模糊匹配)和severity
两个过滤条件。在我们现有的表上,这两个字段都不是分区键或聚类键的一部分。
直接在projects_by_vulnerability
表上为severity
创建Cassandra的二级索引(Secondary Index)是一个诱人的选项,但也是一个常见的陷阱。Cassandra的二级索引在处理高基数(high-cardinality)字段时性能尚可,但对于像severity
这样只有少数几个值的低基数字段(CRITICAL
, HIGH
, MEDIUM
, LOW
),会导致巨大的热点问题。所有CRITICAL
漏洞的索引条目都会集中在少数几个节点上,查询时会给这些节点带来巨大压力。
正确的解决方案仍然是创建一张新的物化视图(Materialized View)或手动维护一张索引表。
方案:手动维护索引表vulnerabilities_by_component_and_severity
CREATE TABLE IF NOT EXISTS vulnerabilities_by_component_and_severity (
component_prefix text, // 分区键: 组件名的前缀, 例如 'log4j'
severity text, // 聚类键 1: 漏洞级别
vulnerability_id text, // 聚类键 2: 具体的漏洞ID
project_id uuid, // 聚类键 3: 关联的项目ID
project_name text, // 冗余数据
component_name text,
component_version text,
PRIMARY KEY ((component_prefix), severity, vulnerability_id, project_id)
) WITH CLUSTERING ORDER BY (severity DESC, vulnerability_id ASC);
这个设计的精妙之处在于component_prefix
分区键。我们不在摄入服务中存储完整的component_name
(如log4j-core
),而是提取其业务上有意义的前缀(log4j
)作为分区键。这样,所有与log4j
相关的漏洞,无论其子模块是什么,都会被路由到同一个分区,极大提升了查询效率。
查询语句就变成了:
SELECT project_name, component_name, vulnerability_id
FROM vulnerabilities_by_component_and_severity
WHERE component_prefix = 'log4j' AND severity = 'CRITICAL';
这个查询极其高效,因为它精确地定位到一个分区,然后在分区内部按severity
进行有序扫描。
当然,这也为写入服务增加了逻辑复杂度,需要在解析组件名时提取前缀,并向这张新表写入数据。这是典型的用写入复杂性和存储成本换取读取性能的权衡,在Cassandra的世界里,这通常是正确的选择。
连接Grafana实现可视化
Cassandra本身没有官方的一流Grafana数据源插件,直接连接并不方便。最稳定、灵活的方式是构建一个轻量级的查询API服务,作为Grafana和Cassandra之间的桥梁。这个API服务将Grafana的HTTP请求翻译成高效的CQL查询。
// 在之前的 CassandraService 之上构建一个简单的HTTP服务器
// 以下为伪代码示例
func handleVulnerabilityTrend(w http.ResponseWriter, r *http.Request) {
// 1. 从请求中解析参数,例如项目ID和时间范围
projectID := r.URL.Query().Get("projectId")
// ...
// 2. 调用CassandraService执行CQL查询
// 查询 scans_by_project_and_time 表获取时间序列数据
query := "SELECT scan_timestamp, critical_count, high_count FROM scans_by_project_and_time WHERE project_id = ? AND scan_id > maxTimeuuid(?)"
// ... 执行查询,处理结果 ...
// 3. 将结果格式化为Grafana的timeseries格式
/*
[
{
"target": "critical",
"datapoints": [ [10, 1635724800000], [12, 1635811200000] ]
},
{
"target": "high",
"datapoints": [ [30, 1635724800000], [28, 1635811200000] ]
}
]
*/
// 4. 返回JSON响应
}
在Grafana中,我们使用JSON API
数据源插件,配置好这个查询API的地址。然后,就可以创建仪表盘面板,通过调用http://query-api/trends?projectId=...
来动态绘制每个项目的漏洞趋势图,或者调用/api/vulnerabilities/summary?cve=...
来展示受特定漏洞影响的项目列表。这套组合拳最终实现了安全态势的实时、可视化监控。
局限与未来路径
当前这套基于Cassandra的架构,在处理我们预定义的几种核心查询模式时,性能表现极为出色。然而,它的局限性也同样明显。由于高度依赖反范式和为查询定制的表结构,对于任何新的、未预料到的查询需求(即Ad-hoc查询),系统都无法有效支持。例如,如果产品经理突然想知道“使用GPL-3.0许可证且同时存在高危漏洞的Java项目”,我们现有的数据模型就束手无策了,除非再次创建新的索引表。
为了解决这个问题,未来的一个迭代方向是将Cassandra中的数据通过ETL管道定期同步到数据仓库(如ClickHouse或Snowflake)中。Cassandra继续承担高吞吐的实时写入和在线查询职责,而数据仓库则专门用于复杂的离线分析和探索性查询。此外,随着数据量的持续增长,数据生命周期管理(TTL)和冷数据归档到S3等对象存储的策略也必须提上日程,以控制存储成本。