实现基于 KEDA 自定义指标的 Elixir SSR 应用在 Azure K8s 上的状态感知自动伸缩


我们团队的一个核心产品,其前端交互体验是基于 Elixir 的 Phoenix LiveView 构建的。这种选择为我们带来了服务端渲染 (SSR) 的所有好处,同时通过持久化的 WebSocket 连接提供了类似 SPA 的富交互体验。最初,我们将其部署在 Azure 的几台固定规模的虚拟机上,运行稳定。但随着用户量的快速增长,固定的资源池很快成为了瓶颈。流量高峰期,服务器 CPU 飙升,响应延迟急剧增加;而在夜间,大量资源又处于闲置状态,造成了不小的成本浪费。

上云原生,迁移到 Azure Kubernetes Service (AKS) 似乎是理所当然的下一步。然而,一个棘手的问题很快浮现:如何为这种有状态连接的 SSR 应用实现真正有效的弹性伸缩?

在真实项目中,标准的 Kubernetes HPA (Horizontal Pod Autoscaler) 几乎无法胜任这个任务。HPA 主要依赖 CPU 和内存使用率作为伸缩指标。对于 Phoenix LiveView 应用而言,一个 Pod 可能承载了 1000 个活跃的 WebSocket 连接,但其 CPU 占用率可能只有 30%。而另一个新启动的 Pod 可能只有 50 个连接,CPU 却因为其他后台任务而达到 60%。基于平均 CPU 使用率的伸缩决策在这种场景下是完全失准的。

更致命的是缩容(Scale-in)过程。当 HPA 决定终止一个 Pod 时,Kubernetes 会直接发送 SIGTERM 信号。对于一个承载着数百个活跃用户会话的 Pod 来说,这意味着所有连接瞬间被切断,用户正在进行的操作被粗暴打断。这种体验是生产环境绝对无法接受的。我们需要一种更智能的、能够感知应用内部状态的伸缩机制。

初步的构想是,伸缩决策必须基于对我们最重要的业务指标:每个 Pod 承载的活跃用户连接数。而缩容过程必须是“优雅”的,即在 Pod 终止前,必须先将其中的用户会话平滑地迁移出去。

这个构想将我们引向了一个新的技术组合:使用 Prometheus 暴露 Elixir 应用的内部指标,并利用 KEDA (Kubernetes Event-driven Autoscaling) 来消费这些自定义指标,从而驱动更智能的伸缩。同时,我们需要深度利用 Kubernetes 的生命周期钩子(Lifecycle Hooks)与 Elixir BEAM 虚拟机的能力,实现一个优雅的连接排空(Connection Draining)机制。

第一步:从 Elixir 应用内部暴露连接指标

万事开头难。要实现基于连接数的伸缩,首先得让 Kubernetes 集群的“眼睛”——Prometheus,能够“看到”每个 Elixir Pod 内部的实时连接数。我们选用了 prom_ex 这个库,它可以与 Elixir 的 telemetry 事件系统深度集成,将应用内部的各种指标转化为 Prometheus 格式。

我们需要创建一个自定义的 prom_ex 插件,用于追踪 Phoenix Channel 的连接与断开事件。当一个用户通过 WebSocket 连接到我们的 LiveView 时,Phoenix 会启动一个 Channel 进程。我们只需要统计这些进程的数量即可。

# lib/my_app/metrics.ex
defmodule MyApp.Metrics do
  use GenServer
  require Logger

  # prom_ex acks all telemetry events that it handles, so we set a timeout
  # to warn us if we are not receiving events.
  @receive_timeout :timer.seconds(60)

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    # Attach to Phoenix endpoint telemetry events for channel connections
    :telemetry.attach_many(
      "metrics-handler",
      [
        [:phoenix, :channel, :join],
        [:phoenix, :channel, :leave]
      ],
      &__MODULE__.handle_telemetry_event/4,
      nil
    )

    Logger.info("Metrics handler attached to Phoenix telemetry events.")
    {:ok, %{}}
  end

  @impl true
  def handle_call(:get_active_channels, _from, state) do
    {:reply, PromEx.get_metric_value(:my_app, :active_phoenix_channels), state}
  end

  def handle_telemetry_event([:phoenix, :channel, :join], _measurements, _metadata, _config) do
    # When a user joins a channel, increment the gauge.
    PromEx.inc(:my_app, :active_phoenix_channels, 1)
  end

  def handle_telemetry_event([:phoenix, :channel, :leave], _measurements, _metadata, _config) do
    # When a user leaves a channel, decrement the gauge.
    # We must ensure the gauge doesn't go below zero in case of weird race conditions.
    current_value = PromEx.get_metric_value(:my_app, :active_phoenix_channels)
    if current_value > 0 do
      PromEx.dec(:my_app, :active_phoenix_channels, 1)
    end
  end
end

# lib/my_app/application.ex
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      # ... other children
      MyApp.Repo,
      MyAppWeb.Endpoint,
      {PromEx, prometheus: [endpoint: MyAppWeb.Endpoint]},
      MyApp.Metrics # Start our custom metrics handler
    ]

    # ... supervisor setup
  end
end

# lib/my_app/prom_ex.ex
defmodule MyApp.PromEx do
  use PromEx, otp_app: :my_app

  @impl true
  def plugins do
    [
      # ... other default plugins like BEAM, Ecto, Phoenix
      {PromEx.Plugins.Application, registered_name: MyApp.Metrics},
      # Define our custom metric
      %PromEx.Metric.Gauge{
        metric_name: :active_phoenix_channels,
        description: "The number of currently active Phoenix channels.",
        source: self() # This metric is managed manually by our handler
      }
    ]
  end
end

这段代码的核心在于 MyApp.Metrics 模块。它作为一个 GenServer,在启动时通过 :telemetry.attach_many 监听了 Phoenix Channel 的 :join:leave 事件。每当有用户连接或断开,handle_telemetry_event/4 就会被调用,相应地增加或减少我们定义的 active_phoenix_channels 这个 Prometheus Gauge 指标。

最后,在 MyApp.PromEx 模块中,我们正式定义了这个 Gauge 指标。这样,当 Prometheus 来抓取 /metrics 端点时,就能获取到这个实时更新的连接数。

第二步:在 AKS 上部署 Prometheus 并抓取指标

接下来是在 AKS 集群中配置 Prometheus 来发现并抓取我们 Elixir 应用暴露的指标。我们使用 prometheus-community/kube-prometheus-stack 这个 Helm chart,它可以一键部署 Prometheus、Grafana 和 Alertmanager。

关键在于创建一个 ServiceMonitor 对象,它会告诉 Prometheus Operator 如何找到我们的应用 Pod 并抓取它们的 /metrics 端点。

# prometheus-servicemonitor.yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: elixir-ssr-app-monitor
  namespace: my-app-namespace
  labels:
    # This label must match the 'serviceMonitorSelector' in your Prometheus Operator configuration
    release: prometheus
spec:
  selector:
    matchLabels:
      # This label must match the label on your application's Service
      app: elixir-ssr-app
  namespaceSelector:
    matchNames:
      - my-app-namespace
  endpoints:
  - port: http # The name of the port in your Service definition
    path: /metrics # The path where metrics are exposed
    interval: 15s # Scrape interval

将这个 ServiceMonitor 应用到集群后,Prometheus 就会开始每 15 秒从所有带有 app: elixir-ssr-app 标签的 Service 关联的 Pod 中抓取指标。我们可以在 Prometheus 的 UI 中查询 active_phoenix_channels 来验证数据是否被正确采集。

第三步:配置 KEDA 实现自定义指标伸缩

有了可靠的数据源,现在就可以请 KEDA 登场了。KEDA 的核心是一个名为 ScaledObject 的 CRD (Custom Resource Definition)。我们通过定义一个 ScaledObject 来描述伸缩规则。

# keda-scaledobject.yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: elixir-ssr-app-scaler
  namespace: my-app-namespace
spec:
  scaleTargetRef:
    name: elixir-ssr-app # The name of the Deployment to scale
  minReplicas: 2
  maxReplicas: 20
  cooldownPeriod: 60  # Cooldown period after the last trigger
  pollingInterval: 30 # How often to check the trigger
  triggers:
  - type: prometheus
    metadata:
      serverAddress: http://prometheus-kube-prometheus-prometheus.monitoring.svc.cluster.local:9090
      # We want to scale when the AVERAGE number of connections per pod exceeds 500
      query: |
        sum(active_phoenix_channels{namespace="my-app-namespace"}) / count(kube_pod_info{pod=~"elixir-ssr-app-.*", namespace="my-app-namespace"})
      threshold: '500' # The target value

这个 ScaledObject 的配置是整个方案的核心。

  • scaleTargetRef: 指定了要伸缩的目标,即我们的 Elixir 应用的 Deployment。
  • minReplicas / maxReplicas: 定义了伸缩的边界。
  • triggers: 定义了伸缩触发器。这里我们使用 prometheus 类型。
    • serverAddress: 指向集群内部的 Prometheus 服务。
    • query: 这是最关键的部分,一个 PromQL 查询。sum(active_phoenix_channels{...}) 计算了所有 Pod 的总连接数,然后除以 count(kube_pod_info{...}) (即当前运行的 Pod 数量),得到了每个 Pod 的平均连接数。
    • threshold: 阈值。当上述查询结果超过 500 时,KEDA 就会指示 Kubernetes 增加 Pod 数量,直到平均值降到 500 以下。反之,当平均值远低于 500 时,它会触发缩容。

部署这个 ScaledObject 后,KEDA 会接管 Deployment 的 replicas 字段。它会持续执行 PromQL 查询,并根据结果动态调整 Pod 数量。我们终于有了一个能理解“用户负载”的伸缩系统。

第四步:实现优雅停机与连接排空

解决了“如何伸”,接下来是更困难的“如何缩”。当 KEDA 决定减少一个 Pod 时,Kubernetes 的标准流程是:

  1. 从 Service 的 Endpoints 中移除该 Pod。
  2. 向 Pod 内的主进程发送 SIGTERM 信号。
  3. 等待 terminationGracePeriodSeconds (默认为 30 秒)。
  4. 如果进程还未退出,发送 SIGKILL 强制终止。

我们必须在这个流程中插入我们的“连接排空”逻辑。这需要 Kubernetes 和 Elixir 应用的协同工作。

首先,修改 Kubernetes Deployment,增加一个 preStop 生命周期钩子。

# elixir-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: elixir-ssr-app
  namespace: my-app-namespace
spec:
  replicas: 2 # Initial replicas, KEDA will control this later
  template:
    spec:
      # This grace period MUST be long enough for draining to complete
      terminationGracePeriodSeconds: 120 
      containers:
      - name: app
        image: my-registry/elixir-ssr-app:latest
        lifecycle:
          preStop:
            exec:
              # This command is executed before SIGTERM is sent
              command: ["/app/bin/drain.sh"]
        ports:
        - containerPort: 80
          name: http
        # ... other configurations

这里的关键是 preStop 钩子和 terminationGracePeriodSeconds。当 Kubernetes 决定终止这个 Pod 时,它会首先执行 /app/bin/drain.sh 脚本,并且给予 Pod 总共 120 秒的时间来完成关闭。

drain.sh 脚本的作用是通知正在运行的 Elixir 节点开始排空。

#!/bin/sh
# /app/bin/drain.sh

# Use Elixir's remote shell to call a function on the running node.
# The node name is configured via environment variables.
/app/bin/my_app remote "MyApp.ClusterManager.start_drain()"

echo "Drain command sent to Elixir node. Waiting for shutdown..."
# The script can exit here. Kubernetes will then send SIGTERM.
# The real waiting happens inside the Elixir VM.

最后,是 Elixir 端的实现。我们需要一个 ClusterManager 来处理这个排空指令。

# lib/my_app/cluster_manager.ex
defmodule MyApp.ClusterManager do
  use GenServer
  require Logger

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  # This is called remotely by the drain.sh script
  def start_drain do
    GenServer.cast(__MODULE__, :start_drain)
  end

  @impl true
  def init(:ok) do
    # Set initial state to accepting connections
    Process.put(:is_draining, false)
    {:ok, %{}}
  end

  @impl true
  def handle_cast(:start_drain, state) do
    Logger.warn("Node is entering drain mode. No new connections will be accepted.")
    Process.put(:is_draining, true)

    # Broadcast a reconnect message to all connected clients
    MyAppWeb.Endpoint.broadcast("system:events", "system:reconnect", %{})
    
    # Start polling for active connections to drop to zero
    Process.send_after(self(), :check_drain_status, 1_000)
    
    {:noreply, state}
  end

  @impl true
  def handle_info(:check_drain_status, state) do
    active_channels = PromEx.get_metric_value(:my_app, :active_phoenix_channels)

    if active_channels > 0 do
      Logger.info("Draining... #{active_channels} connections remaining.")
      # Check again in a second
      Process.send_after(self(), :check_drain_status, 1_000)
    else
      Logger.warn("All connections drained. Shutting down node.")
      # This will initiate a clean shutdown of the BEAM VM
      :init.stop()
    end

    {:noreply, state}
  end
end

# We also need a Plug to reject new connections during drain
# lib/my_app_web/plugs/drain_plug.ex
defmodule MyAppWeb.Plugs.DrainPlug do
  import Plug.Conn

  def init(options), do: options

  def call(conn, _opts) do
    if Process.get(:is_draining, false) do
      conn
      |> put_resp_content_type("text/plain")
      |> send_resp(503, "Service is shutting down, please reconnect.")
      |> halt()
    else
      conn
    end
  end
end

# And in endpoint.ex, add it to the pipeline
# lib/my_app_web/endpoint.ex
plug MyAppWeb.Plugs.DrainPlug

start_drain/0 被调用时:

  1. ClusterManager 将一个进程字典标志 :is_draining 设置为 true。我们的 DrainPlug 会检查这个标志,并对任何新的 HTTP 请求返回 503 Service Unavailable,这能有效阻止新的 WebSocket 连接建立。
  2. 它通过 Phoenix Endpoint 向所有已连接的客户端广播一个 "system:reconnect" 消息。
  3. 前端的 JavaScript 代码需要监听这个消息,并在收到后主动关闭当前 WebSocket 连接,然后立即重新发起连接。由于旧 Pod 已经不在 Kubernetes Service 的负载均衡池中,新的连接请求会被路由到其他健康的 Pod 上。
  4. ClusterManager 开始每秒轮询一次活跃连接数。一旦连接数降为零,它就调用 :init.stop(),这会触发整个 BEAM 虚拟机的有序关闭,进程退出码为 0,Kubernetes 会认为这是一个成功的关闭。

这个机制确保了在 Pod 因为缩容而被终止时,用户会经历一个几乎无感的重连过程,而不是一次粗暴的断开。

整个伸缩流程的可视化

为了更清晰地理解整个系统的工作方式,我们可以用 Mermaid 图来描绘这个流程。

sequenceDiagram
    participant User
    participant K8sService
    participant ElixirPod
    participant Prometheus
    participant KEDA
    participant K8sApiServer

    User->>+K8sService: Connect (WebSocket)
    K8sService->>+ElixirPod: Route connection
    ElixirPod-->>Prometheus: Exposes active_channels metric
    
    loop Scaling Check
        KEDA->>+Prometheus: Query avg(active_channels)
        Prometheus-->>-KEDA: Returns value (e.g., 600)
        Note over KEDA: Avg connections (600) > Threshold (500)
        KEDA->>+K8sApiServer: Request to scale Deployment (N -> N+1)
        K8sApiServer-->>-KEDA: Acknowledges
    end

    loop Graceful Shutdown (Scale-in)
        KEDA->>+Prometheus: Query avg(active_channels)
        Prometheus-->>-KEDA: Returns value (e.g., 300)
        Note over KEDA: Avg connections (300) < Threshold (500)
        KEDA->>+K8sApiServer: Request to scale Deployment (N -> N-1)
        K8sApiServer->>ElixirPod: Initiate termination on one pod
        Note over ElixirPod: K8s executes preStop hook
        ElixirPod->>ElixirPod: start_drain() is called
        ElixirPod->>User: Push "system:reconnect" event
        User->>User: Close old WebSocket, open new
        User->>+K8sService: Reconnect request
        K8sService->>+ElixirPod: Route to a healthy pod
        Note over ElixirPod: Draining pod waits for connections to be 0
        ElixirPod->>K8sApiServer: Exits gracefully (code 0)
    end

这个架构的最终成果是一个能够根据真实用户负载自我调整资源,并在缩容时对用户体验影响最小化的系统。它不是一个简单的“部署上 K8s”方案,而是深度结合了 Elixir/BEAM、Prometheus、KEDA 和 Kubernetes 自身机制,针对特定应用场景(有状态 SSR)的定制化运维方案。

当然,这个方案并非没有局限性。客户端的重连逻辑会引入一个短暂的连接中断,对于某些对实时性要求极高的应用(如在线游戏)可能仍然不够完美。terminationGracePeriodSeconds 是一个固定的超时时间,如果由于网络或其他问题导致连接排空速度过慢,超出这个时间后 Pod 依然会被强制终止。更复杂的方案可能需要引入服务网格(Service Mesh)来做更精细的流量控制,或者在 Elixir 层实现进程状态的跨节点迁移,但这些都会带来指数级增长的复杂性。对于绝大多数 Web 应用而言,当前的连接排空模型是在运维复杂度和用户体验之间一个非常务实的平衡点。


  目录