我们SaaS平台的微前端架构一直运行良好,直到产品团队提出了一个看似简单的需求:为每个租户提供基于自然语言的智能文档检索功能。这意味着我们需要引入向量数据库。技术选型很快落定在Milvus上,但真正的挑战随之而来:在一个共享的基础设施上,如何为成百上千个租户提供毫秒级的向量检索,同时保证他们之间的数据存在“物理”级别的严格隔离?这是一个典型的多租户数据隔离问题,但在向量数据库领域,错误的决策会带来灾难性的性能或安全后果。
最初的方案讨论中,有人提出为每个租户创建一个独立的Milvus Collection。这个想法在逻辑上最简单,隔离性也最强。但在真实项目中,这意味着我们需要动态管理数千个Collection,元数据开销、连接数、以及对底层Pulsar和ETCD的压力会急剧上升。当租户数量达到一定规模时,这种方案的运维复杂度和资源成本将无法接受。
另一个方案是在一个统一的Collection中增加一个tenant_id
标量字段,并在检索时通过expr
表达式进行过滤。这在关系型数据库中是常见操作,但在Milvus这类基于ANN(近似最近邻)索引的数据库中,这样做会严重影响性能。它无法利用索引进行预过滤,常常导致全量扫描后再过滤,检索延迟会随着数据总量的增加而线性增长,完全违背了我们对性能的要求。
最终,我们决定采用Milvus的分区(Partition)机制。它允许在一个Collection内部创建逻辑隔离区,检索时可以指定在一个或多个分区内进行,从而有效利用索引,将查询范围限定在特定租户的数据上。这看起来是性能、隔离性和管理成本之间的最佳平衡点。这次复盘,就是围绕这个核心决策,记录我们如何利用JWT传递租户身份,设计服务层来操作Milvus分区,并最终通过GitHub Actions将整套服务自动化部署到Azure上的完整过程。
架构设计:请求的生命周期与隔离边界
整个系统的核心是确保每一个API请求,从进入系统到触达数据存储层,都携带着不可伪造的租户身份标识,并在每一步都强制执行隔离策略。
graph TD subgraph Browser A[Micro-frontend App] end subgraph Azure Cloud B[Azure API Management] --> C{Auth Middleware}; C -- JWT with tenant_id --> D[FastAPI Search Service]; D --> E[VectorDB Service]; E -- Scoped Operation --> F[Milvus on AKS]; end A -- API Request with Bearer Token --> B; F -- search in partition --> F; style C fill:#f9f,stroke:#333,stroke-width:2px style E fill:#ccf,stroke:#333,stroke-width:2px
- 微前端 (Micro-frontend App): 用户登录后,认证服务颁发一个JWT。这个JWT的Payload中必须包含
tenant_id
。前端应用在后续所有对后端服务的请求中,都在Authorization
头里携带此Token。 - 认证中间件 (Auth Middleware): 这是第一道防线。我们的FastAPI服务使用一个自定义的中间件来验证JWT的签名,并解码出
tenant_id
。如果Token无效或tenant_id
缺失,请求将直接被拒绝。通过验证的tenant_id
会被注入到请求的上下文状态中,供后续的业务逻辑使用。 - 向量数据库服务 (VectorDB Service): 这是一个封装了所有Milvus操作的抽象层。它的所有方法,如
insert
、search
、delete
,都必须接收tenant_id
作为强制参数。服务内部会根据tenant_id
动态地选择或创建对应的Milvus分区,确保所有数据库操作都限定在正确的租户数据范围内。
核心实现:代码中的隔离策略
理论上的架构设计需要通过严谨的代码实现来落地。这里的关键在于认证中间件和VectorDBService
的实现。
1. JWT认证与租户身份注入
我们使用FastAPI构建API服务,它的依赖注入系统非常适合传递租户上下文。
# file: app/security/auth.py
import os
from fastapi import Request, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
# 从环境变量加载安全配置,避免硬编码
ALGORITHM = os.environ.get("JWT_ALGORITHM", "HS256")
SECRET_KEY = os.environ.get("JWT_SECRET_KEY")
if not SECRET_KEY:
raise ValueError("JWT_SECRET_KEY environment variable not set.")
bearer_scheme = HTTPBearer()
class Tenant:
"""A simple container for the current request's tenant ID."""
def __init__(self, id: str):
if not id or not isinstance(id, str):
raise ValueError("Tenant ID must be a non-empty string.")
self.id = id
async def get_current_tenant(
token: HTTPAuthorizationCredentials = Depends(bearer_scheme)
) -> Tenant:
"""
Dependency to validate JWT and extract tenant_id.
This function is injected into API endpoints that require tenant context.
"""
credentials_exception = HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token.credentials, SECRET_KEY, algorithms=[ALGORITHM])
tenant_id: str = payload.get("tenant_id")
if tenant_id is None:
# 必须包含 tenant_id claim,这是多租户隔离的基石
raise credentials_exception
# 返回一个包含租户信息的对象,而不是简单的字符串
# 这样更易于扩展,例如未来可以加入用户角色等信息
return Tenant(id=tenant_id)
except jwt.PyJWTError:
raise credentials_exception
except ValueError:
# Tenant ID 格式校验失败
raise HTTPException(status_code=400, detail="Invalid tenant ID format in token.")
在API路由中,我们通过Depends(get_current_tenant)
来保护接口并获取租户信息。
# file: app/main.py
from fastapi import FastAPI, Depends, Body
from .security.auth import get_current_tenant, Tenant
from .services.vector_service import VectorDBService, SearchResult
app = FastAPI()
vector_service = VectorDBService() # 假设这是单例
@app.post("/search", response_model=list[SearchResult])
async def search_documents(
query: str = Body(..., embed=True),
current_tenant: Tenant = Depends(get_current_tenant)
):
"""
Endpoint for tenant-scoped vector search.
The `current_tenant` dependency ensures this endpoint is protected
and has access to the tenant ID.
"""
# ... 此处省略将 query 文本转换为向量的逻辑 ...
query_vector = [0.1] * 128 # 示例向量
# 将租户ID传递给服务层,服务层绝不信任任何其他来源的租户ID
results = await vector_service.search(
tenant_id=current_tenant.id,
query_vector=query_vector,
top_k=5
)
return results
2. Milvus服务层的分区封装
VectorDBService
是隔离策略的执行者。它屏蔽了Milvus的底层复杂性,并强制所有操作都与分区绑定。
# file: app/services/vector_service.py
import logging
from pymilvus import (
connections,
utility,
Collection,
CollectionSchema,
FieldSchema,
DataType
)
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class VectorDBService:
"""
A service layer that abstracts Milvus operations and enforces
tenant isolation via partitions.
"""
def __init__(self):
self.host = os.environ.get("MILVUS_HOST", "localhost")
self.port = os.environ.get("MILVUS_PORT", "19530")
self.collection_name = "tenant_documents"
self._connect()
self._ensure_collection_exists()
def _connect(self):
try:
logging.info(f"Connecting to Milvus at {self.host}:{self.port}")
connections.connect("default", host=self.host, port=self.port)
except Exception as e:
logging.error(f"Failed to connect to Milvus: {e}")
# 在生产环境中,这里应该有重试机制或更稳健的错误处理
raise
def _ensure_collection_exists(self):
"""
Creates the collection on startup if it doesn't exist.
This is an idempotent operation.
"""
if utility.has_collection(self.collection_name):
logging.info(f"Collection '{self.collection_name}' already exists.")
self.collection = Collection(self.collection_name)
self.collection.load() # 确保集合加载到内存
return
logging.info(f"Collection '{self.collection_name}' not found, creating...")
# 定义字段. 注意: id 字段必须是主键
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="doc_source_id", dtype=DataType.VARCHAR, max_length=256),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128)
]
schema = CollectionSchema(fields, "Document embeddings for all tenants")
self.collection = Collection(self.collection_name, schema)
# 创建索引是性能的关键. HNSW 是常用的高性能索引
index_params = {
"metric_type": "L2",
"index_type": "HNSW",
"params": {"M": 16, "efConstruction": 256}
}
self.collection.create_index("embedding", index_params)
logging.info("Index created on 'embedding' field.")
self.collection.load()
logging.info(f"Collection '{self.collection_name}' created and loaded.")
async def _get_or_create_partition(self, tenant_id: str):
"""
Ensures a partition exists for the given tenant ID.
This is a critical, idempotent function for tenant onboarding.
"""
partition_name = self._sanitize_partition_name(tenant_id)
if not self.collection.has_partition(partition_name):
try:
logging.info(f"Partition '{partition_name}' not found for tenant '{tenant_id}'. Creating...")
self.collection.create_partition(partition_name)
logging.info(f"Partition '{partition_name}' created.")
except Exception as e:
# 处理并发创建分区的竞争条件
if "partition already exist" in str(e):
logging.warning(f"Partition '{partition_name}' was created concurrently.")
else:
logging.error(f"Failed to create partition '{partition_name}': {e}")
raise
@staticmethod
def _sanitize_partition_name(tenant_id: str) -> str:
"""
Milvus partition names have restrictions. They must start with a letter
or underscore and contain only letters, numbers, or underscores.
"""
# 这是一个简单的示例,生产环境可能需要更复杂的规则
return f"tenant_{tenant_id.replace('-', '_')}"
async def insert(self, data: list, tenant_id: str):
"""Inserts data into a specific tenant's partition."""
partition_name = self._sanitize_partition_name(tenant_id)
await self._get_or_create_partition(tenant_id)
logging.info(f"Inserting {len(data)} entities into partition '{partition_name}'")
# insert 操作可以指定 partition_name
result = self.collection.insert(data, partition_name=partition_name)
self.collection.flush() # 异步操作,建议在插入后调用
return result
async def search(self, query_vector: list, tenant_id: str, top_k: int):
"""Searches for vectors only within the specified tenant's partition."""
partition_name = self._sanitize_partition_name(tenant_id)
# 这里的坑在于:如果分区不存在,直接搜索会报错。
# 因此,必须先确认分区存在。
if not self.collection.has_partition(partition_name):
logging.warning(f"Tenant '{tenant_id}' has no data partition '{partition_name}'. Returning empty result.")
return []
search_params = {"metric_type": "L2", "params": {"ef": 128}}
# 关键!通过 `partition_names` 参数将搜索范围限定在单个租户的分区内。
# 这是一个列表,理论上可以同时搜索多个分区。
results = self.collection.search(
data=[query_vector],
anns_field="embedding",
param=search_params,
limit=top_k,
output_fields=["doc_source_id"],
partition_names=[partition_name]
)
# 解析结果
hits = results[0]
return [
{"id": hit.id, "distance": hit.distance, "source": hit.entity.get('doc_source_id')}
for hit in hits
]
这段代码有几个在真实项目中非常关键的点:
- 幂等性操作:
_ensure_collection_exists
和_get_or_create_partition
必须是幂等的。服务可能会重启或水平扩展,这些初始化操作不能因为重复执行而出错。 - 分区名限制: Milvus对分区名有字符限制。
_sanitize_partition_name
负责处理这个问题,防止因非法的租户ID导致创建分区失败。 - 显式指定分区: 无论是
insert
还是search
,都必须显式地使用partition_name
或partition_names
参数。这是实现隔离的核心。忘记这个参数将导致数据插入到默认分区(_default
)或在所有分区中搜索,造成数据泄露。 - 错误处理: 代码中包含了对连接失败、分区并发创建等情况的日志记录和基本处理。生产代码需要更完善的重试和告警逻辑。
自动化部署:GitHub Actions与Azure的协同
手动部署是不可靠且低效的。我们使用GitHub Actions来自动化构建、测试和部署流程,目标是每次代码推送到main
分支时,都能自动将更新后的服务部署到Azure App Service。
我们的CI/CD流水线定义如下:
# file: .github/workflows/deploy-to-azure.yml
name: Deploy Vector Search Service to Azure
on:
push:
branches:
- main
workflow_dispatch:
env:
AZURE_APP_SERVICE_NAME: "vector-search-prod"
AZURE_RESOURCE_GROUP: "saas-prod-rg"
AZURE_CONTAINER_REGISTRY: "saasprodacr"
IMAGE_NAME: "vector-search-service"
jobs:
build-and-deploy:
runs-on: ubuntu-latest
steps:
- name: 'Checkout GitHub Action'
uses: actions/checkout@v3
- name: 'Login to Azure'
uses: azure/login@v1
with:
creds: ${{ secrets.AZURE_CREDENTIALS }}
- name
: 'Login to Azure Container Registry'
uses: azure/docker-login@v1
with:
login-server: ${{ env.AZURE_CONTAINER_REGISTRY }}.azurecr.io
username: ${{ secrets.ACR_USERNAME }}
password: ${{ secrets.ACR_PASSWORD }}
- name: 'Build and push Docker image'
run: |
docker build . -t ${{ env.AZURE_CONTAINER_REGISTRY }}.azurecr.io/${{ env.IMAGE_NAME }}:${{ github.sha }}
docker push ${{ env.AZURE_CONTAINER_REGISTRY }}.azurecr.io/${{ env.IMAGE_NAME }}:${{ github.sha }}
- name: 'Deploy to Azure App Service'
uses: azure/webapps-deploy@v2
with:
app-name: ${{ env.AZURE_APP_SERVICE_NAME }}
resource-group-name: ${{ env.AZURE_RESOURCE_GROUP }}
images: '${{ env.AZURE_CONTAINER_REGISTRY }}.azurecr.io/${{ env.IMAGE_NAME }}:${{ github.sha }}'
- name: 'Logout from Azure'
run: |
az logout
az cache purge
az account clear
这个工作流做了几件关键事情:
- 认证: 使用
secrets.AZURE_CREDENTIALS
安全地登录到Azure。这是一个通过Azure CLI创建的服务主体凭据,存储在GitHub仓库的Secrets中。 - 构建与推送: 它构建一个Docker镜像,并用Git commit SHA作为标签,保证了镜像的唯一性和可追溯性。然后将镜像推送到Azure Container Registry (ACR)。
- 部署: 最后,
azure/webapps-deploy
action会通知Azure App Service拉取ACR中的新镜像并进行部署,实现了零停机更新。 - 配置管理: JWT密钥、Milvus地址等敏感配置,我们没有写在代码或Dockerfile里,而是在Azure App Service的”Configuration” -> “Application settings”中进行配置。App Service会自动将这些设置作为环境变量注入到容器中,我们的Python代码通过
os.environ.get()
来读取。
方案的局限性与未来迭代
尽管基于分区的多租户方案在当前阶段满足了我们的需求,但它并非没有局限。
首先,Milvus对单个Collection中的分区数量有限制。虽然新版本已经大幅提高了这个上限(理论上可达65536个),但对于需要支持数十万甚至更多租户的超大规模SaaS平台来说,这依然是一个潜在的瓶颈。当租户规模达到这个量级时,我们可能不得不重新审视“每个租户一个Collection”的方案,并围绕它构建一套复杂的自动化管理系统,例如开发一个Kubernetes Operator来管理成千上万个Collection的生命周期。
其次,资源争抢问题依然存在。虽然数据是隔离的,但所有租户共享同一个Collection的计算和内存资源。如果某个“超级租户”的数据量或查询量远超其他租户,可能会影响到整个集群的性能稳定性。未来的一个优化方向是实现基于租户的资源配额与监控,甚至考虑将不同等级的租户路由到物理上不同的Milvus集群,但这会显著增加架构的复杂性。
最后,当前的方案在租户数据备份和恢复方面粒度较粗。虽然可以备份整个Milvus实例,但要实现单个租户数据的独立、快速恢复则比较困难。这可能需要借助Milvus的数据导入导出工具,并结合外部元数据管理来实现更精细化的操作。