构建微前端架构下的多租户向量检索服务 JWT身份、Milvus分区与GitHub Actions自动化部署的实践复盘


我们SaaS平台的微前端架构一直运行良好,直到产品团队提出了一个看似简单的需求:为每个租户提供基于自然语言的智能文档检索功能。这意味着我们需要引入向量数据库。技术选型很快落定在Milvus上,但真正的挑战随之而来:在一个共享的基础设施上,如何为成百上千个租户提供毫秒级的向量检索,同时保证他们之间的数据存在“物理”级别的严格隔离?这是一个典型的多租户数据隔离问题,但在向量数据库领域,错误的决策会带来灾难性的性能或安全后果。

最初的方案讨论中,有人提出为每个租户创建一个独立的Milvus Collection。这个想法在逻辑上最简单,隔离性也最强。但在真实项目中,这意味着我们需要动态管理数千个Collection,元数据开销、连接数、以及对底层Pulsar和ETCD的压力会急剧上升。当租户数量达到一定规模时,这种方案的运维复杂度和资源成本将无法接受。

另一个方案是在一个统一的Collection中增加一个tenant_id标量字段,并在检索时通过expr表达式进行过滤。这在关系型数据库中是常见操作,但在Milvus这类基于ANN(近似最近邻)索引的数据库中,这样做会严重影响性能。它无法利用索引进行预过滤,常常导致全量扫描后再过滤,检索延迟会随着数据总量的增加而线性增长,完全违背了我们对性能的要求。

最终,我们决定采用Milvus的分区(Partition)机制。它允许在一个Collection内部创建逻辑隔离区,检索时可以指定在一个或多个分区内进行,从而有效利用索引,将查询范围限定在特定租户的数据上。这看起来是性能、隔离性和管理成本之间的最佳平衡点。这次复盘,就是围绕这个核心决策,记录我们如何利用JWT传递租户身份,设计服务层来操作Milvus分区,并最终通过GitHub Actions将整套服务自动化部署到Azure上的完整过程。

架构设计:请求的生命周期与隔离边界

整个系统的核心是确保每一个API请求,从进入系统到触达数据存储层,都携带着不可伪造的租户身份标识,并在每一步都强制执行隔离策略。

graph TD
    subgraph Browser
        A[Micro-frontend App]
    end

    subgraph Azure Cloud
        B[Azure API Management] --> C{Auth Middleware};
        C -- JWT with tenant_id --> D[FastAPI Search Service];
        D --> E[VectorDB Service];
        E -- Scoped Operation --> F[Milvus on AKS];
    end

    A -- API Request with Bearer Token --> B;
    F -- search in partition --> F;

    style C fill:#f9f,stroke:#333,stroke-width:2px
    style E fill:#ccf,stroke:#333,stroke-width:2px
  1. 微前端 (Micro-frontend App): 用户登录后,认证服务颁发一个JWT。这个JWT的Payload中必须包含tenant_id。前端应用在后续所有对后端服务的请求中,都在Authorization头里携带此Token。
  2. 认证中间件 (Auth Middleware): 这是第一道防线。我们的FastAPI服务使用一个自定义的中间件来验证JWT的签名,并解码出tenant_id。如果Token无效或tenant_id缺失,请求将直接被拒绝。通过验证的tenant_id会被注入到请求的上下文状态中,供后续的业务逻辑使用。
  3. 向量数据库服务 (VectorDB Service): 这是一个封装了所有Milvus操作的抽象层。它的所有方法,如insertsearchdelete,都必须接收tenant_id作为强制参数。服务内部会根据tenant_id动态地选择或创建对应的Milvus分区,确保所有数据库操作都限定在正确的租户数据范围内。

核心实现:代码中的隔离策略

理论上的架构设计需要通过严谨的代码实现来落地。这里的关键在于认证中间件和VectorDBService的实现。

1. JWT认证与租户身份注入

我们使用FastAPI构建API服务,它的依赖注入系统非常适合传递租户上下文。

# file: app/security/auth.py

import os
from fastapi import Request, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt

# 从环境变量加载安全配置,避免硬编码
ALGORITHM = os.environ.get("JWT_ALGORITHM", "HS256")
SECRET_KEY = os.environ.get("JWT_SECRET_KEY")

if not SECRET_KEY:
    raise ValueError("JWT_SECRET_KEY environment variable not set.")

bearer_scheme = HTTPBearer()

class Tenant:
    """A simple container for the current request's tenant ID."""
    def __init__(self, id: str):
        if not id or not isinstance(id, str):
            raise ValueError("Tenant ID must be a non-empty string.")
        self.id = id

async def get_current_tenant(
    token: HTTPAuthorizationCredentials = Depends(bearer_scheme)
) -> Tenant:
    """
    Dependency to validate JWT and extract tenant_id.
    This function is injected into API endpoints that require tenant context.
    """
    credentials_exception = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token.credentials, SECRET_KEY, algorithms=[ALGORITHM])
        tenant_id: str = payload.get("tenant_id")
        if tenant_id is None:
            # 必须包含 tenant_id claim,这是多租户隔离的基石
            raise credentials_exception
        
        # 返回一个包含租户信息的对象,而不是简单的字符串
        # 这样更易于扩展,例如未来可以加入用户角色等信息
        return Tenant(id=tenant_id)
    except jwt.PyJWTError:
        raise credentials_exception
    except ValueError:
        # Tenant ID 格式校验失败
        raise HTTPException(status_code=400, detail="Invalid tenant ID format in token.")

在API路由中,我们通过Depends(get_current_tenant)来保护接口并获取租户信息。

# file: app/main.py

from fastapi import FastAPI, Depends, Body
from .security.auth import get_current_tenant, Tenant
from .services.vector_service import VectorDBService, SearchResult

app = FastAPI()
vector_service = VectorDBService() # 假设这是单例

@app.post("/search", response_model=list[SearchResult])
async def search_documents(
    query: str = Body(..., embed=True),
    current_tenant: Tenant = Depends(get_current_tenant)
):
    """
    Endpoint for tenant-scoped vector search.
    The `current_tenant` dependency ensures this endpoint is protected
    and has access to the tenant ID.
    """
    # ... 此处省略将 query 文本转换为向量的逻辑 ...
    query_vector = [0.1] * 128 # 示例向量

    # 将租户ID传递给服务层,服务层绝不信任任何其他来源的租户ID
    results = await vector_service.search(
        tenant_id=current_tenant.id,
        query_vector=query_vector,
        top_k=5
    )
    return results

2. Milvus服务层的分区封装

VectorDBService是隔离策略的执行者。它屏蔽了Milvus的底层复杂性,并强制所有操作都与分区绑定。

# file: app/services/vector_service.py

import logging
from pymilvus import (
    connections,
    utility,
    Collection,
    CollectionSchema,
    FieldSchema,
    DataType
)

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class VectorDBService:
    """
    A service layer that abstracts Milvus operations and enforces
    tenant isolation via partitions.
    """
    def __init__(self):
        self.host = os.environ.get("MILVUS_HOST", "localhost")
        self.port = os.environ.get("MILVUS_PORT", "19530")
        self.collection_name = "tenant_documents"
        self._connect()
        self._ensure_collection_exists()

    def _connect(self):
        try:
            logging.info(f"Connecting to Milvus at {self.host}:{self.port}")
            connections.connect("default", host=self.host, port=self.port)
        except Exception as e:
            logging.error(f"Failed to connect to Milvus: {e}")
            # 在生产环境中,这里应该有重试机制或更稳健的错误处理
            raise

    def _ensure_collection_exists(self):
        """
        Creates the collection on startup if it doesn't exist.
        This is an idempotent operation.
        """
        if utility.has_collection(self.collection_name):
            logging.info(f"Collection '{self.collection_name}' already exists.")
            self.collection = Collection(self.collection_name)
            self.collection.load() # 确保集合加载到内存
            return

        logging.info(f"Collection '{self.collection_name}' not found, creating...")
        # 定义字段. 注意: id 字段必须是主键
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
            FieldSchema(name="doc_source_id", dtype=DataType.VARCHAR, max_length=256),
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128)
        ]
        schema = CollectionSchema(fields, "Document embeddings for all tenants")
        self.collection = Collection(self.collection_name, schema)

        # 创建索引是性能的关键. HNSW 是常用的高性能索引
        index_params = {
            "metric_type": "L2",
            "index_type": "HNSW",
            "params": {"M": 16, "efConstruction": 256}
        }
        self.collection.create_index("embedding", index_params)
        logging.info("Index created on 'embedding' field.")
        self.collection.load()
        logging.info(f"Collection '{self.collection_name}' created and loaded.")

    async def _get_or_create_partition(self, tenant_id: str):
        """
        Ensures a partition exists for the given tenant ID.
        This is a critical, idempotent function for tenant onboarding.
        """
        partition_name = self._sanitize_partition_name(tenant_id)
        if not self.collection.has_partition(partition_name):
            try:
                logging.info(f"Partition '{partition_name}' not found for tenant '{tenant_id}'. Creating...")
                self.collection.create_partition(partition_name)
                logging.info(f"Partition '{partition_name}' created.")
            except Exception as e:
                # 处理并发创建分区的竞争条件
                if "partition already exist" in str(e):
                    logging.warning(f"Partition '{partition_name}' was created concurrently.")
                else:
                    logging.error(f"Failed to create partition '{partition_name}': {e}")
                    raise
    
    @staticmethod
    def _sanitize_partition_name(tenant_id: str) -> str:
        """
        Milvus partition names have restrictions. They must start with a letter
        or underscore and contain only letters, numbers, or underscores.
        """
        # 这是一个简单的示例,生产环境可能需要更复杂的规则
        return f"tenant_{tenant_id.replace('-', '_')}"

    async def insert(self, data: list, tenant_id: str):
        """Inserts data into a specific tenant's partition."""
        partition_name = self._sanitize_partition_name(tenant_id)
        await self._get_or_create_partition(tenant_id)
        
        logging.info(f"Inserting {len(data)} entities into partition '{partition_name}'")
        # insert 操作可以指定 partition_name
        result = self.collection.insert(data, partition_name=partition_name)
        self.collection.flush() # 异步操作,建议在插入后调用
        return result

    async def search(self, query_vector: list, tenant_id: str, top_k: int):
        """Searches for vectors only within the specified tenant's partition."""
        partition_name = self._sanitize_partition_name(tenant_id)
        
        # 这里的坑在于:如果分区不存在,直接搜索会报错。
        # 因此,必须先确认分区存在。
        if not self.collection.has_partition(partition_name):
            logging.warning(f"Tenant '{tenant_id}' has no data partition '{partition_name}'. Returning empty result.")
            return []

        search_params = {"metric_type": "L2", "params": {"ef": 128}}
        
        # 关键!通过 `partition_names` 参数将搜索范围限定在单个租户的分区内。
        # 这是一个列表,理论上可以同时搜索多个分区。
        results = self.collection.search(
            data=[query_vector],
            anns_field="embedding",
            param=search_params,
            limit=top_k,
            output_fields=["doc_source_id"],
            partition_names=[partition_name] 
        )
        
        # 解析结果
        hits = results[0]
        return [
            {"id": hit.id, "distance": hit.distance, "source": hit.entity.get('doc_source_id')}
            for hit in hits
        ]

这段代码有几个在真实项目中非常关键的点:

  1. 幂等性操作: _ensure_collection_exists_get_or_create_partition 必须是幂等的。服务可能会重启或水平扩展,这些初始化操作不能因为重复执行而出错。
  2. 分区名限制: Milvus对分区名有字符限制。_sanitize_partition_name 负责处理这个问题,防止因非法的租户ID导致创建分区失败。
  3. 显式指定分区: 无论是insert还是search,都必须显式地使用partition_namepartition_names参数。这是实现隔离的核心。忘记这个参数将导致数据插入到默认分区(_default)或在所有分区中搜索,造成数据泄露。
  4. 错误处理: 代码中包含了对连接失败、分区并发创建等情况的日志记录和基本处理。生产代码需要更完善的重试和告警逻辑。

自动化部署:GitHub Actions与Azure的协同

手动部署是不可靠且低效的。我们使用GitHub Actions来自动化构建、测试和部署流程,目标是每次代码推送到main分支时,都能自动将更新后的服务部署到Azure App Service。

我们的CI/CD流水线定义如下:

# file: .github/workflows/deploy-to-azure.yml

name: Deploy Vector Search Service to Azure

on:
  push:
    branches:
      - main
  workflow_dispatch:

env:
  AZURE_APP_SERVICE_NAME: "vector-search-prod"
  AZURE_RESOURCE_GROUP: "saas-prod-rg"
  AZURE_CONTAINER_REGISTRY: "saasprodacr"
  IMAGE_NAME: "vector-search-service"

jobs:
  build-and-deploy:
    runs-on: ubuntu-latest
    
    steps:
    - name: 'Checkout GitHub Action'
      uses: actions/checkout@v3

    - name: 'Login to Azure'
      uses: azure/login@v1
      with:
        creds: ${{ secrets.AZURE_CREDENTIALS }}

    - name
      : 'Login to Azure Container Registry'
      uses: azure/docker-login@v1
      with:
        login-server: ${{ env.AZURE_CONTAINER_REGISTRY }}.azurecr.io
        username: ${{ secrets.ACR_USERNAME }}
        password: ${{ secrets.ACR_PASSWORD }}

    - name: 'Build and push Docker image'
      run: |
        docker build . -t ${{ env.AZURE_CONTAINER_REGISTRY }}.azurecr.io/${{ env.IMAGE_NAME }}:${{ github.sha }}
        docker push ${{ env.AZURE_CONTAINER_REGISTRY }}.azurecr.io/${{ env.IMAGE_NAME }}:${{ github.sha }}

    - name: 'Deploy to Azure App Service'
      uses: azure/webapps-deploy@v2
      with:
        app-name: ${{ env.AZURE_APP_SERVICE_NAME }}
        resource-group-name: ${{ env.AZURE_RESOURCE_GROUP }}
        images: '${{ env.AZURE_CONTAINER_REGISTRY }}.azurecr.io/${{ env.IMAGE_NAME }}:${{ github.sha }}'

    - name: 'Logout from Azure'
      run: |
        az logout
        az cache purge
        az account clear

这个工作流做了几件关键事情:

  1. 认证: 使用secrets.AZURE_CREDENTIALS安全地登录到Azure。这是一个通过Azure CLI创建的服务主体凭据,存储在GitHub仓库的Secrets中。
  2. 构建与推送: 它构建一个Docker镜像,并用Git commit SHA作为标签,保证了镜像的唯一性和可追溯性。然后将镜像推送到Azure Container Registry (ACR)。
  3. 部署: 最后,azure/webapps-deploy action会通知Azure App Service拉取ACR中的新镜像并进行部署,实现了零停机更新。
  4. 配置管理: JWT密钥、Milvus地址等敏感配置,我们没有写在代码或Dockerfile里,而是在Azure App Service的”Configuration” -> “Application settings”中进行配置。App Service会自动将这些设置作为环境变量注入到容器中,我们的Python代码通过os.environ.get()来读取。

方案的局限性与未来迭代

尽管基于分区的多租户方案在当前阶段满足了我们的需求,但它并非没有局限。

首先,Milvus对单个Collection中的分区数量有限制。虽然新版本已经大幅提高了这个上限(理论上可达65536个),但对于需要支持数十万甚至更多租户的超大规模SaaS平台来说,这依然是一个潜在的瓶颈。当租户规模达到这个量级时,我们可能不得不重新审视“每个租户一个Collection”的方案,并围绕它构建一套复杂的自动化管理系统,例如开发一个Kubernetes Operator来管理成千上万个Collection的生命周期。

其次,资源争抢问题依然存在。虽然数据是隔离的,但所有租户共享同一个Collection的计算和内存资源。如果某个“超级租户”的数据量或查询量远超其他租户,可能会影响到整个集群的性能稳定性。未来的一个优化方向是实现基于租户的资源配额与监控,甚至考虑将不同等级的租户路由到物理上不同的Milvus集群,但这会显著增加架构的复杂性。

最后,当前的方案在租户数据备份和恢复方面粒度较粗。虽然可以备份整个Milvus实例,但要实现单个租户数据的独立、快速恢复则比较困难。这可能需要借助Milvus的数据导入导出工具,并结合外部元数据管理来实现更精细化的操作。


  目录