在 Kubernetes 环境中管理 gRPC 服务间通信,常常会陷入两难境地:要么使用功能强大但过于笨重的服务网格(Service Mesh),要么依赖 L7 Ingress Controller,但它们对 gRPC 的支持往往仅限于透传,缺乏应用层的路由能力。当需求只是一个轻量级、高性能、能够根据 gRPC 的服务与方法动态路由的网关时,市面上的通用方案就显得大材小用。我们的痛点很明确:需要一个无需独立控制平面、能够直接利用 Kubernetes API 作为服务发现源、并且对 gRPC 协议有深度理解的 Sidecar 代理。
这个项目的初步构想是,构建一个 Rust 应用,它以 Sidecar 容器的形式与业务应用部署在同一个 Pod 中。它监听一个端口,接收所有出站的 gRPC 请求。通过解析请求的 gRPC Full Method Name(例如 /package.Service/Method
),它能识别出目标服务。同时,它会持续监听 Kubernetes API Server,获取所有 Service
和 EndpointSlice
的变化,在内存中维护一个从 gRPC 服务名到后端 Pod IP 地址列表的动态映射。当请求到来时,它从映射中选择一个健康的后端 Pod IP,并将请求代理过去。
技术选型决策如下:
- 语言: Rust。对于网络代理这种需要高性能、高并发和内存安全的基础设施组件,Rust 是不二之选。其
async/await
语法结合 Tokio 运行时,能以极低的资源开销处理海量并发连接。所有权系统则从根本上杜绝了数据竞争和内存泄漏等顽固问题。 - gRPC 协议栈: Tonic。Tonic 是 Rust 生态中最成熟的 gRPC 实现,它构建于
hyper
和tower
之上,提供了强大的中间件抽象能力和出色的性能。 - Kubernetes 客户端: kube-rs。
kube-rs
提供了与 Kubernetes API 交互的健壮客户端,支持watch
机制,这对于实现动态配置更新至关重要。
整个实现的核心分为三个部分:Kubernetes 服务发现、动态路由表维护、以及 gRPC 请求代理。
第一步: 监听 Kubernetes API 实现服务发现
我们需要一个后台任务,持续监听 EndpointSlice
资源的变化。EndpointSlice
比传统的 Endpoints
资源更具扩展性,是现代 Kubernetes 集群中服务发现的首选。当 EndpointSlice
发生变化时(Pod 的增减、就绪状态变更),我们就更新内存中的路由表。
这是项目依赖 Cargo.toml
:
[package]
name = "grpc-k8s-proxy"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1", features = ["full"] }
tonic = "0.10"
hyper = "0.14"
tower = "0.4"
prost = "0.12"
kube = { version = "0.85", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.20.0", features = ["v1"] }
futures = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
anyhow = "1.0"
thiserror = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
# gRPC Health Checking Protocol
tonic-health = "0.10.2"
服务发现模块的代码负责与 K8s API 通信。
// src/discovery.rs
use anyhow::Result;
use futures::stream::{StreamExt, TryStreamExt};
use k8s_openapi::api::discovery::v1::EndpointSlice;
use kube::{
api::{Api, ListParams, ResourceExt},
runtime::{watcher, WatchStreamExt},
Client,
};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;
use tracing::{error, info, warn};
// 路由表的核心数据结构:
// Key: gRPC Service FQDN (e.g., "user.UserService")
// Value: Vec of healthy backend Pod IPs (e.g., ["10.1.1.2:50051", "10.1.1.3:50051"])
pub type RoutingTable = Arc<Mutex<HashMap<String, Vec<String>>>>;
pub async fn watch_endpoints(client: Client, routing_table: RoutingTable) -> Result<()> {
// 监控所有命名空间中的 EndpointSlice
let slices: Api<EndpointSlice> = Api::all(client);
// 我们只关心那些由 Kubernetes Service 管理的 EndpointSlice
let lp = ListParams::default().labels("kubernetes.io/service-name");
let mut watcher = watcher(slices, lp).applied_objects().boxed();
info!("Starting Kubernetes EndpointSlice watcher...");
loop {
match watcher.try_next().await {
Ok(Some(slice)) => {
let service_name = slice
.metadata
.labels
.as_ref()
.and_then(|labels| labels.get("kubernetes.io/service-name"))
.cloned();
if service_name.is_none() {
// 理论上 ListParams 已经过滤,但在真实项目中,防御性编程是必要的
warn!(slice = %slice.name_any(), "EndpointSlice without 'kubernetes.io/service-name' label, skipping.");
continue;
}
let service_name = service_name.unwrap();
// 我们需要一个约定,即 gRPC Service 的 FQDN 与 Kubernetes Service name 相关
// 例如,`user.UserService` 对应名为 `user-service` 的 K8s Service。
// 这里的转换逻辑需要根据团队规范来定,此处做一个简单示例:
// `user-service` -> `user.UserService` (这部分在实际中可能需要更复杂的映射或CRD)
// 为简化,我们假设gRPC服务名就是K8s服务名,在代理时进行转换。
// 真实项目中,可能通过 CRD 或 Annotation 来声明 gRPC 服务名。
// 此处我们直接使用 K8s Service Name 作为 Key。
let mut endpoints: Vec<String> = Vec::new();
if let Some(slice_endpoints) = slice.endpoints {
for endpoint in slice_endpoints {
// 只选择处于 "Ready" 状态的 Pod
if let Some(conditions) = endpoint.conditions {
if conditions.ready.unwrap_or(false) {
if let Some(addresses) = endpoint.addresses {
for address in addresses {
// 假设 gRPC 服务端口在 EndpointSlice 中有明确定义
if let Some(port_info) = &slice.ports {
if let Some(port) = port_info.get(0).and_then(|p| p.port) {
endpoints.push(format!("http://{}:{}", address, port));
}
}
}
}
}
}
}
}
let mut table = routing_table.lock().await;
if endpoints.is_empty() {
info!(service = %service_name, "Service has no ready endpoints, removing from routing table.");
table.remove(&service_name);
} else {
info!(service = %service_name, endpoints = ?endpoints, "Updating routing table for service.");
table.insert(service_name.clone(), endpoints);
}
}
Ok(None) => {
// 流结束,这在 watcher 中不应该发生
warn!("EndpointSlice watcher stream ended unexpectedly.");
break;
}
Err(e) => {
error!(error = %e, "Error watching EndpointSlices.");
// 在生产环境中,这里应该有重试和退避策略
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
}
Ok(())
}
第二步: gRPC 请求代理与动态负载均衡
这是代理的核心逻辑。它接收一个请求,解析出目标服务,从路由表中查找可用的后端,然后将请求转发出去。
// src/proxy.rs
use crate::discovery::RoutingTable;
use hyper::{Body, Request, Response, Uri};
use std::{collections::HashMap, sync::Arc, convert::Infallible};
use tokio::sync::Mutex;
use tonic::transport::Channel;
use tower::Service;
use tracing::{debug, error, info, warn};
// 客户端连接池,避免为每个请求重复建立 TCP 和 HTTP/2 连接
// Key: Backend Address (e.g., "http://10.1.1.2:50051")
// Value: Tonic Channel
type ConnectionPool = Arc<Mutex<HashMap<String, Channel>>>;
#[derive(Clone)]
pub struct GrpcProxy {
routing_table: RoutingTable,
connection_pool: ConnectionPool,
}
impl GrpcProxy {
pub fn new(routing_table: RoutingTable) -> Self {
Self {
routing_table,
connection_pool: Arc::new(Mutex::new(HashMap::new())),
}
}
// 从 gRPC 请求路径中解析出服务名
// e.g. /main.EchoService/SayHello -> main.EchoService
// 这是一个简化实现,生产环境需要处理更复杂的 package name
fn get_service_name_from_path(path: &str) -> Option<String> {
path.strip_prefix('/')
.and_then(|p| p.split('/').next())
// 假设 gRPC FQDN 映射到 K8s Service Name 的规则是:
// `package.Service` -> `package-service`
// `Greeter` -> `greeter`
.map(|s| s.replace('.', "-").to_lowercase())
}
// 一个简单的轮询负载均衡策略
// 在真实项目中,这里可以替换为更复杂的策略,如最少连接数、P2C 等
async fn select_backend(&self, service_name: &str) -> Option<String> {
let table = self.routing_table.lock().await;
table.get(service_name).and_then(|endpoints| {
if endpoints.is_empty() {
None
} else {
// 简单的轮询实现 (伪随机,实际应用应使用原子计数器等)
let index = (rand::random::<usize>()) % endpoints.len();
endpoints.get(index).cloned()
}
})
}
// 获取或创建一个到后端的连接
async fn get_channel(&self, backend_addr: String) -> Result<Channel, tonic::transport::Error> {
let mut pool = self.connection_pool.lock().await;
if let Some(channel) = pool.get(&backend_addr) {
// 在生产环境中,需要检查 channel 的健康状态
// tower::discover::Discovery 等机制可以做得更好
return Ok(channel.clone());
}
info!(address = %backend_addr, "Creating new connection to backend.");
let endpoint = tonic::transport::Endpoint::from_shared(backend_addr.clone())?;
let channel = endpoint.connect().await?;
pool.insert(backend_addr, channel.clone());
Ok(channel)
}
}
// 实现 tower::Service trait 是与 hyper 集成的关键
impl Service<Request<Body>> for GrpcProxy {
type Response = Response<Body>;
type Error = Infallible;
type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}
fn call(&mut self, mut req: Request<Body>) -> Self::Future {
let clone = self.clone();
Box::pin(async move {
let path = req.uri().path().to_string();
debug!(path = %path, "Received gRPC request");
let service_name = match Self::get_service_name_from_path(&path) {
Some(name) => name,
None => {
error!(path = %path, "Failed to parse service name from path");
let mut resp = Response::new(Body::from("Invalid gRPC path"));
*resp.status_mut() = hyper::StatusCode::BAD_REQUEST;
return Ok(resp);
}
};
debug!(service = %service_name, "Resolved service name");
let backend_addr = match clone.select_backend(&service_name).await {
Some(addr) => addr,
None => {
warn!(service = %service_name, "No healthy backend found for service");
let mut resp = Response::new(Body::from(format!("Service '{}' unavailable", service_name)));
// gRPC 错误码 `UNAVAILABLE` 对应 HTTP 状态码 503
*resp.status_mut() = hyper::StatusCode::SERVICE_UNAVAILABLE;
return Ok(resp);
}
};
debug!(backend = %backend_addr, "Selected backend for request");
// 重写请求的 URI,指向选定的后端 Pod
let new_uri = {
let mut parts = req.uri().clone().into_parts();
parts.scheme = Some(backend_addr.parse::<Uri>().unwrap().scheme().unwrap().clone());
parts.authority = Some(backend_addr.parse::<Uri>().unwrap().authority().unwrap().clone());
Uri::from_parts(parts).unwrap()
};
*req.uri_mut() = new_uri;
// 获取到后端的连接并发送请求
match clone.get_channel(backend_addr).await {
Ok(mut channel) => {
// 使用 tower::Service::call 来转发请求
// 这里的 unwrap 在真实代码中需要更优雅的处理
match channel.call(req).await {
Ok(resp) => Ok(resp),
Err(e) => {
error!(error = ?e, "Failed to proxy request to backend");
let mut resp = Response::new(Body::from("Upstream request failed"));
*resp.status_mut() = hyper::StatusCode::BAD_GATEWAY;
Ok(resp)
}
}
}
Err(e) => {
error!(error = %e, "Failed to connect to backend");
let mut resp = Response::new(Body::from("Failed to connect to upstream service"));
*resp.status_mut() = hyper::StatusCode::SERVICE_UNAVAILABLE;
Ok(resp)
}
}
})
}
}
第三步: 整合与启动
main.rs
文件将所有部分组合在一起:初始化 Kubernetes 客户端,启动服务发现的后台任务,并运行 gRPC 代理服务器。
// src/main.rs
mod discovery;
mod proxy;
use anyhow::Result;
use kube::Client;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;
use tracing::info;
use hyper::server::conn::Http;
use tokio::net::TcpListener;
use tower::ServiceBuilder;
#[tokio::main]
async fn main() -> Result<()> {
// 初始化日志系统,这对于调试分布式系统至关重要
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
// 初始化 Kubernetes 客户端
// kube-rs 会自动从环境中加载配置 (in-cluster or from kubeconfig)
let client = Client::try_default().await?;
info!("Successfully connected to Kubernetes API server");
// 创建共享的路由表
let routing_table = Arc::new(Mutex::new(HashMap::new()));
// 在后台启动 EndpointSlice watcher
let discovery_handle = tokio::spawn(discovery::watch_endpoints(client.clone(), routing_table.clone()));
// 初始化代理服务
let proxy_service = proxy::GrpcProxy::new(routing_table.clone());
// 构建我们的服务器
let addr = "0.0.0.0:8080".parse()?;
let listener = TcpListener::bind(addr).await?;
info!(address = %addr, "gRPC proxy listening");
loop {
let (stream, remote_addr) = listener.accept().await?;
info!(client.addr = %remote_addr, "Accepted new connection");
let tower_service = ServiceBuilder::new().service(proxy_service.clone());
tokio::spawn(async move {
if let Err(err) = Http::new()
.http2_only(true)
.serve_connection(stream, tower_service)
.await
{
// 连接错误是常见的,例如客户端断开,所以用 warn 级别
warn!(error = %err, "Error serving connection");
}
});
}
// 实际项目中,需要优雅停机逻辑来处理 discovery_handle 的关闭
}
部署为 Sidecar
为了将这个代理部署为 Sidecar,我们需要一个 Dockerfile 和相应的 Kubernetes Deployment YAML。
Dockerfile:
# 使用 Rust 官方镜像进行构建
FROM rust:1.73 as builder
WORKDIR /usr/src/grpc-k8s-proxy
COPY . .
# 使用 cargo-chef 进行缓存优化,加快后续构建速度
# RUN cargo install cargo-chef
# COPY ./Cargo.toml ./Cargo.lock ./
# RUN cargo chef prepare --recipe-path recipe.json
# COPY . .
# RUN cargo chef cook --release --recipe-path recipe.json
# 对于简单项目,直接构建也可以
RUN cargo build --release
# 使用一个轻量级的基础镜像进行部署
FROM debian:bullseye-slim
RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*
COPY /usr/src/grpc-k8s-proxy/target/release/grpc-k8s-proxy /usr/local/bin/grpc-k8s-proxy
CMD ["grpc-k8s-proxy"]
Kubernetes Deployment YAML:
假设我们有一个名为 my-app
的应用,它需要调用 user-service
。
apiVersion: v1
kind: ServiceAccount
metadata:
name: grpc-proxy-sa
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: endpointslice-watcher
rules:
- apiGroups: ["discovery.k8s.io"]
resources: ["endpointslices"]
verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: grpc-proxy-rolebinding
subjects:
- kind: ServiceAccount
name: grpc-proxy-sa
namespace: default # 替换为你的命名空间
roleRef:
kind: ClusterRole
name: endpointslice-watcher
apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-app-deployment
spec:
replicas: 2
selector:
matchLabels:
app: my-app
template:
metadata:
labels:
app: my-app
spec:
serviceAccountName: grpc-proxy-sa
containers:
- name: my-app
image: my-app-image:latest
# 应用需要将所有 gRPC 请求发送到 localhost:8080
env:
- name: USER_SERVICE_ADDR
value: "http://localhost:8080"
- name: grpc-proxy
image: your-repo/grpc-k8s-proxy:latest
ports:
- containerPort: 8080
env:
- name: RUST_LOG
value: "info,kube=warn" # 配置日志级别
下面的 Mermaid 图清晰地展示了整个请求流程:
graph TD subgraph Pod A[Client Application] P[gRPC Proxy Sidecar] end subgraph Kubernetes Cluster K8S_API[Kubernetes API Server] Svc[Service: user-service] EP1[Pod: user-service-pod-1] EP2[Pod: user-service-pod-2] end A -- "gRPC call to localhost:8080" --> P P -- "1. Watch EndpointSlices" --> K8S_API K8S_API -- "2. Returns IPs for user-service" --> P P -- "3. Forwards gRPC call to healthy Pod IP" --> EP1
这个方案虽然已经可以工作,但在生产环境中仍有其局限性。首先,负载均衡策略过于简单,一个基于 P2C (Power of Two Choices) 或最少连接数的策略会更优。其次,当前实现没有处理 gRPC 的健康检查协议,无法主动剔除不健康的后端实例,只能被动依赖 Kubernetes 的就绪探针。此外,完整的可观测性,例如集成 Prometheus 导出关键指标(请求延迟、成功率、后端连接数)和 OpenTelemetry 进行分布式追踪,对于问题排查和性能分析是必不可少的。未来的迭代方向将是引入 CRD 来定义更复杂的路由规则,例如基于 gRPC Method 或 Header 的路由,以及实现更智能的负载均衡和熔断策略,使其成为一个更完备的微服务基础设施组件。