We face a concrete engineering problem: a Ruby on Rails core business system that has been running for years needs to move non-core, compute-intensive, or third-party-API-bound work to asynchronous execution. The team's technical direction is to embrace serverless, and Vercel Functions is the first choice thanks to its simple deployment and elastic scaling. The question is: how do we deliver the events produced by this Rails monolith to Vercel Functions reliably and at low cost?
Option A: Introduce a Dedicated Message Queue
The most direct approach is to bring in a standard message queue (MQ) such as RabbitMQ or AWS SQS.
- Pros: This is the industry-standard solution. It is feature-complete, with persistence, dead-letter queues, retry mechanisms, message acknowledgement, and more. The client ecosystem is mature, with high-quality libraries for both Ruby and Node.js.
- Cons:
  - Operational cost: whether we self-host a RabbitMQ cluster or use a managed service such as SQS, it means new infrastructure cost and maintenance burden. The team already maintains a highly available ZooKeeper cluster for other services, so introducing yet another component requires careful justification.
  - Over-engineering: the current event volume is modest, roughly tens of thousands to a few hundred thousand invocations per day with low peaks. A full MQ stack is overkill at this stage.
Option B: A Distributed Coordination Scheme on ZooKeeper
Given our existing ZooKeeper infrastructure, an alternative surfaced: could we use ZooKeeper's distributed coordination primitives to build a lightweight task dispatch and processing system?
- Pros:
  - Reuse of existing infrastructure: no new components and zero additional fixed cost; the team is already very familiar with operating ZooKeeper.
  - Controllable implementation: the core logic is built in-house and can be tailored closely to the business, instead of being constrained by the feature set of a large MQ system.
- Cons:
  - Not a purpose-built queue: ZooKeeper is fundamentally a coordination service, not a message queue. It is not designed for high-throughput message storage and delivery, and ZNode data is limited to 1 MB.
  - Implementation complexity: task queuing, consumer load balancing, task leases, and failure recovery all have to be built by ourselves.
Decision: weighing the trade-offs, we chose Option B. It is a pragmatic decision: solve today's problem at the lowest marginal cost while keeping the system's complexity inside the team. We accept the challenge of implementing the queue logic ourselves in exchange for a leaner stack and lower operational overhead.
Architecture Design and ZNode Structure
The core idea: the producer (Ruby) writes each task into ZooKeeper as a ZNode, and the consumers (Vercel Functions) obtain and execute tasks by polling combined with a competitive lock (a lease).
```mermaid
graph TD
    subgraph "Rails Monolith (Ruby)"
        Producer[Event Producer]
    end
    subgraph "ZooKeeper Cluster"
        ZTasks["/event-bridge/tasks"]
        ZLeases["/event-bridge/leases"]
    end
    subgraph "Vercel"
        Cron["Vercel Cron Job (every 1 min)"]
        ConsumerFunc["/api/process_events"]
    end
    Producer -- 1. Create task_node --> ZTasks
    Cron -- 2. Triggers --> ConsumerFunc
    ConsumerFunc -- 3. List tasks --> ZTasks
    ConsumerFunc -- 4. Attempt to acquire lease --> ZLeases
    ConsumerFunc -- 5. Process event --> ThirdParty[Third-Party API / DB]
    ConsumerFunc -- 6. Cleanup task & lease --> ZTasks
    ConsumerFunc -- 6. Cleanup task & lease --> ZLeases
```
To implement this flow, we designed the following ZNode tree:
- `/event-bridge`: root node, used for namespace isolation.
- `/event-bridge/tasks`: persistent node, the parent of all pending tasks.
  - `/event-bridge/tasks/task-0000000001`: persistent sequential node. The node name is generated by ZooKeeper, which guarantees task ordering. The node's data holds the task details as a JSON string.
- `/event-bridge/leases`: persistent node that stores task leases.
  - `/event-bridge/leases/task-0000000001`: persistent node. When a consumer decides to process a task, it attempts to create this node with the same name as a "lock". Its data contains the consumer ID and the timestamp at which the lease was acquired.
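For concreteness, here is an illustrative sketch of the data stored in the two kinds of ZNodes. The field names follow the examples used elsewhere in this post (`user_id`/`action` from the Ruby usage example, `consumerId`/`timestamp` from the consumer code); your own event schema will differ.

```javascript
// Illustrative only: example data for the two kinds of ZNodes.

// /event-bridge/tasks/task-0000000001 -- written by the Ruby producer.
// The node body is the JSON-serialized event payload.
const exampleTaskPayload = {
  user_id: 123,
  action: 'export_report'
};

// /event-bridge/leases/task-0000000001 -- written by the consumer that wins the lease.
// The timestamp is what the stale-lease recovery step compares against its timeout.
const exampleLeaseData = {
  consumerId: 'vercel-func-a1b2c3d4', // random per-invocation ID, as in the consumer code
  timestamp: 1700000000000            // Date.now() at the moment the lease was acquired
};
```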
Workflow in detail:
- Task production: the Ruby application connects to ZooKeeper and creates a persistent sequential node under `/event-bridge/tasks/`, whose data is the JSON-serialized event.
- Consumption trigger: a Vercel Cron Job periodically (for example every minute) invokes the `/api/process_events` serverless function.
- Task discovery: the function connects to ZooKeeper and fetches all children of `/event-bridge/tasks` (the pending task list) and all children of `/event-bridge/leases` (the tasks already under lease).
- Finding unassigned tasks: it computes, locally, the set difference between the `tasks` list and the `leases` list, yielding the tasks nobody is working on.
- Competing for a lease: it walks the unassigned tasks and, for each one, tries to create a node with the same name under `/event-bridge/leases/`. ZooKeeper's `create` operation is atomic, so only one function can succeed, and that function wins the right to process the task (the lease); a standalone sketch of this race follows the list.
- Execution and cleanup: the function holding the lease reads the payload from the task node and runs the business logic. On success it deletes both the corresponding task node under `/event-bridge/tasks/` and the lease node under `/event-bridge/leases/`.
- Failure recovery: if a function crashes after acquiring a lease but before finishing the task, the lease node lingers. The consumer logic therefore must include a cleanup step: inspect every lease under `/event-bridge/leases`, and if the timestamp in its data exceeds a preset timeout (for example 5 minutes), assume the original consumer is dead, delete the lease node, and let the task be picked up again by another consumer.
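Before diving into the full implementations, here is a small standalone sketch (not part of the deployed code) that isolates the heart of the lease-competition step: because `create` is atomic, only one of two concurrent attempts on the same lease path succeeds, and the loser gets `NODE_EXISTS`. It assumes a local ZooKeeper at 127.0.0.1:2181 with the `/event-bridge/leases` parent already created, and uses a hypothetical task name.

```javascript
// demo_lease_race.js -- standalone sketch of the lease-acquisition race.
const zookeeper = require('node-zookeeper-client');

const client = zookeeper.createClient('127.0.0.1:2181');

// Try to create the lease node; resolve with a human-readable outcome instead of throwing.
function tryAcquire(leasePath, consumerId) {
  const data = Buffer.from(JSON.stringify({ consumerId, timestamp: Date.now() }));
  return new Promise((resolve) => {
    client.create(leasePath, data, zookeeper.CreateMode.PERSISTENT, (err) => {
      if (!err) return resolve(`${consumerId}: acquired the lease`);
      if (err.getCode() === zookeeper.Exception.NODE_EXISTS) {
        return resolve(`${consumerId}: lost the race (NODE_EXISTS)`);
      }
      return resolve(`${consumerId}: unexpected error ${err}`);
    });
  });
}

client.once('connected', async () => {
  // Hypothetical task name; in the real flow it comes from the tasks/leases diff.
  const leasePath = '/event-bridge/leases/task-0000000042';
  const results = await Promise.all([
    tryAcquire(leasePath, 'consumer-A'),
    tryAcquire(leasePath, 'consumer-B'),
  ]);
  console.log(results); // exactly one acquires, one loses (assuming the path did not already exist)
  client.close();
});

client.connect();
```

The full producer and consumer below build on exactly this primitive.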
Core Implementation
1. Ruby Producer
We need a robust ZooKeeper client that can be used conveniently inside the Rails application, so we wrap one as a singleton.
Gemfile
gem 'zookeeper'
config/initializers/zookeeper_client.rb
# frozen_string_literal: true
require 'zookeeper'
require 'singleton'
require 'json'
# A robust, singleton ZooKeeper client for the Rails application.
# Handles connection management, retries, and path creation.
class ZKClient
include Singleton
attr_reader :zk
# Path constants
TASKS_PATH = '/event-bridge/tasks'.freeze
TASK_PREFIX = 'task-'.freeze
def initialize
@connection_string = ENV.fetch('ZOOKEEPER_CONNECTION_STRING', '127.0.0.1:2181')
@logger = Rails.logger
connect
end
# Publishes an event payload to the distributed queue.
#
# @param payload [Hash] The event data to be processed.
# @return [String, nil] The path of the created task node, or nil on failure.
def publish_event(payload)
ensure_connected
ensure_base_paths
task_path = File.join(TASKS_PATH, TASK_PREFIX)
serialized_payload = payload.to_json
# ZNode data size limit is typically 1MB. Add a check for production safety.
if serialized_payload.bytesize > 1_000_000
@logger.error("ZooKeeper publish error: Payload size is too large (#{serialized_payload.bytesize} bytes).")
return nil
end
begin
# Create a persistent sequential node. This is the core "enqueue" operation.
created_path = @zk.create(path: task_path, data: serialized_payload, sequence: true)
@logger.info("Successfully published event to ZooKeeper path: #{created_path}")
created_path
rescue Zookeeper::Error::ConnectionLoss, Zookeeper::Error::SessionExpired => e
@logger.error("ZooKeeper connection lost while publishing: #{e.message}. Attempting to reconnect and retry.")
reconnect
# A simple retry logic. For production, a more sophisticated backoff strategy is needed.
# In this case, we let the caller decide whether to retry.
nil
rescue Zookeeper::Error => e
@logger.error("Failed to publish event to ZooKeeper: #{e.class} - #{e.message}")
nil
end
end
private
def connect
@logger.info("Connecting to ZooKeeper at #{@connection_string}...")
# The timeout is crucial for preventing the application from hanging on startup.
@zk = Zookeeper.new(@connection_string, timeout: 5)
@zk.on_state_change { |event| handle_state_change(event) }
rescue StandardError => e
@logger.error("Failed to initialize connection to ZooKeeper: #{e.message}")
@zk = nil
end
def handle_state_change(event)
@logger.info("ZooKeeper state changed: #{event.inspect}")
# You might want to handle states like :expired by triggering a reconnect.
end
def ensure_connected
# If connection is closed or nil, try to reconnect.
reconnect if @zk.nil? || @zk.closed?
raise 'ZooKeeper connection is not available' if @zk.nil? || @zk.closed?
end
def reconnect
close_connection
connect
end
def close_connection
@zk&.close
rescue StandardError => e
@logger.warn("Error closing existing ZooKeeper connection: #{e.message}")
ensure
@zk = nil
end
# Ensures the base paths for the event bridge exist.
def ensure_base_paths
['/event-bridge', '/event-bridge/tasks', '/event-bridge/leases'].each do |path|
result = @zk.get(path: path)
next if result[:stat].exists?
@logger.info("Path #{path} does not exist. Creating...")
@zk.create(path: path, data: '')
end
rescue Zookeeper::Error::NodeExists
# Race condition is fine, another process created it.
rescue StandardError => e
@logger.error("Error ensuring base paths in ZooKeeper: #{e.message}")
raise e
end
end
# Usage in a Rails controller or service
# ZKClient.instance.publish_event({ user_id: 123, action: 'export_report' })
2. Vercel Function Consumer
This is a Node.js serverless function triggered by a Vercel Cron schedule.
package.json
{
"dependencies": {
"node-zookeeper-client": "^1.1.3"
}
}
vercel.json
{
"crons": [
{
"path": "/api/process_events",
"schedule": "*/1 * * * *"
}
]
}
api/process_events.js
const zookeeper = require('node-zookeeper-client');
const CONNECTION_STRING = process.env.ZOOKEEPER_CONNECTION_STRING || '127.0.0.1:2181';
const LEASE_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes
const CONSUMER_ID = `vercel-func-${Math.random().toString(36).substring(2, 10)}`;
const ROOT_PATH = '/event-bridge';
const TASKS_PATH = `${ROOT_PATH}/tasks`;
const LEASES_PATH = `${ROOT_PATH}/leases`;
let client;
// Helper to manage ZooKeeper connection lifecycle within a serverless function
function getClient() {
if (!client || client.getState() === zookeeper.State.DISCONNECTED) {
client = zookeeper.createClient(CONNECTION_STRING, {
sessionTimeout: 30000,
spinDelay: 1000,
retries: 2
});
return new Promise((resolve, reject) => {
client.once('connected', () => {
console.log('ZooKeeper client connected.');
resolve(client);
});
client.once('error', reject);
client.connect();
});
}
return Promise.resolve(client);
}
// Promisified ZooKeeper client methods
const zkP = {
getChildren: (path) => new Promise((resolve, reject) => client.getChildren(path, (err, children) => err ? reject(err) : resolve(children))),
getData: (path) => new Promise((resolve, reject) => client.getData(path, (err, data) => err ? reject(err) : resolve(data))),
create: (path, data, mode) => new Promise((resolve, reject) => client.create(path, data, mode, (err, p) => err ? reject(err) : resolve(p))),
remove: (path) => new Promise((resolve, reject) => client.remove(path, -1, (err) => err ? reject(err) : resolve())),
exists: (path) => new Promise((resolve, reject) => client.exists(path, (err, stat) => err ? reject(err) : resolve(stat))),
};
async function processEvent(taskName) {
const taskPath = `${TASKS_PATH}/${taskName}`;
console.log(`Processing task: ${taskName}`);
try {
const taskDataBuffer = await zkP.getData(taskPath);
const payload = JSON.parse(taskDataBuffer.toString('utf8'));
// =======================================================
// == YOUR ACTUAL BUSINESS LOGIC GOES HERE ==
// == e.g., call a third-party API, process data, etc. ==
// =======================================================
console.log(`Executing business logic for payload:`, payload);
await new Promise(resolve => setTimeout(resolve, 2000)); // Simulating async work
console.log(`Business logic for ${taskName} completed.`);
// =======================================================
return true; // Indicate success
} catch (error) {
console.error(`Error processing event ${taskName}:`, error);
// In a real project, you might want to move this task to a "dead-letter" path
// instead of just failing. For now, we leave it for retry.
return false;
}
}
async function cleanupStaleLeases() {
console.log('Checking for stale leases...');
let staleLeaseCount = 0;
try {
const leasedTasks = await zkP.getChildren(LEASES_PATH);
for (const taskName of leasedTasks) {
const leasePath = `${LEASES_PATH}/${taskName}`;
try {
const leaseDataBuffer = await zkP.getData(leasePath);
const leaseInfo = JSON.parse(leaseDataBuffer.toString('utf8'));
if (Date.now() - leaseInfo.timestamp > LEASE_TIMEOUT_MS) {
console.warn(`Found stale lease for task ${taskName}. Removing lease.`);
await zkP.remove(leasePath);
staleLeaseCount++;
}
} catch (error) {
// If node is gone between getChildren and getData, that's fine.
if (error.getCode() !== zookeeper.Exception.NO_NODE) {
console.error(`Error checking lease for ${taskName}:`, error);
}
}
}
} catch (error) {
if (error.getCode() !== zookeeper.Exception.NO_NODE) {
console.error('Error during stale lease cleanup:', error);
}
}
console.log(`Cleaned up ${staleLeaseCount} stale leases.`);
}
module.exports = async (req, res) => {
console.log(`Event processor function triggered at ${new Date().toISOString()}`);
try {
client = await getClient();
// First, run garbage collection for failed consumers
await cleanupStaleLeases();
// Fetch available tasks and current leases
const [allTasks, leasedTasks] = await Promise.all([
zkP.getChildren(TASKS_PATH),
zkP.getChildren(LEASES_PATH)
]);
const leasedTaskSet = new Set(leasedTasks);
const availableTasks = allTasks.filter(task => !leasedTaskSet.has(task));
console.log(`Found ${availableTasks.length} available tasks out of ${allTasks.length} total.`);
if (availableTasks.length === 0) {
res.status(200).send('No new events to process.');
if (client) { client.close(); client = null; }
return;
}
// Attempt to process one task per invocation to keep function execution short
const taskToProcess = availableTasks[0]; // Simple strategy: take the oldest
const leasePath = `${LEASES_PATH}/${taskToProcess}`;
const taskPath = `${TASKS_PATH}/${taskToProcess}`;
try {
// Attempt to acquire lease (atomic operation)
const leaseData = Buffer.from(JSON.stringify({ consumerId: CONSUMER_ID, timestamp: Date.now() }));
await zkP.create(leasePath, leaseData, zookeeper.CreateMode.PERSISTENT);
console.log(`Successfully acquired lease for task: ${taskToProcess}`);
const success = await processEvent(taskToProcess);
if (success) {
console.log(`Task ${taskToProcess} processed successfully. Cleaning up.`);
// Cleanup on success. A multi-op transaction would be safer here if available.
await zkP.remove(taskPath);
await zkP.remove(leasePath);
} else {
console.error(`Failed to process ${taskToProcess}. Releasing lease for retry.`);
// On failure, only remove the lease so another worker can try again
await zkP.remove(leasePath);
}
} catch (error) {
if (error.getCode() === zookeeper.Exception.NODE_EXISTS) {
// This is an expected race condition: another consumer got the lease first.
console.log(`Failed to acquire lease for ${taskToProcess}, another consumer was faster.`);
} else {
console.error(`An unexpected error occurred while trying to process ${taskToProcess}:`, error);
}
}
res.status(200).send(`Processed one cycle. Available tasks: ${availableTasks.length}`);
} catch (error) {
console.error('Main handler error:', error);
res.status(500).send('An error occurred in the event processor.');
} finally {
// Ensure the client is closed to prevent hanging connections
if (client) {
client.close();
client = null;
console.log('ZooKeeper client closed.');
}
}
};
Limitations and Applicable Boundaries
This architecture is not a silver bullet; its usefulness rests on a clear understanding of its limitations.
- Latency and throughput: this is a polling-based system, so the minimum processing latency is bounded by the shortest Vercel Cron interval (currently 1 minute). It is entirely unsuitable for low-latency scenarios. And because every consumer scans the full task list, ZooKeeper read load rises noticeably when tasks pile up, so throughput has a clear ceiling.
- Payload limit: ZNode data is capped at 1 MB by default. Tasks that need to carry large files must use a "reference" pattern: keep the actual data in object storage such as S3 and store only a reference to it in the ZNode (see the payload-reference sketch after this list).
- Thundering herd: our polling model avoids the thundering herd typical of ZooKeeper's watch mechanism, but with a long task list every polling round still issues a burst of `getChildren` requests, a variant of the same problem smeared out over time.
- Missing transactionality: after a task is processed successfully, deleting the `task` node and the `lease` node are two independent operations. The probability of failing in between is tiny, but in theory one node can be deleted while the other is left orphaned. ZooKeeper's multi-op transactions solve this at the cost of some extra code (a transaction sketch also follows this list).
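To make the "reference" pattern for oversized payloads concrete, here is a minimal consumer-side sketch. It assumes a convention of our own, not something in the code above: the producer stores a small envelope such as `{ "payload_ref": "<pre-signed object-storage URL>" }` in the task ZNode instead of the data itself, and the consumer relies on the built-in `fetch` of Node 18+.

```javascript
// Sketch of the "reference" pattern for payloads near or above the ~1 MB ZNode limit.
// Assumption: large payloads live in object storage (e.g. S3) and the task ZNode only
// carries a pre-signed URL in a hypothetical `payload_ref` field.
async function resolvePayload(taskDataBuffer) {
  const envelope = JSON.parse(taskDataBuffer.toString('utf8'));

  // Small payloads keep working unchanged: no reference field means inline data.
  if (!envelope.payload_ref) return envelope;

  // Large payloads are fetched through the reference (requires Node 18+ for fetch).
  const response = await fetch(envelope.payload_ref);
  if (!response.ok) {
    throw new Error(`Failed to fetch referenced payload: HTTP ${response.status}`);
  }
  return response.json();
}

// Inside processEvent(), the line
//   const payload = JSON.parse(taskDataBuffer.toString('utf8'));
// would become
//   const payload = await resolvePayload(taskDataBuffer);
```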
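As for the missing transactionality, node-zookeeper-client exposes ZooKeeper's multi-op API through `transaction()`. A hedged sketch of what the success-path cleanup could look like if we wanted to close the orphan-node window:

```javascript
// Sketch: delete the task node and its lease node in a single multi-op transaction,
// so either both are removed or neither is.
function removeTaskAndLease(client, taskPath, leasePath) {
  return new Promise((resolve, reject) => {
    client.transaction()
      .remove(taskPath, -1)   // -1 = ignore node version
      .remove(leasePath, -1)
      .commit((error, results) => (error ? reject(error) : resolve(results)));
  });
}

// On the success path of the handler, the two sequential calls
//   await zkP.remove(taskPath);
//   await zkP.remove(leasePath);
// could become
//   await removeTaskAndLease(client, taskPath, leasePath);
```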
This architecture fits best for asynchronous tasks that are latency-insensitive, moderate in volume, and where maximizing reuse of the existing ZooKeeper deployment matters. During a stack-migration transition, or on projects under strict cost control, it provides a reliable solution with a minimal operational burden. When the business grows and event throughput reaches the next order of magnitude, the design and the experience gained here can migrate smoothly to a dedicated streaming platform such as Kafka or Pulsar.