我们面临一个棘手的工程问题:一个通过RESTful API暴露的科学计算服务,其核心功能是根据用户请求动态生成复杂的数据可视化图像。后台使用Matplotlib进行渲染,部分图像的生成过程计算密集且耗时,可能需要数秒甚至数十秒。将其部署为单体服务,并发能力极差,很快就会成为瓶颈。
初步的架构构想是采用一个请求分发层和一个分布式计算集群。Python技术栈下,Ray因其轻量级和对现有代码极低的侵入性而成为分布式计算的首选。服务本身,包括API网关和Ray集群,都将作为OCI兼容的容器镜像运行在Kubernetes上。
但这立刻引出了核心矛盾:如何对执行Matplotlib渲染任务的Ray worker节点进行弹性伸缩?
常规的Kubernetes HPA(Horizontal Pod Autoscaler)依赖CPU或内存利用率。在我们的场景中,这是一个滞后的、不可靠的指标。一个worker可能因为等待IO而CPU空闲,但任务队列中却堆积了大量请求。反之,一个高CPU的worker可能正在处理最后一个任务,即将进入空闲状态,此时扩容毫无意义。我们需要一个能精准反映系统负载的前瞻性指标——即Ray集群中待处理的任务队列长度。
第一阶段:构建基础服务与暴露自定义指标
在引入复杂的伸缩逻辑之前,必须先有一个稳固运行的基础服务。这包括API服务、Ray集群以及关键的自定义指标导出器。
1. 核心应用代码:FastAPI + Ray + Matplotlib
API层我们选用FastAPI,它性能出色且与Python的类型提示结合良好。Ray的集成方式是,API接收请求后,将耗时的绘图任务作为一个远程函数(@ray.remote
)提交给Ray集群。
# app/main.py
import os
import time
import uuid
import logging
import ray
import numpy as np
import matplotlib.pyplot as plt
from fastapi import FastAPI, HTTPException
from prometheus_client import Gauge, start_http_server
# --- 日志配置 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- Prometheus 指标定义 ---
# 该指标将是整个弹性伸缩系统的核心
RAY_PENDING_TASKS_GAUGE = Gauge(
'ray_pending_tasks',
'Number of pending tasks in the Ray cluster queue'
)
# --- Ray 连接与初始化 ---
# 在生产环境中,RAY_ADDRESS 会由 KubeRay Operator 自动注入
ray_address = os.environ.get("RAY_ADDRESS", "auto")
try:
ray.init(address=ray_address, namespace="serving")
logging.info(f"Successfully connected to Ray cluster at {ray.get_runtime_context().dashboard_url}")
except ConnectionError:
logging.error("Failed to connect to Ray cluster. Check RAY_ADDRESS.")
# 在无法连接Ray的情况下,服务应该无法启动
exit(1)
# --- 核心计算任务 ---
@ray.remote(num_cpus=1) # 明确指定资源需求,便于Ray调度
def generate_complex_plot(data_size: int, plot_type: str):
"""
一个模拟耗时且计算密集的Matplotlib绘图任务
"""
try:
start_time = time.time()
# 模拟数据生成
x = np.linspace(0, 10, data_size)
y = np.sin(x) + np.random.rand(data_size) * 0.2
fig, ax = plt.subplots()
if plot_type == 'scatter':
ax.scatter(x, y, alpha=0.6)
else:
ax.plot(x, y)
ax.set_title(f'Complex Plot ({plot_type}) with {data_size} points')
ax.set_xlabel('X-axis')
ax.set_ylabel('Y-axis')
ax.grid(True)
# 将图像保存到内存中的字节流
# 在真实项目中,这里可能会保存到对象存储
from io import BytesIO
buf = BytesIO()
fig.savefig(buf, format='png')
plt.close(fig) # 关键:必须关闭图形以释放内存
processing_time = time.time() - start_time
logging.info(f"Plot generation took {processing_time:.2f} seconds.")
return buf.getvalue()
except Exception as e:
logging.error(f"Error during plot generation: {e}", exc_info=True)
# 确保即使任务失败,也能返回一个错误标识
return None
# --- API 应用实例 ---
app = FastAPI()
# --- 任务状态存储 ---
# 在真实项目中,应该使用Redis或类似数据库来持久化任务状态
# 这里为了简化,使用内存字典
task_results = {}
@app.post("/v1/plots/async")
async def create_plot_task(data_size: int = 10000, plot_type: str = 'line'):
"""
异步提交绘图任务
"""
if not 100 <= data_size <= 500000:
raise HTTPException(status_code=400, detail="data_size must be between 100 and 500000")
if plot_type not in ['line', 'scatter']:
raise HTTPException(status_code=400, detail="plot_type must be 'line' or 'scatter'")
# 将任务提交给Ray,并立即返回任务ID
task_ref = generate_complex_plot.remote(data_size, plot_type)
task_id = str(uuid.uuid4())
# 将ObjectRef存储起来,以便后续查询
task_results[task_id] = task_ref
return {"task_id": task_id, "status": "submitted"}
@app.get("/v1/plots/result/{task_id}")
async def get_plot_result(task_id: str):
"""
根据任务ID获取结果
"""
if task_id not in task_results:
raise HTTPException(status_code=404, detail="Task not found")
task_ref = task_results[task_id]
# 检查任务是否完成,非阻塞
# ray.wait的timeout_seconds=0使其立即返回
ready_refs, remaining_refs = ray.wait([task_ref], timeout=0)
if ready_refs:
try:
# 获取结果
result_png_bytes = ray.get(ready_refs[0])
if result_png_bytes is None:
# 任务内部发生错误
raise HTTPException(status_code=500, detail="Task failed during execution.")
# 清理已完成的任务
del task_results[task_id]
from fastapi.responses import Response
return Response(content=result_png_bytes, media_type="image/png")
except ray.exceptions.RayTaskError as e:
logging.error(f"Task {task_id} failed: {e}")
del task_results[task_id]
raise HTTPException(status_code=500, detail=f"Task execution failed: {e}")
else:
return {"task_id": task_id, "status": "pending"}
# --- 自定义指标导出器 ---
def update_ray_metrics():
"""
定期查询Ray集群状态并更新Prometheus指标
"""
while True:
try:
# ray.state.tasks()可以获取所有任务信息
# 筛选出处于 PENDING 或 RUNNING 状态的任务
# 这里的逻辑是,PENDING状态的任务是等待资源的任务,最能反映队列积压
pending_tasks = [
task for task in ray.state.tasks()
if task['State'] == 'PENDING_ARGS_AVAIL'
]
num_pending = len(pending_tasks)
RAY_PENDING_TASKS_GAUGE.set(num_pending)
logging.info(f"Updated ray_pending_tasks metric: {num_pending}")
except Exception as e:
logging.warning(f"Failed to update Ray metrics: {e}")
time.sleep(15) # 指标更新频率,15秒是一个比较合理的折衷
@app.on_event("startup")
async def startup_event():
# 启动一个后台线程来更新Prometheus指标
import threading
metrics_thread = threading.Thread(target=update_ray_metrics, daemon=True)
metrics_thread.start()
# 启动Prometheus HTTP服务器,用于暴露指标
start_http_server(8001)
logging.info("Prometheus metrics server started on port 8001.")
这段代码的核心在于update_ray_metrics
函数。它通过ray.state.tasks()
查询集群的内部状态,过滤出等待调度的任务,并将其数量通过prometheus_client
库暴露为一个名为ray_pending_tasks
的Gauge
指标。这个指标将成为我们弹性伸缩的决策依据。
2. 容器化 (OCI Image)
构建一个OCI兼容的容器镜像是部署到Kubernetes的前提。Dockerfile需要包含所有依赖,并定义启动命令。
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
# 为了利用构建缓存,先拷贝依赖文件
COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
# 拷贝应用代码
COPY ./app /app
# 暴露API端口和指标端口
EXPOSE 8000
EXPOSE 8001
# 默认启动命令是API服务器
# Ray worker的启动命令将在Kubernetes YAML中覆盖
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
requirements.txt
内容如下:
ray[default]
fastapi
uvicorn[standard]
matplotlib
numpy
prometheus_client
requests # 用于示例客户端或测试
第二阶段:Kubernetes部署与服务网格集成
现在,我们将容器化的服务部署到Kubernetes,并利用KubeRay Operator简化Ray集群的管理,同时引入Istio进行服务治理。
graph TD subgraph Kubernetes Cluster subgraph Istio Service Mesh A[External Traffic] --> B[Istio Ingress Gateway]; B --> C{FastAPI Service}; C --> D[Ray Head Service]; end D --> E[Ray Worker Pods]; subgraph Monitoring F[Prometheus] -- Scrapes --> C; F -- Provides Metric --> G[KEDA]; end subgraph Autoscaling G -- Scales --> H[Ray Worker Deployment/StatefulSet]; end end H -- Manages --> E
1. 使用KubeRay Operator部署Ray集群
KubeRay Operator极大地简化了在Kubernetes上部署和管理Ray集群的生命周期。我们只需要定义一个RayCluster
的CRD(Custom Resource Definition)。
# ray-cluster.yaml
apiVersion: ray.io/v1alpha1
kind: RayCluster
metadata:
name: plot-generator-cluster
spec:
# Ray版本
rayVersion: '2.5.0'
# Head节点配置
headGroupSpec:
serviceType: ClusterIP # Head节点对集群内部可见
rayStartParams:
dashboard-host: '0.0.0.0'
num-cpus: '1' # Head节点通常不参与计算
template:
spec:
containers:
- name: ray-head
image: your-repo/plot-generator:latest # 使用我们构建的镜像
imagePullPolicy: Always
ports:
- containerPort: 6379 # GCS Port
- containerPort: 8265 # Dashboard Port
- containerPort: 10001 # Client Port
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: 1
memory: 2Gi
# Head节点不需要启动API服务
command: ["/bin/bash", "-c", "ray start --head --port=6379 --dashboard-host=0.0.0.0 --num-cpus=1 --block"]
# Worker节点配置
workerGroupSpecs:
- groupName: small-group
replicas: 1 # 初始副本数
minReplicas: 1 # 最小副本数
maxReplicas: 10 # 最大副本数
rayStartParams: {}
template:
spec:
containers:
- name: ray-worker
image: your-repo/plot-generator:latest
imagePullPolicy: Always
resources:
requests:
cpu: 1
memory: 2Gi
limits:
cpu: 2
memory: 4Gi
# Worker启动时连接到Head
command: ["/bin/bash", "-c", "ray start --address=$RAY_HEAD_IP:6379 --block"]
这里的坑在于,workerGroupSpecs
中的replicas
字段将由我们的自动伸缩器动态管理。minReplicas
和maxReplicas
定义了伸缩的边界。
2. API服务部署与Istio注入
API服务作为一个独立的Deployment部署,并通过环境变量RAY_ADDRESS
连接到由KubeRay Operator创建的Ray Head Service。
# api-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: plot-api-server
labels:
app: plot-api
spec:
replicas: 2
selector:
matchLabels:
app: plot-api
template:
metadata:
labels:
app: plot-api
# Istio自动注入的标签
istio.io/rev: default
annotations:
# 告诉Prometheus去抓取指标
prometheus.io/scrape: 'true'
prometheus.io/path: '/metrics' # prometheus_client默认路径
prometheus.io/port: '8001'
spec:
containers:
- name: api-server
image: your-repo/plot-generator:latest
imagePullPolicy: Always
env:
# KubeRay Operator会自动创建名为<cluster-name>-head-svc的服务
- name: RAY_ADDRESS
value: "plot-generator-cluster-head-svc:10001"
ports:
- containerPort: 8000
name: http-api
- containerPort: 8001
name: http-metrics
resources:
requests:
cpu: 200m
memory: 512Mi
limits:
cpu: 500m
memory: 1Gi
---
apiVersion: v1
kind: Service
metadata:
name: plot-api-svc
spec:
selector:
app: plot-api
ports:
- name: http
port: 80
targetPort: 8000
通过在Pod模板中添加istio.io/rev: default
标签,Istio的sidecar-injector会自动为我们的API Pod注入Envoy代理。这立即带来了服务间mTLS加密、遥测数据收集和高级流量控制能力,而无需修改任何应用代码。
第三阶段:配置KEDA实现智能伸缩
KEDA (Kubernetes Event-driven Autoscaling) 是一个完美的工具,它扩展了Kubernetes的能力,允许我们根据各种事件源(包括Prometheus查询)来驱动Pod的伸缩。
我们需要创建一个ScaledObject
资源,它将连接Prometheus、我们的自定义指标和RayCluster的worker数量。
# keda-scaler.yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: ray-worker-scaler
namespace: default
spec:
scaleTargetRef:
# 关键:KEDA可以直接伸缩自定义资源,这里我们指向KubeRay创建的RayCluster
apiVersion: ray.io/v1alpha1
kind: RayCluster
name: plot-generator-cluster
# envSourceContainerName: ray-worker # for certain scaler types
# 指示KEDA修改RayCluster对象中哪个路径下的副本数字段
# 对于KubeRay,路径是 spec.workerGroupSpecs[0].replicas
# KEDA目前对CRD的伸缩需要更明确的subresource支持,如果直接修改不生效,备用方案是伸缩一个Deployment,然后用一个operator同步到RayCluster
# UPDATE: KEDA 2.0+ 配合 KubeRay 0.4.0+ 能够更好地协作,但最稳妥的方式是伸缩Ray worker的Deployment/StatefulSet。
# KubeRay Operator会创建名为 ray-<cluster-name>-worker-<group-name> 的Deployment。
# scaleTargetRef:
# name: ray-plot-generator-cluster-worker-small-group # 这才是KubeRay operator创建的实际Deployment
pollingInterval: 20 # 每20秒查询一次指标
cooldownPeriod: 120 # 缩容前的冷却时间,防止流量波动导致频繁缩容
minReplicaCount: 1 # 对应RayCluster中的minReplicas
maxReplicaCount: 10 # 对应RayCluster中的maxReplicas
triggers:
- type: prometheus
metadata:
serverAddress: http://prometheus-k8s.monitoring.svc.cluster.local:9090 # Prometheus服务地址
# PromQL查询,计算待处理任务总数
query: |
sum(ray_pending_tasks{service="plot-api-svc"})
# 阈值:平均每个worker处理2个积压任务时就考虑扩容
# 例如,如果有2个worker,积压任务达到4个时,KEDA会计算出需要 (4/2)=2个副本,与当前副本数相同,不扩容。
# 如果积压任务达到5个,KEDA计算出需要(5/2) -> ceil(2.5) = 3个副本,触发扩容。
threshold: '2'
# safe to scale to zero
activationThreshold: '0'
一个重要的实现细节: KEDA scaleTargetRef
直接指向RayCluster
CRD的能力可能受版本限制。一个更通用和健壮的方法是让KEDA伸缩由KubeRay Operator为worker group创建的那个Deployment
。KubeRay Operator会监听这个Deployment
的replicas
变化并相应地调整Ray集群。这个Deployment的命名通常是ray-<cluster-name>-worker-<group-name>
。上述YAML注释中已指明此备选方案。
配置好ServiceMonitor
让Prometheus能够抓取到plot-api-svc
暴露的/metrics
端点后,整个闭环就形成了:
- 外部请求涌入,通过FastAPI提交大量
generate_complex_plot
任务到Ray。 - 由于worker资源有限,任务开始在Ray内部队列中积压。
- FastAPI服务中的指标导出器检测到
PENDING
状态的任务增多,ray_pending_tasks
指标值上升。 - Prometheus抓取到这个升高的指标。
- KEDA定期查询Prometheus,发现指标值超过了
threshold * 当前副本数
。 - KEDA修改Ray worker Deployment的
replicas
数量。 - Kubernetes响应并创建新的Ray worker Pod。
- 新worker启动后加入Ray集群,开始处理积压的任务。
- 任务队列长度下降,
ray_pending_tasks
指标回落,系统达到新的平衡。 - 流量高峰过后,指标值下降,KEDA在
cooldownPeriod
后开始缩减worker数量,直至minReplicaCount
。
方案的局限性与未来迭代路径
此架构虽然实现了基于业务负载的智能伸缩,但在生产环境中仍有几个需要注意的局限性和优化点。
首先,指标的准确性至关重要。ray.state.tasks()
在集群规模非常大时可能会有性能开销。更优化的方式是利用Ray内置的metrics系统,如果它能直接暴露队列长度,则可以省去我们自定义的查询逻辑。
其次,缩容策略需要非常谨慎。KEDA触发缩容时,Kubernetes会随机终止一个worker Pod。如果这个Pod正在执行一个耗时很长的任务,任务会失败。Ray的容错机制可以重新提交该任务,但这造成了计算资源的浪费。一个更优雅的缩容方案需要实现Pod的preStop
钩子,通知Ray节点优雅下线(draining),即不再接受新任务,并在完成所有现有任务后再退出。
最后,Istio的引入虽然带来了诸多好处,但也增加了系统的复杂性和资源开销。对于每个Pod,都需要一个sidecar代理,这会消耗额外的CPU和内存。在性能极其敏感的场景下,需要评估这种开销是否可以接受,或者考虑使用更轻量级的服务网格方案,甚至是eBPF等技术在网络层面实现部分功能。当前的实现也没有利用Istio的流量管理能力来做更复杂的发布,例如,可以部署一个新版本的绘图worker,通过Istio的流量规则将一小部分请求路由到新版本,进行金丝雀发布验证。