基于自定义任务队列指标的Ray分布式计算集群弹性伸缩实现


我们面临一个棘手的工程问题:一个通过RESTful API暴露的科学计算服务,其核心功能是根据用户请求动态生成复杂的数据可视化图像。后台使用Matplotlib进行渲染,部分图像的生成过程计算密集且耗时,可能需要数秒甚至数十秒。将其部署为单体服务,并发能力极差,很快就会成为瓶颈。

初步的架构构想是采用一个请求分发层和一个分布式计算集群。Python技术栈下,Ray因其轻量级和对现有代码极低的侵入性而成为分布式计算的首选。服务本身,包括API网关和Ray集群,都将作为OCI兼容的容器镜像运行在Kubernetes上。

但这立刻引出了核心矛盾:如何对执行Matplotlib渲染任务的Ray worker节点进行弹性伸缩?

常规的Kubernetes HPA(Horizontal Pod Autoscaler)依赖CPU或内存利用率。在我们的场景中,这是一个滞后的、不可靠的指标。一个worker可能因为等待IO而CPU空闲,但任务队列中却堆积了大量请求。反之,一个高CPU的worker可能正在处理最后一个任务,即将进入空闲状态,此时扩容毫无意义。我们需要一个能精准反映系统负载的前瞻性指标——即Ray集群中待处理的任务队列长度。

第一阶段:构建基础服务与暴露自定义指标

在引入复杂的伸缩逻辑之前,必须先有一个稳固运行的基础服务。这包括API服务、Ray集群以及关键的自定义指标导出器。

1. 核心应用代码:FastAPI + Ray + Matplotlib

API层我们选用FastAPI,它性能出色且与Python的类型提示结合良好。Ray的集成方式是,API接收请求后,将耗时的绘图任务作为一个远程函数(@ray.remote)提交给Ray集群。

# app/main.py
import os
import time
import uuid
import logging
import ray
import numpy as np
import matplotlib.pyplot as plt
from fastapi import FastAPI, HTTPException
from prometheus_client import Gauge, start_http_server

# --- 日志配置 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Prometheus 指标定义 ---
# 该指标将是整个弹性伸缩系统的核心
RAY_PENDING_TASKS_GAUGE = Gauge(
    'ray_pending_tasks',
    'Number of pending tasks in the Ray cluster queue'
)

# --- Ray 连接与初始化 ---
# 在生产环境中,RAY_ADDRESS 会由 KubeRay Operator 自动注入
ray_address = os.environ.get("RAY_ADDRESS", "auto")
try:
    ray.init(address=ray_address, namespace="serving")
    logging.info(f"Successfully connected to Ray cluster at {ray.get_runtime_context().dashboard_url}")
except ConnectionError:
    logging.error("Failed to connect to Ray cluster. Check RAY_ADDRESS.")
    # 在无法连接Ray的情况下,服务应该无法启动
    exit(1)


# --- 核心计算任务 ---
@ray.remote(num_cpus=1) # 明确指定资源需求,便于Ray调度
def generate_complex_plot(data_size: int, plot_type: str):
    """
    一个模拟耗时且计算密集的Matplotlib绘图任务
    """
    try:
        start_time = time.time()
        # 模拟数据生成
        x = np.linspace(0, 10, data_size)
        y = np.sin(x) + np.random.rand(data_size) * 0.2
        
        fig, ax = plt.subplots()
        if plot_type == 'scatter':
            ax.scatter(x, y, alpha=0.6)
        else:
            ax.plot(x, y)
        
        ax.set_title(f'Complex Plot ({plot_type}) with {data_size} points')
        ax.set_xlabel('X-axis')
        ax.set_ylabel('Y-axis')
        ax.grid(True)
        
        # 将图像保存到内存中的字节流
        # 在真实项目中,这里可能会保存到对象存储
        from io import BytesIO
        buf = BytesIO()
        fig.savefig(buf, format='png')
        plt.close(fig) # 关键:必须关闭图形以释放内存
        
        processing_time = time.time() - start_time
        logging.info(f"Plot generation took {processing_time:.2f} seconds.")
        
        return buf.getvalue()
    except Exception as e:
        logging.error(f"Error during plot generation: {e}", exc_info=True)
        # 确保即使任务失败,也能返回一个错误标识
        return None


# --- API 应用实例 ---
app = FastAPI()

# --- 任务状态存储 ---
# 在真实项目中,应该使用Redis或类似数据库来持久化任务状态
# 这里为了简化,使用内存字典
task_results = {}

@app.post("/v1/plots/async")
async def create_plot_task(data_size: int = 10000, plot_type: str = 'line'):
    """
    异步提交绘图任务
    """
    if not 100 <= data_size <= 500000:
        raise HTTPException(status_code=400, detail="data_size must be between 100 and 500000")
    if plot_type not in ['line', 'scatter']:
        raise HTTPException(status_code=400, detail="plot_type must be 'line' or 'scatter'")

    # 将任务提交给Ray,并立即返回任务ID
    task_ref = generate_complex_plot.remote(data_size, plot_type)
    task_id = str(uuid.uuid4())
    
    # 将ObjectRef存储起来,以便后续查询
    task_results[task_id] = task_ref
    
    return {"task_id": task_id, "status": "submitted"}


@app.get("/v1/plots/result/{task_id}")
async def get_plot_result(task_id: str):
    """
    根据任务ID获取结果
    """
    if task_id not in task_results:
        raise HTTPException(status_code=404, detail="Task not found")
        
    task_ref = task_results[task_id]
    
    # 检查任务是否完成,非阻塞
    # ray.wait的timeout_seconds=0使其立即返回
    ready_refs, remaining_refs = ray.wait([task_ref], timeout=0)
    
    if ready_refs:
        try:
            # 获取结果
            result_png_bytes = ray.get(ready_refs[0])
            if result_png_bytes is None:
                # 任务内部发生错误
                raise HTTPException(status_code=500, detail="Task failed during execution.")
                
            # 清理已完成的任务
            del task_results[task_id]
            
            from fastapi.responses import Response
            return Response(content=result_png_bytes, media_type="image/png")
        except ray.exceptions.RayTaskError as e:
            logging.error(f"Task {task_id} failed: {e}")
            del task_results[task_id]
            raise HTTPException(status_code=500, detail=f"Task execution failed: {e}")
    else:
        return {"task_id": task_id, "status": "pending"}

# --- 自定义指标导出器 ---
def update_ray_metrics():
    """
    定期查询Ray集群状态并更新Prometheus指标
    """
    while True:
        try:
            # ray.state.tasks()可以获取所有任务信息
            # 筛选出处于 PENDING 或 RUNNING 状态的任务
            # 这里的逻辑是,PENDING状态的任务是等待资源的任务,最能反映队列积压
            pending_tasks = [
                task for task in ray.state.tasks() 
                if task['State'] == 'PENDING_ARGS_AVAIL'
            ]
            num_pending = len(pending_tasks)
            RAY_PENDING_TASKS_GAUGE.set(num_pending)
            logging.info(f"Updated ray_pending_tasks metric: {num_pending}")
        except Exception as e:
            logging.warning(f"Failed to update Ray metrics: {e}")
        time.sleep(15) # 指标更新频率,15秒是一个比较合理的折衷

@app.on_event("startup")
async def startup_event():
    # 启动一个后台线程来更新Prometheus指标
    import threading
    metrics_thread = threading.Thread(target=update_ray_metrics, daemon=True)
    metrics_thread.start()
    
    # 启动Prometheus HTTP服务器,用于暴露指标
    start_http_server(8001)
    logging.info("Prometheus metrics server started on port 8001.")

这段代码的核心在于update_ray_metrics函数。它通过ray.state.tasks()查询集群的内部状态,过滤出等待调度的任务,并将其数量通过prometheus_client库暴露为一个名为ray_pending_tasksGauge指标。这个指标将成为我们弹性伸缩的决策依据。

2. 容器化 (OCI Image)

构建一个OCI兼容的容器镜像是部署到Kubernetes的前提。Dockerfile需要包含所有依赖,并定义启动命令。

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# 为了利用构建缓存,先拷贝依赖文件
COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# 拷贝应用代码
COPY ./app /app

# 暴露API端口和指标端口
EXPOSE 8000
EXPOSE 8001

# 默认启动命令是API服务器
# Ray worker的启动命令将在Kubernetes YAML中覆盖
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

requirements.txt 内容如下:

ray[default]
fastapi
uvicorn[standard]
matplotlib
numpy
prometheus_client
requests # 用于示例客户端或测试

第二阶段:Kubernetes部署与服务网格集成

现在,我们将容器化的服务部署到Kubernetes,并利用KubeRay Operator简化Ray集群的管理,同时引入Istio进行服务治理。

graph TD
    subgraph Kubernetes Cluster
        subgraph Istio Service Mesh
            A[External Traffic] --> B[Istio Ingress Gateway];
            B --> C{FastAPI Service};
            C --> D[Ray Head Service];
        end
        D --> E[Ray Worker Pods];
        
        subgraph Monitoring
            F[Prometheus] -- Scrapes --> C;
            F -- Provides Metric --> G[KEDA];
        end

        subgraph Autoscaling
            G -- Scales --> H[Ray Worker Deployment/StatefulSet];
        end
    end
    H -- Manages --> E

1. 使用KubeRay Operator部署Ray集群

KubeRay Operator极大地简化了在Kubernetes上部署和管理Ray集群的生命周期。我们只需要定义一个RayCluster的CRD(Custom Resource Definition)。

# ray-cluster.yaml
apiVersion: ray.io/v1alpha1
kind: RayCluster
metadata:
  name: plot-generator-cluster
spec:
  # Ray版本
  rayVersion: '2.5.0'
  # Head节点配置
  headGroupSpec:
    serviceType: ClusterIP # Head节点对集群内部可见
    rayStartParams:
      dashboard-host: '0.0.0.0'
      num-cpus: '1' # Head节点通常不参与计算
    template:
      spec:
        containers:
          - name: ray-head
            image: your-repo/plot-generator:latest # 使用我们构建的镜像
            imagePullPolicy: Always
            ports:
              - containerPort: 6379 # GCS Port
              - containerPort: 8265 # Dashboard Port
              - containerPort: 10001 # Client Port
            resources:
              requests:
                cpu: 500m
                memory: 1Gi
              limits:
                cpu: 1
                memory: 2Gi
            # Head节点不需要启动API服务
            command: ["/bin/bash", "-c", "ray start --head --port=6379 --dashboard-host=0.0.0.0 --num-cpus=1 --block"]
  # Worker节点配置
  workerGroupSpecs:
  - groupName: small-group
    replicas: 1 # 初始副本数
    minReplicas: 1 # 最小副本数
    maxReplicas: 10 # 最大副本数
    rayStartParams: {}
    template:
      spec:
        containers:
          - name: ray-worker
            image: your-repo/plot-generator:latest
            imagePullPolicy: Always
            resources:
              requests:
                cpu: 1
                memory: 2Gi
              limits:
                cpu: 2
                memory: 4Gi
            # Worker启动时连接到Head
            command: ["/bin/bash", "-c", "ray start --address=$RAY_HEAD_IP:6379 --block"]

这里的坑在于,workerGroupSpecs中的replicas字段将由我们的自动伸缩器动态管理。minReplicasmaxReplicas定义了伸缩的边界。

2. API服务部署与Istio注入

API服务作为一个独立的Deployment部署,并通过环境变量RAY_ADDRESS连接到由KubeRay Operator创建的Ray Head Service。

# api-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: plot-api-server
  labels:
    app: plot-api
spec:
  replicas: 2
  selector:
    matchLabels:
      app: plot-api
  template:
    metadata:
      labels:
        app: plot-api
        # Istio自动注入的标签
        istio.io/rev: default
      annotations:
        # 告诉Prometheus去抓取指标
        prometheus.io/scrape: 'true'
        prometheus.io/path: '/metrics' # prometheus_client默认路径
        prometheus.io/port: '8001'
    spec:
      containers:
      - name: api-server
        image: your-repo/plot-generator:latest
        imagePullPolicy: Always
        env:
        # KubeRay Operator会自动创建名为<cluster-name>-head-svc的服务
        - name: RAY_ADDRESS
          value: "plot-generator-cluster-head-svc:10001"
        ports:
        - containerPort: 8000
          name: http-api
        - containerPort: 8001
          name: http-metrics
        resources:
          requests:
            cpu: 200m
            memory: 512Mi
          limits:
            cpu: 500m
            memory: 1Gi
---
apiVersion: v1
kind: Service
metadata:
  name: plot-api-svc
spec:
  selector:
    app: plot-api
  ports:
  - name: http
    port: 80
    targetPort: 8000

通过在Pod模板中添加istio.io/rev: default标签,Istio的sidecar-injector会自动为我们的API Pod注入Envoy代理。这立即带来了服务间mTLS加密、遥测数据收集和高级流量控制能力,而无需修改任何应用代码。

第三阶段:配置KEDA实现智能伸缩

KEDA (Kubernetes Event-driven Autoscaling) 是一个完美的工具,它扩展了Kubernetes的能力,允许我们根据各种事件源(包括Prometheus查询)来驱动Pod的伸缩。

我们需要创建一个ScaledObject资源,它将连接Prometheus、我们的自定义指标和RayCluster的worker数量。

# keda-scaler.yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: ray-worker-scaler
  namespace: default
spec:
  scaleTargetRef:
    # 关键:KEDA可以直接伸缩自定义资源,这里我们指向KubeRay创建的RayCluster
    apiVersion: ray.io/v1alpha1
    kind: RayCluster
    name: plot-generator-cluster
    # envSourceContainerName: ray-worker # for certain scaler types
  # 指示KEDA修改RayCluster对象中哪个路径下的副本数字段
  # 对于KubeRay,路径是 spec.workerGroupSpecs[0].replicas
  # KEDA目前对CRD的伸缩需要更明确的subresource支持,如果直接修改不生效,备用方案是伸缩一个Deployment,然后用一个operator同步到RayCluster
  # UPDATE: KEDA 2.0+ 配合 KubeRay 0.4.0+ 能够更好地协作,但最稳妥的方式是伸缩Ray worker的Deployment/StatefulSet。
  # KubeRay Operator会创建名为 ray-<cluster-name>-worker-<group-name> 的Deployment。
  # scaleTargetRef:
  #   name: ray-plot-generator-cluster-worker-small-group # 这才是KubeRay operator创建的实际Deployment
  
  pollingInterval: 20  # 每20秒查询一次指标
  cooldownPeriod:  120 # 缩容前的冷却时间,防止流量波动导致频繁缩容
  minReplicaCount: 1   # 对应RayCluster中的minReplicas
  maxReplicaCount: 10  # 对应RayCluster中的maxReplicas
  
  triggers:
  - type: prometheus
    metadata:
      serverAddress: http://prometheus-k8s.monitoring.svc.cluster.local:9090 # Prometheus服务地址
      # PromQL查询,计算待处理任务总数
      query: |
        sum(ray_pending_tasks{service="plot-api-svc"})
      # 阈值:平均每个worker处理2个积压任务时就考虑扩容
      # 例如,如果有2个worker,积压任务达到4个时,KEDA会计算出需要 (4/2)=2个副本,与当前副本数相同,不扩容。
      # 如果积压任务达到5个,KEDA计算出需要(5/2) -> ceil(2.5) = 3个副本,触发扩容。
      threshold: '2' 
      # safe to scale to zero
      activationThreshold: '0'

一个重要的实现细节: KEDA scaleTargetRef直接指向RayCluster CRD的能力可能受版本限制。一个更通用和健壮的方法是让KEDA伸缩由KubeRay Operator为worker group创建的那个Deployment。KubeRay Operator会监听这个Deploymentreplicas变化并相应地调整Ray集群。这个Deployment的命名通常是ray-<cluster-name>-worker-<group-name>。上述YAML注释中已指明此备选方案。

配置好ServiceMonitor让Prometheus能够抓取到plot-api-svc暴露的/metrics端点后,整个闭环就形成了:

  1. 外部请求涌入,通过FastAPI提交大量generate_complex_plot任务到Ray。
  2. 由于worker资源有限,任务开始在Ray内部队列中积压。
  3. FastAPI服务中的指标导出器检测到PENDING状态的任务增多,ray_pending_tasks指标值上升。
  4. Prometheus抓取到这个升高的指标。
  5. KEDA定期查询Prometheus,发现指标值超过了threshold * 当前副本数
  6. KEDA修改Ray worker Deployment的replicas数量。
  7. Kubernetes响应并创建新的Ray worker Pod。
  8. 新worker启动后加入Ray集群,开始处理积压的任务。
  9. 任务队列长度下降,ray_pending_tasks指标回落,系统达到新的平衡。
  10. 流量高峰过后,指标值下降,KEDA在cooldownPeriod后开始缩减worker数量,直至minReplicaCount

方案的局限性与未来迭代路径

此架构虽然实现了基于业务负载的智能伸缩,但在生产环境中仍有几个需要注意的局限性和优化点。

首先,指标的准确性至关重要。ray.state.tasks()在集群规模非常大时可能会有性能开销。更优化的方式是利用Ray内置的metrics系统,如果它能直接暴露队列长度,则可以省去我们自定义的查询逻辑。

其次,缩容策略需要非常谨慎。KEDA触发缩容时,Kubernetes会随机终止一个worker Pod。如果这个Pod正在执行一个耗时很长的任务,任务会失败。Ray的容错机制可以重新提交该任务,但这造成了计算资源的浪费。一个更优雅的缩容方案需要实现Pod的preStop钩子,通知Ray节点优雅下线(draining),即不再接受新任务,并在完成所有现有任务后再退出。

最后,Istio的引入虽然带来了诸多好处,但也增加了系统的复杂性和资源开销。对于每个Pod,都需要一个sidecar代理,这会消耗额外的CPU和内存。在性能极其敏感的场景下,需要评估这种开销是否可以接受,或者考虑使用更轻量级的服务网格方案,甚至是eBPF等技术在网络层面实现部分功能。当前的实现也没有利用Istio的流量管理能力来做更复杂的发布,例如,可以部署一个新版本的绘图worker,通过Istio的流量规则将一小部分请求路由到新版本,进行金丝雀发布验证。


  目录