我们团队的一个核心产品,其前端交互体验是基于 Elixir 的 Phoenix LiveView 构建的。这种选择为我们带来了服务端渲染 (SSR) 的所有好处,同时通过持久化的 WebSocket 连接提供了类似 SPA 的富交互体验。最初,我们将其部署在 Azure 的几台固定规模的虚拟机上,运行稳定。但随着用户量的快速增长,固定的资源池很快成为了瓶颈。流量高峰期,服务器 CPU 飙升,响应延迟急剧增加;而在夜间,大量资源又处于闲置状态,造成了不小的成本浪费。
上云原生,迁移到 Azure Kubernetes Service (AKS) 似乎是理所当然的下一步。然而,一个棘手的问题很快浮现:如何为这种有状态连接的 SSR 应用实现真正有效的弹性伸缩?
在真实项目中,标准的 Kubernetes HPA (Horizontal Pod Autoscaler) 几乎无法胜任这个任务。HPA 主要依赖 CPU 和内存使用率作为伸缩指标。对于 Phoenix LiveView 应用而言,一个 Pod 可能承载了 1000 个活跃的 WebSocket 连接,但其 CPU 占用率可能只有 30%。而另一个新启动的 Pod 可能只有 50 个连接,CPU 却因为其他后台任务而达到 60%。基于平均 CPU 使用率的伸缩决策在这种场景下是完全失准的。
更致命的是缩容(Scale-in)过程。当 HPA 决定终止一个 Pod 时,Kubernetes 会直接发送 SIGTERM
信号。对于一个承载着数百个活跃用户会话的 Pod 来说,这意味着所有连接瞬间被切断,用户正在进行的操作被粗暴打断。这种体验是生产环境绝对无法接受的。我们需要一种更智能的、能够感知应用内部状态的伸缩机制。
初步的构想是,伸缩决策必须基于对我们最重要的业务指标:每个 Pod 承载的活跃用户连接数。而缩容过程必须是“优雅”的,即在 Pod 终止前,必须先将其中的用户会话平滑地迁移出去。
这个构想将我们引向了一个新的技术组合:使用 Prometheus 暴露 Elixir 应用的内部指标,并利用 KEDA (Kubernetes Event-driven Autoscaling) 来消费这些自定义指标,从而驱动更智能的伸缩。同时,我们需要深度利用 Kubernetes 的生命周期钩子(Lifecycle Hooks)与 Elixir BEAM 虚拟机的能力,实现一个优雅的连接排空(Connection Draining)机制。
第一步:从 Elixir 应用内部暴露连接指标
万事开头难。要实现基于连接数的伸缩,首先得让 Kubernetes 集群的“眼睛”——Prometheus,能够“看到”每个 Elixir Pod 内部的实时连接数。我们选用了 prom_ex
这个库,它可以与 Elixir 的 telemetry
事件系统深度集成,将应用内部的各种指标转化为 Prometheus 格式。
我们需要创建一个自定义的 prom_ex
插件,用于追踪 Phoenix Channel 的连接与断开事件。当一个用户通过 WebSocket 连接到我们的 LiveView 时,Phoenix 会启动一个 Channel 进程。我们只需要统计这些进程的数量即可。
# lib/my_app/metrics.ex
defmodule MyApp.Metrics do
use GenServer
require Logger
# prom_ex acks all telemetry events that it handles, so we set a timeout
# to warn us if we are not receiving events.
@receive_timeout :timer.seconds(60)
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
@impl true
def init(_opts) do
# Attach to Phoenix endpoint telemetry events for channel connections
:telemetry.attach_many(
"metrics-handler",
[
[:phoenix, :channel, :join],
[:phoenix, :channel, :leave]
],
&__MODULE__.handle_telemetry_event/4,
nil
)
Logger.info("Metrics handler attached to Phoenix telemetry events.")
{:ok, %{}}
end
@impl true
def handle_call(:get_active_channels, _from, state) do
{:reply, PromEx.get_metric_value(:my_app, :active_phoenix_channels), state}
end
def handle_telemetry_event([:phoenix, :channel, :join], _measurements, _metadata, _config) do
# When a user joins a channel, increment the gauge.
PromEx.inc(:my_app, :active_phoenix_channels, 1)
end
def handle_telemetry_event([:phoenix, :channel, :leave], _measurements, _metadata, _config) do
# When a user leaves a channel, decrement the gauge.
# We must ensure the gauge doesn't go below zero in case of weird race conditions.
current_value = PromEx.get_metric_value(:my_app, :active_phoenix_channels)
if current_value > 0 do
PromEx.dec(:my_app, :active_phoenix_channels, 1)
end
end
end
# lib/my_app/application.ex
defmodule MyApp.Application do
use Application
def start(_type, _args) do
children = [
# ... other children
MyApp.Repo,
MyAppWeb.Endpoint,
{PromEx, prometheus: [endpoint: MyAppWeb.Endpoint]},
MyApp.Metrics # Start our custom metrics handler
]
# ... supervisor setup
end
end
# lib/my_app/prom_ex.ex
defmodule MyApp.PromEx do
use PromEx, otp_app: :my_app
@impl true
def plugins do
[
# ... other default plugins like BEAM, Ecto, Phoenix
{PromEx.Plugins.Application, registered_name: MyApp.Metrics},
# Define our custom metric
%PromEx.Metric.Gauge{
metric_name: :active_phoenix_channels,
description: "The number of currently active Phoenix channels.",
source: self() # This metric is managed manually by our handler
}
]
end
end
这段代码的核心在于 MyApp.Metrics
模块。它作为一个 GenServer
,在启动时通过 :telemetry.attach_many
监听了 Phoenix Channel 的 :join
和 :leave
事件。每当有用户连接或断开,handle_telemetry_event/4
就会被调用,相应地增加或减少我们定义的 active_phoenix_channels
这个 Prometheus Gauge 指标。
最后,在 MyApp.PromEx
模块中,我们正式定义了这个 Gauge 指标。这样,当 Prometheus 来抓取 /metrics
端点时,就能获取到这个实时更新的连接数。
第二步:在 AKS 上部署 Prometheus 并抓取指标
接下来是在 AKS 集群中配置 Prometheus 来发现并抓取我们 Elixir 应用暴露的指标。我们使用 prometheus-community/kube-prometheus-stack
这个 Helm chart,它可以一键部署 Prometheus、Grafana 和 Alertmanager。
关键在于创建一个 ServiceMonitor
对象,它会告诉 Prometheus Operator 如何找到我们的应用 Pod 并抓取它们的 /metrics
端点。
# prometheus-servicemonitor.yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: elixir-ssr-app-monitor
namespace: my-app-namespace
labels:
# This label must match the 'serviceMonitorSelector' in your Prometheus Operator configuration
release: prometheus
spec:
selector:
matchLabels:
# This label must match the label on your application's Service
app: elixir-ssr-app
namespaceSelector:
matchNames:
- my-app-namespace
endpoints:
- port: http # The name of the port in your Service definition
path: /metrics # The path where metrics are exposed
interval: 15s # Scrape interval
将这个 ServiceMonitor
应用到集群后,Prometheus 就会开始每 15 秒从所有带有 app: elixir-ssr-app
标签的 Service 关联的 Pod 中抓取指标。我们可以在 Prometheus 的 UI 中查询 active_phoenix_channels
来验证数据是否被正确采集。
第三步:配置 KEDA 实现自定义指标伸缩
有了可靠的数据源,现在就可以请 KEDA 登场了。KEDA 的核心是一个名为 ScaledObject
的 CRD (Custom Resource Definition)。我们通过定义一个 ScaledObject
来描述伸缩规则。
# keda-scaledobject.yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: elixir-ssr-app-scaler
namespace: my-app-namespace
spec:
scaleTargetRef:
name: elixir-ssr-app # The name of the Deployment to scale
minReplicas: 2
maxReplicas: 20
cooldownPeriod: 60 # Cooldown period after the last trigger
pollingInterval: 30 # How often to check the trigger
triggers:
- type: prometheus
metadata:
serverAddress: http://prometheus-kube-prometheus-prometheus.monitoring.svc.cluster.local:9090
# We want to scale when the AVERAGE number of connections per pod exceeds 500
query: |
sum(active_phoenix_channels{namespace="my-app-namespace"}) / count(kube_pod_info{pod=~"elixir-ssr-app-.*", namespace="my-app-namespace"})
threshold: '500' # The target value
这个 ScaledObject
的配置是整个方案的核心。
-
scaleTargetRef
: 指定了要伸缩的目标,即我们的 Elixir 应用的 Deployment。 -
minReplicas
/maxReplicas
: 定义了伸缩的边界。 triggers
: 定义了伸缩触发器。这里我们使用prometheus
类型。-
serverAddress
: 指向集群内部的 Prometheus 服务。 -
query
: 这是最关键的部分,一个 PromQL 查询。sum(active_phoenix_channels{...})
计算了所有 Pod 的总连接数,然后除以count(kube_pod_info{...})
(即当前运行的 Pod 数量),得到了每个 Pod 的平均连接数。 -
threshold
: 阈值。当上述查询结果超过 500 时,KEDA 就会指示 Kubernetes 增加 Pod 数量,直到平均值降到 500 以下。反之,当平均值远低于 500 时,它会触发缩容。
-
部署这个 ScaledObject
后,KEDA 会接管 Deployment 的 replicas
字段。它会持续执行 PromQL 查询,并根据结果动态调整 Pod 数量。我们终于有了一个能理解“用户负载”的伸缩系统。
第四步:实现优雅停机与连接排空
解决了“如何伸”,接下来是更困难的“如何缩”。当 KEDA 决定减少一个 Pod 时,Kubernetes 的标准流程是:
- 从 Service 的 Endpoints 中移除该 Pod。
- 向 Pod 内的主进程发送
SIGTERM
信号。 - 等待
terminationGracePeriodSeconds
(默认为 30 秒)。 - 如果进程还未退出,发送
SIGKILL
强制终止。
我们必须在这个流程中插入我们的“连接排空”逻辑。这需要 Kubernetes 和 Elixir 应用的协同工作。
首先,修改 Kubernetes Deployment,增加一个 preStop
生命周期钩子。
# elixir-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: elixir-ssr-app
namespace: my-app-namespace
spec:
replicas: 2 # Initial replicas, KEDA will control this later
template:
spec:
# This grace period MUST be long enough for draining to complete
terminationGracePeriodSeconds: 120
containers:
- name: app
image: my-registry/elixir-ssr-app:latest
lifecycle:
preStop:
exec:
# This command is executed before SIGTERM is sent
command: ["/app/bin/drain.sh"]
ports:
- containerPort: 80
name: http
# ... other configurations
这里的关键是 preStop
钩子和 terminationGracePeriodSeconds
。当 Kubernetes 决定终止这个 Pod 时,它会首先执行 /app/bin/drain.sh
脚本,并且给予 Pod 总共 120 秒的时间来完成关闭。
drain.sh
脚本的作用是通知正在运行的 Elixir 节点开始排空。
#!/bin/sh
# /app/bin/drain.sh
# Use Elixir's remote shell to call a function on the running node.
# The node name is configured via environment variables.
/app/bin/my_app remote "MyApp.ClusterManager.start_drain()"
echo "Drain command sent to Elixir node. Waiting for shutdown..."
# The script can exit here. Kubernetes will then send SIGTERM.
# The real waiting happens inside the Elixir VM.
最后,是 Elixir 端的实现。我们需要一个 ClusterManager
来处理这个排空指令。
# lib/my_app/cluster_manager.ex
defmodule MyApp.ClusterManager do
use GenServer
require Logger
def start_link(_opts) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end
# This is called remotely by the drain.sh script
def start_drain do
GenServer.cast(__MODULE__, :start_drain)
end
@impl true
def init(:ok) do
# Set initial state to accepting connections
Process.put(:is_draining, false)
{:ok, %{}}
end
@impl true
def handle_cast(:start_drain, state) do
Logger.warn("Node is entering drain mode. No new connections will be accepted.")
Process.put(:is_draining, true)
# Broadcast a reconnect message to all connected clients
MyAppWeb.Endpoint.broadcast("system:events", "system:reconnect", %{})
# Start polling for active connections to drop to zero
Process.send_after(self(), :check_drain_status, 1_000)
{:noreply, state}
end
@impl true
def handle_info(:check_drain_status, state) do
active_channels = PromEx.get_metric_value(:my_app, :active_phoenix_channels)
if active_channels > 0 do
Logger.info("Draining... #{active_channels} connections remaining.")
# Check again in a second
Process.send_after(self(), :check_drain_status, 1_000)
else
Logger.warn("All connections drained. Shutting down node.")
# This will initiate a clean shutdown of the BEAM VM
:init.stop()
end
{:noreply, state}
end
end
# We also need a Plug to reject new connections during drain
# lib/my_app_web/plugs/drain_plug.ex
defmodule MyAppWeb.Plugs.DrainPlug do
import Plug.Conn
def init(options), do: options
def call(conn, _opts) do
if Process.get(:is_draining, false) do
conn
|> put_resp_content_type("text/plain")
|> send_resp(503, "Service is shutting down, please reconnect.")
|> halt()
else
conn
end
end
end
# And in endpoint.ex, add it to the pipeline
# lib/my_app_web/endpoint.ex
plug MyAppWeb.Plugs.DrainPlug
当 start_drain/0
被调用时:
-
ClusterManager
将一个进程字典标志:is_draining
设置为true
。我们的DrainPlug
会检查这个标志,并对任何新的 HTTP 请求返回 503 Service Unavailable,这能有效阻止新的 WebSocket 连接建立。 - 它通过 Phoenix Endpoint 向所有已连接的客户端广播一个
"system:reconnect"
消息。 - 前端的 JavaScript 代码需要监听这个消息,并在收到后主动关闭当前 WebSocket 连接,然后立即重新发起连接。由于旧 Pod 已经不在 Kubernetes Service 的负载均衡池中,新的连接请求会被路由到其他健康的 Pod 上。
-
ClusterManager
开始每秒轮询一次活跃连接数。一旦连接数降为零,它就调用:init.stop()
,这会触发整个 BEAM 虚拟机的有序关闭,进程退出码为 0,Kubernetes 会认为这是一个成功的关闭。
这个机制确保了在 Pod 因为缩容而被终止时,用户会经历一个几乎无感的重连过程,而不是一次粗暴的断开。
整个伸缩流程的可视化
为了更清晰地理解整个系统的工作方式,我们可以用 Mermaid 图来描绘这个流程。
sequenceDiagram participant User participant K8sService participant ElixirPod participant Prometheus participant KEDA participant K8sApiServer User->>+K8sService: Connect (WebSocket) K8sService->>+ElixirPod: Route connection ElixirPod-->>Prometheus: Exposes active_channels metric loop Scaling Check KEDA->>+Prometheus: Query avg(active_channels) Prometheus-->>-KEDA: Returns value (e.g., 600) Note over KEDA: Avg connections (600) > Threshold (500) KEDA->>+K8sApiServer: Request to scale Deployment (N -> N+1) K8sApiServer-->>-KEDA: Acknowledges end loop Graceful Shutdown (Scale-in) KEDA->>+Prometheus: Query avg(active_channels) Prometheus-->>-KEDA: Returns value (e.g., 300) Note over KEDA: Avg connections (300) < Threshold (500) KEDA->>+K8sApiServer: Request to scale Deployment (N -> N-1) K8sApiServer->>ElixirPod: Initiate termination on one pod Note over ElixirPod: K8s executes preStop hook ElixirPod->>ElixirPod: start_drain() is called ElixirPod->>User: Push "system:reconnect" event User->>User: Close old WebSocket, open new User->>+K8sService: Reconnect request K8sService->>+ElixirPod: Route to a healthy pod Note over ElixirPod: Draining pod waits for connections to be 0 ElixirPod->>K8sApiServer: Exits gracefully (code 0) end
这个架构的最终成果是一个能够根据真实用户负载自我调整资源,并在缩容时对用户体验影响最小化的系统。它不是一个简单的“部署上 K8s”方案,而是深度结合了 Elixir/BEAM、Prometheus、KEDA 和 Kubernetes 自身机制,针对特定应用场景(有状态 SSR)的定制化运维方案。
当然,这个方案并非没有局限性。客户端的重连逻辑会引入一个短暂的连接中断,对于某些对实时性要求极高的应用(如在线游戏)可能仍然不够完美。terminationGracePeriodSeconds
是一个固定的超时时间,如果由于网络或其他问题导致连接排空速度过慢,超出这个时间后 Pod 依然会被强制终止。更复杂的方案可能需要引入服务网格(Service Mesh)来做更精细的流量控制,或者在 Elixir 层实现进程状态的跨节点迁移,但这些都会带来指数级增长的复杂性。对于绝大多数 Web 应用而言,当前的连接排空模型是在运维复杂度和用户体验之间一个非常务实的平衡点。