A Multi-Tenant Vector Search Architecture Driven by Dynamic Configuration in etcd


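The first problem we faced was not technology selection but the operating model. For a SaaS vector search platform, the core requirement is automated tenant provisioning and isolation. If every new tenant required an engineer to hand-edit Nginx configuration, restart services, create database tables, and deploy a customized frontend theme, the project would have failed on day one. What we need is a "live" system that reacts to configuration changes: one driven by a single source of truth that can take a tenant from creation to production within seconds through a fully automated pipeline.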

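For that single source of truth we chose etcd. It is more than a key-value store: its watch mechanism is the foundation of our entire dynamic architecture. Everything that defines a tenant (domains, API credentials, UI theme colors, the corresponding Weaviate data class) is stored in etcd in structured form. The rest of the system acts as subscribers that respond to changes in this data in real time.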

graph TD
    A[Ops/API] -- etcdctl put --> B(etcd Cluster);
    B -- Watch Event --> C{Go Control Plane};
    C -- POST /load --> D[Caddy Admin API];
    C -- Tenant Config API --> F[Vite Frontend];
    C -- Schema Mgmt --> E[Weaviate];
    G[User Request] --> H(Caddy Reverse Proxy);
    H -- tenant-a.domain.com --> F;
    F -- GraphQL Query --> E;

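The diagram above shows the core flow. Every change starts in etcd. A control plane service written in Go watches for these changes and "translates" them into instructions each component understands: it generates routing configuration for Caddy, manages the schema in Weaviate, and serves dynamic themes to the frontend.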

etcd as the Nervous System of the Architecture

The etcd key layout we designed for tenants is:

/multisearch/tenants/{tenant_id}

The value is a JSON string containing all of the tenant's metadata.

{
  "tenantId": "tenant-a",
  "status": "active",
  "domains": ["tenant-a.search.com"],
  "weaviate": {
    "className": "TenantA_Documents",
    "apiKey": "secp256k1-weaviate-key-for-a"
  },
  "theme": {
    "primaryColor": "#3498db",
    "secondaryColor": "#2ecc71",
    "fontColor": "#ffffff",
    "logoUrl": "base64-encoded-svg-or-empty"
  }
}
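
Since every subscriber decodes these values, it can help to validate a record before acting on it. The following is a minimal Go sketch, assuming the key layout and JSON fields shown above. The names `TenantRecord` and `parseTenant`, and the specific checks, are illustrative assumptions rather than part of the original service.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// tenantPrefix is the etcd key prefix used for tenant records.
const tenantPrefix = "/multisearch/tenants/"

// TenantRecord mirrors the JSON value stored per tenant (theme omitted for brevity).
type TenantRecord struct {
	TenantID string   `json:"tenantId"`
	Status   string   `json:"status"`
	Domains  []string `json:"domains"`
	Weaviate struct {
		ClassName string `json:"className"`
		APIKey    string `json:"apiKey"`
	} `json:"weaviate"`
}

// parseTenant decodes one etcd key/value pair and applies basic sanity checks
// (hypothetical rules chosen for this sketch).
func parseTenant(key string, value []byte) (TenantRecord, error) {
	var t TenantRecord
	if !strings.HasPrefix(key, tenantPrefix) {
		return t, fmt.Errorf("unexpected key %q", key)
	}
	id := strings.TrimPrefix(key, tenantPrefix)
	if id == "" || strings.Contains(id, "/") {
		return t, fmt.Errorf("unexpected key %q", key)
	}
	if err := json.Unmarshal(value, &t); err != nil {
		return t, fmt.Errorf("decode %s: %w", key, err)
	}
	// The ID inside the value must agree with the ID in the key.
	if t.TenantID != id {
		return t, fmt.Errorf("tenantId %q does not match key %q", t.TenantID, key)
	}
	if len(t.Domains) == 0 {
		return t, errors.New("tenant has no domains")
	}
	if t.Weaviate.ClassName == "" {
		return t, errors.New("tenant has no Weaviate class name")
	}
	return t, nil
}
```

Rejecting a record whose `tenantId` disagrees with its key keeps a mistyped `etcdctl put` from silently overwriting another tenant's routes.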

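Adding a new tenant with etcdctl is simple, which makes integration with CI/CD pipelines or automation scripts straightforward.

# Define the tenant configuration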
TENANT_CONFIG='{
  "tenantId": "tenant-a",
  "status": "active",
  "domains": ["tenant-a.search.com"],
  "weaviate": {
    "className": "TenantA_Documents",
    "apiKey": "secp256k1-weaviate-key-for-a"
  },
  "theme": {
    "primaryColor": "#3498db",
    "secondaryColor": "#2ecc71",
    "fontColor": "#ffffff",
    "logoUrl": ""
  }
}'

# Write to etcd
etcdctl put /multisearch/tenants/tenant-a "$TENANT_CONFIG"

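This operation is the "trigger" that sets off the dynamic reconfiguration of the whole system.

Go Control Plane: The Bridge Between etcd and Caddy

Static configuration files cannot meet our needs. Caddy v2's Admin API can hot-load a complete configuration through a single HTTP POST request, which is exactly what we need. The core responsibilities of our control plane service (config-sync) are:

  1. On startup, fetch all tenant configurations from etcd.
  2. Generate a complete Caddy JSON configuration from the tenant configurations.
  3. Apply that configuration through the Admin API.
  4. Keep watching all changes under the /multisearch/tenants/ prefix in etcd.
  5. On every change (create, delete, update), re-read the tenants and repeat steps 2 and 3.

Below is a simplified version of the core logic of the config-sync service. In a real project you need more robust error handling, retries, and configurability.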

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"log"
	"net/http"
	"os/signal"
	"syscall"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// Tenant represents the structure of our tenant config in etcd
type Tenant struct {
	TenantID string   `json:"tenantId"`
	Status   string   `json:"status"`
	Domains  []string `json:"domains"`
	Theme    struct {
		PrimaryColor   string `json:"primaryColor"`
		SecondaryColor string `json:"secondaryColor"`
	} `json:"theme"`
	// Weaviate config omitted for brevity
}

const (
	etcdEndpoints = "localhost:2379"
	etcdPrefix    = "/multisearch/tenants/"
	caddyAdminAPI = "http://localhost:2019/load"
	// Caddy upstream "dial" values are network addresses (host:port), not URLs.
	frontendService = "localhost:5173" // Vite dev server
	controlPlaneAPI = "localhost:8080" // API for frontend to get config
)

func main() {
	// Setup graceful shutdown
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// Connect to etcd
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{etcdEndpoints},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatalf("Failed to connect to etcd: %v", err)
	}
	defer cli.Close()

	log.Println("Connected to etcd.")

	// Initial full sync and start watching for changes
	go watchTenants(ctx, cli)

	// Wait for shutdown signal
	<-ctx.Done()
	log.Println("Shutting down config-sync service.")
}

func watchTenants(ctx context.Context, cli *clientv3.Client) {
	// Perform an initial full synchronization
	if err := syncCaddyConfig(ctx, cli); err != nil {
		log.Printf("Initial sync failed: %v", err)
		// In a real app, you'd have a retry loop here
	}

	// Watch for future changes
	watchChan := cli.Watch(ctx, etcdPrefix, clientv3.WithPrefix())
	log.Printf("Watching for changes on prefix: %s", etcdPrefix)

	for {
		select {
		case <-ctx.Done():
			log.Println("Watch loop is shutting down.")
			return
		case wresp := <-watchChan:
			if wresp.Err() != nil {
				log.Printf("Watch error: %v. Re-establishing watch...", wresp.Err())
				// Simplified error handling. A production system would try to re-establish the watch.
				time.Sleep(5 * time.Second)
				watchChan = cli.Watch(ctx, etcdPrefix, clientv3.WithPrefix())
				continue
			}
			log.Printf("Detected %d events. Triggering reconfiguration.", len(wresp.Events))
			if err := syncCaddyConfig(ctx, cli); err != nil {
				log.Printf("Sync failed after watch event: %v", err)
			}
		}
	}
}

// syncCaddyConfig fetches all tenants and pushes a new config to Caddy
func syncCaddyConfig(ctx context.Context, cli *clientv3.Client) error {
	resp, err := cli.Get(ctx, etcdPrefix, clientv3.WithPrefix())
	if err != nil {
		return err
	}

	tenants := make(map[string]Tenant)
	for _, kv := range resp.Kvs {
		var t Tenant
		if err := json.Unmarshal(kv.Value, &t); err != nil {
			log.Printf("Failed to unmarshal tenant config for key %s: %v", string(kv.Key), err)
			continue
		}
		if t.Status == "active" {
			tenants[t.TenantID] = t
		}
	}

	caddyConfig, err := generateCaddyConfig(tenants)
	if err != nil {
		return err
	}

	return postConfigToCaddy(caddyConfig)
}

// This function is the core of the dynamic routing logic
func generateCaddyConfig(tenants map[string]Tenant) ([]byte, error) {
	// Base Caddy server structure
	httpApp := map[string]interface{}{
		"servers": map[string]interface{}{
			"main": map[string]interface{}{
				"listen": []string{":443"},
				"routes": []map[string]interface{}{},
			},
		},
	}
	routes := httpApp["servers"].(map[string]interface{})["main"].(map[string]interface{})["routes"].([]map[string]interface{})

	// Add routes for each tenant
	for _, tenant := range tenants {
		for _, domain := range tenant.Domains {
			tenantRoute := map[string]interface{}{
				"match": []map[string]interface{}{
					{"host": []string{domain}},
				},
				"handle": []map[string]interface{}{
					{
						"handler": "reverse_proxy",
						"upstreams": []map[string]interface{}{
							{"dial": frontendService},
						},
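						// We add a header to tell the frontend which tenant this is.
						// A more secure approach could involve JWTs.
						// In Caddy's JSON config, request headers are set under
						// headers.request.set ("header_up" is Caddyfile syntax).
						"headers": map[string]interface{}{
							"request": map[string]interface{}{
								"set": map[string]interface{}{
									"X-Tenant-ID": []string{tenant.TenantID},
								},
							},
						},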
					},
				},
			}
			routes = append(routes, tenantRoute)
		}
	}
    
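	// It's critical to also add a handler for the control plane itself
	// so the frontend can fetch its config. It goes first: Caddy evaluates
	// routes in order and reverse_proxy is terminal, so a host-only tenant
	// route listed earlier would swallow /api/tenant-config/* requests.
	apiRoute := map[string]interface{}{
		"match": []map[string]interface{}{
			{"path": []string{"/api/tenant-config/*"}},
		},
		"handle": []map[string]interface{}{
			{
				"handler": "reverse_proxy",
				"upstreams": []map[string]interface{}{
					{"dial": controlPlaneAPI},
				},
			},
		},
	}
	routes = append([]map[string]interface{}{apiRoute}, routes...)

	// append returns a new slice header, so store the result back into the
	// server config; otherwise Caddy would receive an empty route list.
	httpApp["servers"].(map[string]interface{})["main"].(map[string]interface{})["routes"] = routes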

	// Wrap in the full Caddy config structure
	fullConfig := map[string]interface{}{
		"apps": map[string]interface{}{
			"http": httpApp,
		},
	}

	return json.Marshal(fullConfig)
}

func postConfigToCaddy(config []byte) error {
	req, err := http.NewRequest(http.MethodPost, caddyAdminAPI, bytes.NewReader(config))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

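	if resp.StatusCode != http.StatusOK {
		// In production, log the response body for debugging
		log.Printf("Caddy admin API returned non-200 status: %d", resp.StatusCode)
		// err is nil at this point, so return a real error instead.
		return &caddyStatusError{status: resp.Status}
	}

	log.Println("Successfully pushed new configuration to Caddy.")
	return nil
}

// caddyStatusError reports a non-200 response from the Caddy admin API.
type caddyStatusError struct{ status string }

func (e *caddyStatusError) Error() string {
	return "caddy admin API returned non-200 status: " + e.status
}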

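The heart of this code is generateCaddyConfig, which dynamically builds a routing rule for every active tenant. When a request for tenant-a.search.com reaches Caddy, Caddy matches the corresponding host, proxies the request to our Vite frontend service, and injects an X-Tenant-ID request header. That header is what lets the service behind the proxy identify the current tenant context.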
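
To make the shape of a single tenant route concrete, here is a small sketch that builds it with typed structs instead of nested maps, which avoids the type assertions. The struct names (`caddyRoute`, `caddyMatch`, and so on) and the helpers `tenantRoute` and `routeJSON` are assumptions made for illustration. The JSON field names follow Caddy's JSON config, where the reverse_proxy handler sets upstream request headers under `headers.request.set`.

```go
package main

import "encoding/json"

// Minimal typed view of the parts of a Caddy route used in this article.
type caddyRoute struct {
	Match  []caddyMatch   `json:"match,omitempty"`
	Handle []caddyHandler `json:"handle"`
}

type caddyMatch struct {
	Host []string `json:"host,omitempty"`
	Path []string `json:"path,omitempty"`
}

type caddyHandler struct {
	Handler   string          `json:"handler"`
	Upstreams []caddyUpstream `json:"upstreams,omitempty"`
	Headers   *proxyHeaders   `json:"headers,omitempty"`
}

type caddyUpstream struct {
	Dial string `json:"dial"` // host:port, not a URL
}

type proxyHeaders struct {
	Request headerOps `json:"request"`
}

type headerOps struct {
	Set map[string][]string `json:"set,omitempty"`
}

// tenantRoute builds one host-matched reverse_proxy route that tags the
// proxied request with the tenant ID.
func tenantRoute(tenantID string, domains []string, upstream string) caddyRoute {
	return caddyRoute{
		Match: []caddyMatch{{Host: domains}},
		Handle: []caddyHandler{{
			Handler:   "reverse_proxy",
			Upstreams: []caddyUpstream{{Dial: upstream}},
			Headers: &proxyHeaders{Request: headerOps{
				Set: map[string][]string{"X-Tenant-ID": {tenantID}},
			}},
		}},
	}
}

// routeJSON renders a route as the JSON that would be sent to Caddy.
func routeJSON(r caddyRoute) (string, error) {
	b, err := json.Marshal(r)
	return string(b), err
}
```

Calling `routeJSON(tenantRoute("tenant-a", []string{"tenant-a.search.com"}, "localhost:5173"))` yields a single object with a `match` on the host and a `handle` list containing one `reverse_proxy` handler.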

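Data Isolation in Weaviate

Weaviate offers a built-in multi-tenancy feature, but we chose a more explicit and more manageable isolation strategy: create a separate class for each tenant. The class name comes straight from the configuration in etcd, for example TenantA_Documents.

When the config-sync service detects a new tenant, it not only updates the Caddy configuration but also calls the Weaviate API to create the corresponding class.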

# A python script snippet demonstrating Weaviate class creation
# This logic would be part of our control plane or a related service.

import weaviate
import os

client = weaviate.Client(
    url="http://localhost:8080",
    auth_client_secret=weaviate.AuthApiKey(api_key=os.getenv("WEAVIATE_ADMIN_KEY")),
)

def provision_tenant_class(class_name, description):
    """
    Creates a new class in Weaviate for a new tenant.
    This must be idempotent.
    """
    try:
        # Check if class already exists
        client.schema.get(class_name)
        print(f"Class '{class_name}' already exists. Skipping creation.")
        return
    except weaviate.exceptions.UnexpectedStatusCodeException as e:
        if e.status_code == 404:
            # Class does not exist, so we create it.
            pass
        else:
            # Another error occurred
            raise e

    tenant_class = {
        "class": class_name,
        "description": description,
        "vectorizer": "text2vec-openai",
        "properties": [
            {
                "name": "content",
                "dataType": ["text"],
                "description": "The content of the document",
            },
            {
                "name": "source",
                "dataType": ["string"],
                "description": "The source of the document",
            }
        ]
    }

    client.schema.create_class(tenant_class)
    print(f"Successfully created class: {class_name}")

# Example usage when a new tenant 'tenant-b' is detected
provision_tenant_class("TenantB_Documents", "Documents for Tenant B")

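The advantage of this approach is strong isolation: each tenant's data lives in its own class, separated from other tenants' data physically (or at least at the logical index level), which greatly reduces the risk of accidental cross-tenant leakage. At query time, the frontend only needs to build a GraphQL query that targets the right class name for the current tenant ID.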
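
Building the per-tenant query can be sketched as follows. This is an illustrative Go helper, not code from the original system: `buildNearTextQuery` is a hypothetical name, the selected fields (`content`, `source`) come from the class definition above, and the class-name pattern is an assumption used to keep untrusted input out of the query text.

```go
package main

import (
	"encoding/json"
	"fmt"
	"regexp"
)

// classNameRe is an assumed whitelist for class names: a capital letter
// followed by letters, digits, or underscores.
var classNameRe = regexp.MustCompile(`^[A-Z][_0-9A-Za-z]*$`)

// buildNearTextQuery returns a Weaviate GraphQL Get query that runs a
// nearText search against the given tenant class.
func buildNearTextQuery(className, concept string, limit int) (string, error) {
	if !classNameRe.MatchString(className) {
		return "", fmt.Errorf("invalid class name %q", className)
	}
	if limit <= 0 {
		return "", fmt.Errorf("limit must be positive, got %d", limit)
	}
	// JSON string escaping is compatible with GraphQL string literals,
	// so json.Marshal is used to quote the user-supplied concept.
	quoted, err := json.Marshal(concept)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf(
		`{ Get { %s(nearText: {concepts: [%s]}, limit: %d) { content source } } }`,
		className, quoted, limit,
	), nil
}
```

Because the class name is interpolated into the query text, validating it matters as much as escaping the search phrase: a tenant must never be able to steer a query at another tenant's class.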

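Vite + Styled-components: Building a Dynamically Themed Frontend

The frontend application has to do two things:

  1. Identify the current tenant.
  2. Fetch that tenant's configuration (the theme in particular) and apply it to the UI.

Our Caddy configuration already addresses the first point with the X-Tenant-ID header, but JavaScript running in the browser cannot read the headers of the request. So on load, the frontend uses window.location.hostname to request its configuration from our backend API (served by config-sync or another service).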
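
On the server side, that lookup amounts to mapping a hostname back to a tenant ID. Below is a minimal sketch in Go, assuming the backend keeps an in-memory index built from the `domains` arrays in etcd. The names `tenantIndex`, `buildIndex`, and `resolve` are hypothetical.

```go
package main

import (
	"net"
	"strings"
)

// tenantIndex maps a lowercase domain name to its tenant ID.
type tenantIndex map[string]string

// buildIndex flattens a tenantID -> domains map into a domain -> tenantID index.
func buildIndex(tenants map[string][]string) tenantIndex {
	idx := make(tenantIndex)
	for id, domains := range tenants {
		for _, d := range domains {
			idx[strings.ToLower(d)] = id
		}
	}
	return idx
}

// resolve finds the tenant for a Host header value, ignoring an optional
// port, letter case, and a trailing dot.
func (idx tenantIndex) resolve(hostHeader string) (string, bool) {
	host := hostHeader
	if h, _, err := net.SplitHostPort(hostHeader); err == nil {
		host = h
	}
	host = strings.ToLower(strings.TrimSuffix(host, "."))
	id, ok := idx[host]
	return id, ok
}
```

Resolving the tenant from an exact domain match, rather than from the first label of the hostname, also covers tenants that bring their own custom domains.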

src/contexts/TenantProvider.tsx:

import React, { createContext, useContext, useState, useEffect } from 'react';
import { ThemeProvider } from 'styled-components';

// Define the shape of tenant config
interface TenantTheme {
  primaryColor: string;
  secondaryColor: string;
  fontColor: string;
}

interface TenantConfig {
  tenantId: string;
  theme: TenantTheme;
}

// A default theme to prevent errors on initial load
const defaultTheme: TenantTheme = {
  primaryColor: '#cccccc',
  secondaryColor: '#aaaaaa',
  fontColor: '#000000',
};

const TenantContext = createContext<TenantConfig | null>(null);

export const useTenant = () => useContext(TenantContext);

export const TenantProvider: React.FC<{ children: React.ReactNode }> = ({ children }) => {
  const [tenantConfig, setTenantConfig] = useState<TenantConfig | null>(null);
  const [loading, setLoading] = useState(true);

  useEffect(() => {
    const fetchTenantConfig = async () => {
      try {
        // In a real app, you'd extract the tenant ID from the hostname
        // For this example, we'll hardcode it, but imagine logic like:
        // const tenantId = window.location.hostname.split('.')[0];
        const tenantId = 'tenant-a'; // This would be dynamic

        // This endpoint is proxied by Caddy to our control plane service
        const response = await fetch(`/api/tenant-config/${tenantId}`);
        if (!response.ok) {
          throw new Error('Tenant configuration not found.');
        }
        const data: TenantConfig = await response.json();
        setTenantConfig(data);
      } catch (error) {
        console.error("Failed to fetch tenant config:", error);
        // Fallback to a default or error state
        setTenantConfig({ tenantId: 'default', theme: defaultTheme });
      } finally {
        setLoading(false);
      }
    };

    fetchTenantConfig();
  }, []);

  if (loading) {
    return <div>Loading Tenant...</div>; // Or a proper skeleton loader
  }

  return (
    <TenantContext.Provider value={tenantConfig}>
      <ThemeProvider theme={tenantConfig?.theme || defaultTheme}>
        {children}
      </ThemeProvider>
    </TenantContext.Provider>
  );
};

With TenantProvider in place, any component in the application can read the tenant's colors through the styled-components theme prop.

src/components/SearchBox.tsx:

import React from 'react';
import styled from 'styled-components';

const SearchContainer = styled.div`
  padding: 2rem;
  background-color: #f0f0f0;
  border-radius: 8px;
  display: flex;
  gap: 1rem;
`;

// Here's the magic: theme properties are accessed via a function.
const SearchInput = styled.input`
  flex-grow: 1;
  padding: 0.8rem 1rem;
  border: 2px solid ${props => props.theme.secondaryColor};
  border-radius: 4px;
  font-size: 1rem;

  &:focus {
    outline: none;
    border-color: ${props => props.theme.primaryColor};
    box-shadow: 0 0 5px ${props => props.theme.primaryColor};
  }
`;

const SearchButton = styled.button`
  padding: 0.8rem 1.5rem;
  border: none;
  border-radius: 4px;
  cursor: pointer;
  font-size: 1rem;
  font-weight: bold;
  background-color: ${props => props.theme.primaryColor};
  color: ${props => props.theme.fontColor};
  transition: background-color 0.2s ease;

  &:hover {
    opacity: 0.9;
  }
`;

export const SearchBox = () => {
  return (
    <SearchContainer>
      <SearchInput type="text" placeholder="Enter your semantic query..." />
      <SearchButton>Search</SearchButton>
    </SearchContainer>
  );
};

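Now, when tenant-a.search.com is visited, the SearchButton background is #3498db. If we use etcdctl to change tenant-a's primaryColor to #e74c3c (red), config-sync notices immediately (it only cares about Caddy routing, but the API that serves the configuration will read the new value), and the button turns red the next time the user refreshes the page. No code deployment is involved at any point.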

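Limitations of the Current Design and Future Directions

This architecture achieves a high degree of automation, but it also introduces complexity that needs attention. The config-sync service becomes a single point of failure (SPOF). In production it must be deployed for high availability, for example as a Kubernetes Deployment combined with leader election, so that only one instance pushes configuration to Caddy at any given time.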

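Second, holding the complete Caddy configuration in memory and replacing it wholesale is workable for a few hundred tenants. Once the tenant count reaches thousands or tens of thousands, the JSON configuration becomes very large and puts pressure on Caddy's config-load time and memory usage. One direction for optimization is Caddy's path-scoped admin endpoints (requests to /config/... can modify part of the running configuration instead of replacing all of it); another is to move the tenant routing logic into a lighter, purpose-built Caddy plugin that talks to etcd directly.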
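
As a rough illustration of the incremental route, Caddy's admin API accepts requests scoped to a config path: a POST to a path that names an array appends to it. The sketch below only constructs such a request. The function name `newAppendRouteRequest` is hypothetical, and the server name `main` matches the config generated earlier. It assumes the `routes` array already exists in the running configuration.

```go
package main

import (
	"bytes"
	"encoding/json"
	"net/http"
	"net/url"
)

// newAppendRouteRequest builds a POST request that appends one route to the
// routes array of the named HTTP server, instead of reloading the whole config.
func newAppendRouteRequest(adminBase, serverName string, route map[string]interface{}) (*http.Request, error) {
	body, err := json.Marshal(route)
	if err != nil {
		return nil, err
	}
	target := adminBase + "/config/apps/http/servers/" + url.PathEscape(serverName) + "/routes"
	req, err := http.NewRequest(http.MethodPost, target, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}
```

A control plane using this style would need to track which routes it has already pushed, for example by comparing against the etcd state, which is bookkeeping the full-reload approach avoids.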

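Finally, the security model needs hardening. Identifying tenants through the X-Tenant-ID header is too simplistic and easy to forge. A more mature approach is a JWT-based authentication scheme in which a central authentication service issues tokens carrying tenant information for each tenant session, and Caddy and the backend services verify those tokens.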
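
To show what such a token check involves, here is a deliberately small HS256 sketch using only the Go standard library. It is an illustration under assumptions, not a recommended implementation: the claim name `tenant_id`, the shared-secret setup, and the helpers `signToken` and `verifyToken` are all hypothetical, and a production system would more likely rely on a maintained JWT library and asymmetric keys.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/base64"
	"encoding/json"
	"errors"
	"strings"
)

// jwtHeader is the only header this sketch accepts.
const jwtHeader = `{"alg":"HS256","typ":"JWT"}`

var b64 = base64.RawURLEncoding

// tenantClaims carries the tenant ID and an expiry (Unix seconds).
type tenantClaims struct {
	TenantID string `json:"tenant_id"`
	Exp      int64  `json:"exp"`
}

// sign computes HMAC-SHA256 over msg with the shared secret.
func sign(secret []byte, msg string) []byte {
	h := hmac.New(sha256.New, secret)
	h.Write([]byte(msg))
	return h.Sum(nil)
}

// signToken issues a token for the given claims.
func signToken(secret []byte, c tenantClaims) (string, error) {
	payload, err := json.Marshal(c)
	if err != nil {
		return "", err
	}
	unsigned := b64.EncodeToString([]byte(jwtHeader)) + "." + b64.EncodeToString(payload)
	return unsigned + "." + b64.EncodeToString(sign(secret, unsigned)), nil
}

// verifyToken checks header, signature, and expiry, then returns the claims.
func verifyToken(secret []byte, token string, nowUnix int64) (tenantClaims, error) {
	var c tenantClaims
	parts := strings.Split(token, ".")
	if len(parts) != 3 {
		return c, errors.New("malformed token")
	}
	// Accept only the exact HS256 header so the algorithm cannot be swapped.
	if parts[0] != b64.EncodeToString([]byte(jwtHeader)) {
		return c, errors.New("unsupported header")
	}
	sig, err := b64.DecodeString(parts[2])
	if err != nil {
		return c, errors.New("malformed signature")
	}
	// hmac.Equal compares in constant time.
	if !hmac.Equal(sig, sign(secret, parts[0]+"."+parts[1])) {
		return c, errors.New("bad signature")
	}
	payload, err := b64.DecodeString(parts[1])
	if err != nil {
		return c, errors.New("malformed payload")
	}
	if err := json.Unmarshal(payload, &c); err != nil {
		return c, err
	}
	if nowUnix >= c.Exp {
		return c, errors.New("token expired")
	}
	return c, nil
}
```

With a check like this in the backend, the tenant ID comes from a signed claim rather than from a header that any client can set.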

