使用 Rust 构建一个 Kubernetes 原生的 gRPC 动态路由网关


在 Kubernetes 环境中管理 gRPC 服务间通信,常常会陷入两难境地:要么使用功能强大但过于笨重的服务网格(Service Mesh),要么依赖 L7 Ingress Controller,但它们对 gRPC 的支持往往仅限于透传,缺乏应用层的路由能力。当需求只是一个轻量级、高性能、能够根据 gRPC 的服务与方法动态路由的网关时,市面上的通用方案就显得大材小用。我们的痛点很明确:需要一个无需独立控制平面、能够直接利用 Kubernetes API 作为服务发现源、并且对 gRPC 协议有深度理解的 Sidecar 代理。

这个项目的初步构想是,构建一个 Rust 应用,它以 Sidecar 容器的形式与业务应用部署在同一个 Pod 中。它监听一个端口,接收所有出站的 gRPC 请求。通过解析请求的 gRPC Full Method Name(例如 /package.Service/Method),它能识别出目标服务。同时,它会持续监听 Kubernetes API Server,获取所有 ServiceEndpointSlice 的变化,在内存中维护一个从 gRPC 服务名到后端 Pod IP 地址列表的动态映射。当请求到来时,它从映射中选择一个健康的后端 Pod IP,并将请求代理过去。

技术选型决策如下:

  • 语言: Rust。对于网络代理这种需要高性能、高并发和内存安全的基础设施组件,Rust 是不二之选。其 async/await 语法结合 Tokio 运行时,能以极低的资源开销处理海量并发连接。所有权系统则从根本上杜绝了数据竞争和内存泄漏等顽固问题。
  • gRPC 协议栈: Tonic。Tonic 是 Rust 生态中最成熟的 gRPC 实现,它构建于 hypertower 之上,提供了强大的中间件抽象能力和出色的性能。
  • Kubernetes 客户端: kube-rskube-rs 提供了与 Kubernetes API 交互的健壮客户端,支持 watch 机制,这对于实现动态配置更新至关重要。

整个实现的核心分为三个部分:Kubernetes 服务发现、动态路由表维护、以及 gRPC 请求代理。

第一步: 监听 Kubernetes API 实现服务发现

我们需要一个后台任务,持续监听 EndpointSlice 资源的变化。EndpointSlice 比传统的 Endpoints 资源更具扩展性,是现代 Kubernetes 集群中服务发现的首选。当 EndpointSlice 发生变化时(Pod 的增减、就绪状态变更),我们就更新内存中的路由表。

这是项目依赖 Cargo.toml:

[package]
name = "grpc-k8s-proxy"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { version = "1", features = ["full"] }
tonic = "0.10"
hyper = "0.14"
tower = "0.4"
prost = "0.12"

kube = { version = "0.85", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.20.0", features = ["v1"] }
futures = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
anyhow = "1.0"
thiserror = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

# gRPC Health Checking Protocol
tonic-health = "0.10.2"

服务发现模块的代码负责与 K8s API 通信。

// src/discovery.rs

use anyhow::Result;
use futures::stream::{StreamExt, TryStreamExt};
use k8s_openapi::api::discovery::v1::EndpointSlice;
use kube::{
    api::{Api, ListParams, ResourceExt},
    runtime::{watcher, WatchStreamExt},
    Client,
};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;
use tracing::{error, info, warn};

// 路由表的核心数据结构:
// Key: gRPC Service FQDN (e.g., "user.UserService")
// Value: Vec of healthy backend Pod IPs (e.g., ["10.1.1.2:50051", "10.1.1.3:50051"])
pub type RoutingTable = Arc<Mutex<HashMap<String, Vec<String>>>>;

pub async fn watch_endpoints(client: Client, routing_table: RoutingTable) -> Result<()> {
    // 监控所有命名空间中的 EndpointSlice
    let slices: Api<EndpointSlice> = Api::all(client);
    // 我们只关心那些由 Kubernetes Service 管理的 EndpointSlice
    let lp = ListParams::default().labels("kubernetes.io/service-name");

    let mut watcher = watcher(slices, lp).applied_objects().boxed();

    info!("Starting Kubernetes EndpointSlice watcher...");

    loop {
        match watcher.try_next().await {
            Ok(Some(slice)) => {
                let service_name = slice
                    .metadata
                    .labels
                    .as_ref()
                    .and_then(|labels| labels.get("kubernetes.io/service-name"))
                    .cloned();

                if service_name.is_none() {
                    // 理论上 ListParams 已经过滤,但在真实项目中,防御性编程是必要的
                    warn!(slice = %slice.name_any(), "EndpointSlice without 'kubernetes.io/service-name' label, skipping.");
                    continue;
                }
                let service_name = service_name.unwrap();
                
                // 我们需要一个约定,即 gRPC Service 的 FQDN 与 Kubernetes Service name 相关
                // 例如,`user.UserService` 对应名为 `user-service` 的 K8s Service。
                // 这里的转换逻辑需要根据团队规范来定,此处做一个简单示例:
                // `user-service` -> `user.UserService` (这部分在实际中可能需要更复杂的映射或CRD)
                // 为简化,我们假设gRPC服务名就是K8s服务名,在代理时进行转换。
                // 真实项目中,可能通过 CRD 或 Annotation 来声明 gRPC 服务名。
                // 此处我们直接使用 K8s Service Name 作为 Key。
                
                let mut endpoints: Vec<String> = Vec::new();
                if let Some(slice_endpoints) = slice.endpoints {
                    for endpoint in slice_endpoints {
                        // 只选择处于 "Ready" 状态的 Pod
                        if let Some(conditions) = endpoint.conditions {
                            if conditions.ready.unwrap_or(false) {
                                if let Some(addresses) = endpoint.addresses {
                                    for address in addresses {
                                        // 假设 gRPC 服务端口在 EndpointSlice 中有明确定义
                                        if let Some(port_info) = &slice.ports {
                                            if let Some(port) = port_info.get(0).and_then(|p| p.port) {
                                                endpoints.push(format!("http://{}:{}", address, port));
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }

                let mut table = routing_table.lock().await;
                if endpoints.is_empty() {
                    info!(service = %service_name, "Service has no ready endpoints, removing from routing table.");
                    table.remove(&service_name);
                } else {
                    info!(service = %service_name, endpoints = ?endpoints, "Updating routing table for service.");
                    table.insert(service_name.clone(), endpoints);
                }
            }
            Ok(None) => {
                // 流结束,这在 watcher 中不应该发生
                warn!("EndpointSlice watcher stream ended unexpectedly.");
                break;
            }
            Err(e) => {
                error!(error = %e, "Error watching EndpointSlices.");
                // 在生产环境中,这里应该有重试和退避策略
                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            }
        }
    }
    Ok(())
}

第二步: gRPC 请求代理与动态负载均衡

这是代理的核心逻辑。它接收一个请求,解析出目标服务,从路由表中查找可用的后端,然后将请求转发出去。

// src/proxy.rs

use crate::discovery::RoutingTable;
use hyper::{Body, Request, Response, Uri};
use std::{collections::HashMap, sync::Arc, convert::Infallible};
use tokio::sync::Mutex;
use tonic::transport::Channel;
use tower::Service;
use tracing::{debug, error, info, warn};

// 客户端连接池,避免为每个请求重复建立 TCP 和 HTTP/2 连接
// Key: Backend Address (e.g., "http://10.1.1.2:50051")
// Value: Tonic Channel
type ConnectionPool = Arc<Mutex<HashMap<String, Channel>>>;

#[derive(Clone)]
pub struct GrpcProxy {
    routing_table: RoutingTable,
    connection_pool: ConnectionPool,
}

impl GrpcProxy {
    pub fn new(routing_table: RoutingTable) -> Self {
        Self {
            routing_table,
            connection_pool: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    // 从 gRPC 请求路径中解析出服务名
    // e.g. /main.EchoService/SayHello -> main.EchoService
    // 这是一个简化实现,生产环境需要处理更复杂的 package name
    fn get_service_name_from_path(path: &str) -> Option<String> {
        path.strip_prefix('/')
            .and_then(|p| p.split('/').next())
            // 假设 gRPC FQDN 映射到 K8s Service Name 的规则是:
            // `package.Service` -> `package-service`
            // `Greeter` -> `greeter`
            .map(|s| s.replace('.', "-").to_lowercase())
    }

    // 一个简单的轮询负载均衡策略
    // 在真实项目中,这里可以替换为更复杂的策略,如最少连接数、P2C 等
    async fn select_backend(&self, service_name: &str) -> Option<String> {
        let table = self.routing_table.lock().await;
        table.get(service_name).and_then(|endpoints| {
            if endpoints.is_empty() {
                None
            } else {
                // 简单的轮询实现 (伪随机,实际应用应使用原子计数器等)
                let index = (rand::random::<usize>()) % endpoints.len();
                endpoints.get(index).cloned()
            }
        })
    }

    // 获取或创建一个到后端的连接
    async fn get_channel(&self, backend_addr: String) -> Result<Channel, tonic::transport::Error> {
        let mut pool = self.connection_pool.lock().await;
        if let Some(channel) = pool.get(&backend_addr) {
            // 在生产环境中,需要检查 channel 的健康状态
            // tower::discover::Discovery 等机制可以做得更好
            return Ok(channel.clone());
        }

        info!(address = %backend_addr, "Creating new connection to backend.");
        let endpoint = tonic::transport::Endpoint::from_shared(backend_addr.clone())?;
        let channel = endpoint.connect().await?;
        
        pool.insert(backend_addr, channel.clone());
        Ok(channel)
    }
}

// 实现 tower::Service trait 是与 hyper 集成的关键
impl Service<Request<Body>> for GrpcProxy {
    type Response = Response<Body>;
    type Error = Infallible;
    type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
        std::task::Poll::Ready(Ok(()))
    }

    fn call(&mut self, mut req: Request<Body>) -> Self::Future {
        let clone = self.clone();
        Box::pin(async move {
            let path = req.uri().path().to_string();
            debug!(path = %path, "Received gRPC request");

            let service_name = match Self::get_service_name_from_path(&path) {
                Some(name) => name,
                None => {
                    error!(path = %path, "Failed to parse service name from path");
                    let mut resp = Response::new(Body::from("Invalid gRPC path"));
                    *resp.status_mut() = hyper::StatusCode::BAD_REQUEST;
                    return Ok(resp);
                }
            };
            
            debug!(service = %service_name, "Resolved service name");

            let backend_addr = match clone.select_backend(&service_name).await {
                Some(addr) => addr,
                None => {
                    warn!(service = %service_name, "No healthy backend found for service");
                    let mut resp = Response::new(Body::from(format!("Service '{}' unavailable", service_name)));
                    // gRPC 错误码 `UNAVAILABLE` 对应 HTTP 状态码 503
                    *resp.status_mut() = hyper::StatusCode::SERVICE_UNAVAILABLE; 
                    return Ok(resp);
                }
            };

            debug!(backend = %backend_addr, "Selected backend for request");

            // 重写请求的 URI,指向选定的后端 Pod
            let new_uri = {
                let mut parts = req.uri().clone().into_parts();
                parts.scheme = Some(backend_addr.parse::<Uri>().unwrap().scheme().unwrap().clone());
                parts.authority = Some(backend_addr.parse::<Uri>().unwrap().authority().unwrap().clone());
                Uri::from_parts(parts).unwrap()
            };
            *req.uri_mut() = new_uri;

            // 获取到后端的连接并发送请求
            match clone.get_channel(backend_addr).await {
                Ok(mut channel) => {
                    // 使用 tower::Service::call 来转发请求
                    // 这里的 unwrap 在真实代码中需要更优雅的处理
                    match channel.call(req).await {
                        Ok(resp) => Ok(resp),
                        Err(e) => {
                            error!(error = ?e, "Failed to proxy request to backend");
                            let mut resp = Response::new(Body::from("Upstream request failed"));
                            *resp.status_mut() = hyper::StatusCode::BAD_GATEWAY;
                            Ok(resp)
                        }
                    }
                }
                Err(e) => {
                    error!(error = %e, "Failed to connect to backend");
                    let mut resp = Response::new(Body::from("Failed to connect to upstream service"));
                    *resp.status_mut() = hyper::StatusCode::SERVICE_UNAVAILABLE;
                    Ok(resp)
                }
            }
        })
    }
}

第三步: 整合与启动

main.rs 文件将所有部分组合在一起:初始化 Kubernetes 客户端,启动服务发现的后台任务,并运行 gRPC 代理服务器。

// src/main.rs

mod discovery;
mod proxy;

use anyhow::Result;
use kube::Client;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;
use tracing::info;
use hyper::server::conn::Http;
use tokio::net::TcpListener;
use tower::ServiceBuilder;

#[tokio::main]
async fn main() -> Result<()> {
    // 初始化日志系统,这对于调试分布式系统至关重要
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .init();

    // 初始化 Kubernetes 客户端
    // kube-rs 会自动从环境中加载配置 (in-cluster or from kubeconfig)
    let client = Client::try_default().await?;
    info!("Successfully connected to Kubernetes API server");

    // 创建共享的路由表
    let routing_table = Arc::new(Mutex::new(HashMap::new()));

    // 在后台启动 EndpointSlice watcher
    let discovery_handle = tokio::spawn(discovery::watch_endpoints(client.clone(), routing_table.clone()));

    // 初始化代理服务
    let proxy_service = proxy::GrpcProxy::new(routing_table.clone());
    
    // 构建我们的服务器
    let addr = "0.0.0.0:8080".parse()?;
    let listener = TcpListener::bind(addr).await?;
    info!(address = %addr, "gRPC proxy listening");

    loop {
        let (stream, remote_addr) = listener.accept().await?;
        info!(client.addr = %remote_addr, "Accepted new connection");

        let tower_service = ServiceBuilder::new().service(proxy_service.clone());
        
        tokio::spawn(async move {
            if let Err(err) = Http::new()
                .http2_only(true)
                .serve_connection(stream, tower_service)
                .await
            {
                // 连接错误是常见的,例如客户端断开,所以用 warn 级别
                warn!(error = %err, "Error serving connection");
            }
        });
    }

    // 实际项目中,需要优雅停机逻辑来处理 discovery_handle 的关闭
}

部署为 Sidecar

为了将这个代理部署为 Sidecar,我们需要一个 Dockerfile 和相应的 Kubernetes Deployment YAML。

Dockerfile:

# 使用 Rust 官方镜像进行构建
FROM rust:1.73 as builder
WORKDIR /usr/src/grpc-k8s-proxy
COPY . .
# 使用 cargo-chef 进行缓存优化,加快后续构建速度
# RUN cargo install cargo-chef
# COPY ./Cargo.toml ./Cargo.lock ./
# RUN cargo chef prepare --recipe-path recipe.json
# COPY . .
# RUN cargo chef cook --release --recipe-path recipe.json
# 对于简单项目,直接构建也可以
RUN cargo build --release

# 使用一个轻量级的基础镜像进行部署
FROM debian:bullseye-slim
RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*
COPY --from=builder /usr/src/grpc-k8s-proxy/target/release/grpc-k8s-proxy /usr/local/bin/grpc-k8s-proxy
CMD ["grpc-k8s-proxy"]

Kubernetes Deployment YAML:

假设我们有一个名为 my-app 的应用,它需要调用 user-service

apiVersion: v1
kind: ServiceAccount
metadata:
  name: grpc-proxy-sa
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: endpointslice-watcher
rules:
- apiGroups: ["discovery.k8s.io"]
  resources: ["endpointslices"]
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: grpc-proxy-rolebinding
subjects:
- kind: ServiceAccount
  name: grpc-proxy-sa
  namespace: default # 替换为你的命名空间
roleRef:
  kind: ClusterRole
  name: endpointslice-watcher
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-app-deployment
spec:
  replicas: 2
  selector:
    matchLabels:
      app: my-app
  template:
    metadata:
      labels:
        app: my-app
    spec:
      serviceAccountName: grpc-proxy-sa
      containers:
      - name: my-app
        image: my-app-image:latest
        # 应用需要将所有 gRPC 请求发送到 localhost:8080
        env:
        - name: USER_SERVICE_ADDR
          value: "http://localhost:8080"
      - name: grpc-proxy
        image: your-repo/grpc-k8s-proxy:latest
        ports:
        - containerPort: 8080
        env:
        - name: RUST_LOG
          value: "info,kube=warn" # 配置日志级别

下面的 Mermaid 图清晰地展示了整个请求流程:

graph TD
    subgraph Pod
        A[Client Application]
        P[gRPC Proxy Sidecar]
    end

    subgraph Kubernetes Cluster
        K8S_API[Kubernetes API Server]
        Svc[Service: user-service]
        EP1[Pod: user-service-pod-1]
        EP2[Pod: user-service-pod-2]
    end

    A -- "gRPC call to localhost:8080" --> P
    P -- "1. Watch EndpointSlices" --> K8S_API
    K8S_API -- "2. Returns IPs for user-service" --> P
    P -- "3. Forwards gRPC call to healthy Pod IP" --> EP1

这个方案虽然已经可以工作,但在生产环境中仍有其局限性。首先,负载均衡策略过于简单,一个基于 P2C (Power of Two Choices) 或最少连接数的策略会更优。其次,当前实现没有处理 gRPC 的健康检查协议,无法主动剔除不健康的后端实例,只能被动依赖 Kubernetes 的就绪探针。此外,完整的可观测性,例如集成 Prometheus 导出关键指标(请求延迟、成功率、后端连接数)和 OpenTelemetry 进行分布式追踪,对于问题排查和性能分析是必不可少的。未来的迭代方向将是引入 CRD 来定义更复杂的路由规则,例如基于 gRPC Method 或 Header 的路由,以及实现更智能的负载均衡和熔断策略,使其成为一个更完备的微服务基础设施组件。


  目录