Building a Lightweight Event-Driven Bridge Between Ruby and Vercel Functions with ZooKeeper


We faced a concrete engineering problem: a Ruby on Rails core business system that has been running for years needs to offload non-critical, compute-intensive, or third-party-API-bound work to asynchronous execution. The team's technical direction is to embrace Serverless, and Vercel Functions was the first choice thanks to its simple deployment and elastic scaling. The question: how do we deliver the events produced by this monolithic Rails application to Vercel Functions reliably and at low cost?

Option A: Introduce a Dedicated Message Queue

The most direct approach is to introduce a standard message queue (MQ) such as RabbitMQ or AWS SQS.

  • Pros: This is the industry-standard solution. It is feature-complete: persistence, dead-letter queues, retries, message acknowledgment, and more. The client ecosystem is mature, with high-quality libraries for both Ruby and Node.js.
  • Cons:
    1. Operational cost: Whether we self-host a RabbitMQ cluster or use a managed service like SQS, it means new infrastructure cost and maintenance burden. Our team already operates a highly available ZooKeeper cluster for other services, so adding another component needs careful justification.
    2. Over-engineering: The current event volume is modest, roughly tens of thousands to a few hundred thousand invocations per day, with unremarkable peaks. Standing up a full MQ stack at this stage is overkill.

Option B: A Distributed Coordination Approach Built on ZooKeeper

Given our existing ZooKeeper infrastructure, an alternative emerged: can we use ZooKeeper's distributed coordination primitives to build a lightweight task distribution and processing system?

  • Pros:
    1. Reuse existing infrastructure: No new components, zero additional fixed cost. The team is already very comfortable operating ZooKeeper.
    2. Controllable implementation: The core logic is written in-house and can be tailored to business needs, without being constrained by the feature surface of a full MQ system.
  • Cons:
    1. Not a purpose-built tool: ZooKeeper is fundamentally a coordination service, not a message queue. It is not designed for high-throughput message storage and delivery, and ZNode data is capped at 1 MB.
    2. Implementation complexity: Task queuing, consumer load balancing, task leases, failure recovery, and so on all have to be built ourselves.

Decision: Weighing the trade-offs, we chose Option B. It is a pragmatic decision: solve today's problem at the lowest marginal cost while keeping the system's complexity inside the team. We accept the challenge of implementing the queue logic ourselves in exchange for a leaner stack and lower operational overhead.

Architecture Design and ZNode Layout

The core idea: the producer (Ruby) writes each task into ZooKeeper as a ZNode, and consumers (Vercel Functions) discover and execute tasks by polling and competing for locks (leases).

graph TD
    subgraph "Rails Monolith (Ruby)"
        Producer[Event Producer]
    end

    subgraph "ZooKeeper Cluster"
        ZTasks["/event-bridge/tasks"]
        ZLeases["/event-bridge/leases"]
    end

    subgraph "Vercel"
        Cron["Vercel Cron Job (every 1 min)"]
        ConsumerFunc["/api/process_events"]
    end

    Producer -- 1. Create task_node --> ZTasks
    Cron -- 2. Triggers --> ConsumerFunc
    ConsumerFunc -- 3. List tasks --> ZTasks
    ConsumerFunc -- 4. Attempt to acquire lease --> ZLeases
    ConsumerFunc -- 5. Process event --> ThirdParty[Third-Party API / DB]
    ConsumerFunc -- 6. Cleanup task & lease --> ZTasks
    ConsumerFunc -- 6. Cleanup task & lease --> ZLeases

To support this flow, we designed the following ZNode tree:

  • /event-bridge: the root node, used for namespace isolation.
  • /event-bridge/tasks: persistent node, parent of all pending tasks.
    • /event-bridge/tasks/task-0000000001: persistent sequential node. The name is generated by ZooKeeper, which preserves task ordering. The node's data holds the task details as a JSON string.
  • /event-bridge/leases: persistent node, holding task leases.
    • /event-bridge/leases/task-0000000001: persistent node. When a consumer decides to process a task, it tries to create this identically named node as a "lock". Its data contains the consumer ID and the timestamp at which the lease was acquired (example contents are sketched below).
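
For illustration, a single in-flight task whose lease has already been claimed would look roughly like this. The payload fields mirror the usage example in the producer code further down; the lease fields match what the consumer writes; the concrete values are made up for the example:

/event-bridge
├── tasks
│   └── task-0000000001    # data: {"user_id":123,"action":"export_report"}
└── leases
    └── task-0000000001    # data: {"consumerId":"vercel-func-ab12cd34","timestamp":1719650000000}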

Workflow in detail:

  1. Task production: The Ruby application connects to ZooKeeper and creates a persistent sequential node under /event-bridge/tasks/ whose data is the JSON-serialized event.
  2. Consumption trigger: A Vercel Cron Job periodically (e.g., every minute) invokes the /api/process_events Serverless Function.
  3. Task discovery: The Function connects to ZooKeeper and fetches all children of /event-bridge/tasks (pending tasks) and of /event-bridge/leases (tasks already leased).
  4. Finding unassigned tasks: It computes the set difference between the tasks list and the leases list locally, yielding the tasks nobody is working on.
  5. Competing for leases: It iterates over the unassigned tasks and, for each one, tries to create a node with the same name under /event-bridge/leases/. ZooKeeper's create is atomic, so only one Function succeeds and thereby wins the right (the lease) to process that task.
  6. Execution and cleanup: The Function holding the lease reads the payload from the task node and runs the business logic. On success it deletes both the task node under /event-bridge/tasks/ and the lease node under /event-bridge/leases/ (ideally in a single multi-op; see the limitations section).
  7. Failure recovery: If a Function crashes after acquiring a lease but before finishing the task, the lease node lingers. The consumer logic therefore includes a "cleanup" step: it scans all leases under /event-bridge/leases and, if a lease's timestamp exceeds a preset timeout (e.g., 5 minutes), assumes the original consumer is dead, deletes the lease node, and makes the task claimable again.

Core Implementation

1. Ruby Producer

We need a robust ZooKeeper client that is convenient to use inside the Rails application, so we wrap one as a singleton.

Gemfile

gem 'zookeeper'

config/initializers/zookeeper_client.rb

# frozen_string_literal: true

require 'zookeeper'
require 'singleton'
require 'json'

# A robust, singleton ZooKeeper client for the Rails application.
# Handles connection management, retries, and path creation.
class ZKClient
  include Singleton

  attr_reader :zk

  # Path constants
  TASKS_PATH = '/event-bridge/tasks'.freeze
  TASK_PREFIX = 'task-'.freeze

  def initialize
    @connection_string = ENV.fetch('ZOOKEEPER_CONNECTION_STRING', '127.0.0.1:2181')
    @logger = Rails.logger
    connect
  end

  # Publishes an event payload to the distributed queue.
  #
  # @param payload [Hash] The event data to be processed.
  # @return [String, nil] The path of the created task node, or nil on failure.
  def publish_event(payload)
    ensure_connected
    ensure_base_paths

    task_path = File.join(TASKS_PATH, TASK_PREFIX)
    serialized_payload = payload.to_json

    # ZNode data size limit is typically 1MB. Add a check for production safety.
    if serialized_payload.bytesize > 1_000_000
      @logger.error("ZooKeeper publish error: Payload size is too large (#{serialized_payload.bytesize} bytes).")
      return nil
    end

    begin
      # Create a persistent sequential node. This is the core "enqueue" operation.
      created_path = @zk.create(path: task_path, data: serialized_payload, sequence: true)
      @logger.info("Successfully published event to ZooKeeper path: #{created_path}")
      created_path
    rescue Zookeeper::Error::ConnectionLoss, Zookeeper::Error::SessionExpired => e
      @logger.error("ZooKeeper connection lost while publishing: #{e.message}. Attempting to reconnect and retry.")
      reconnect
      # A simple retry logic. For production, a more sophisticated backoff strategy is needed.
      # In this case, we let the caller decide whether to retry.
      nil
    rescue Zookeeper::Error => e
      @logger.error("Failed to publish event to ZooKeeper: #{e.class} - #{e.message}")
      nil
    end
  end

  private

  def connect
    @logger.info("Connecting to ZooKeeper at #{@connection_string}...")
    # The timeout is crucial for preventing the application from hanging on startup.
    @zk = Zookeeper.new(@connection_string, timeout: 5)
    @zk.on_state_change { |event| handle_state_change(event) }
  rescue StandardError => e
    @logger.error("Failed to initialize connection to ZooKeeper: #{e.message}")
    @zk = nil
  end

  def handle_state_change(event)
    @logger.info("ZooKeeper state changed: #{event.inspect}")
    # You might want to handle states like :expired by triggering a reconnect.
  end

  def ensure_connected
    # If connection is closed or nil, try to reconnect.
    reconnect if @zk.nil? || @zk.closed?
    raise 'ZooKeeper connection is not available' if @zk.nil? || @zk.closed?
  end

  def reconnect
    close_connection
    connect
  end

  def close_connection
    @zk&.close
  rescue StandardError => e
    @logger.warn("Error closing existing ZooKeeper connection: #{e.message}")
  ensure
    @zk = nil
  end

  # Ensures the base paths for the event bridge exist.
  def ensure_base_paths
    ['/event-bridge', '/event-bridge/tasks', '/event-bridge/leases'].each do |path|
      result = @zk.get(path: path)
      next if result[:stat].exists?

      @logger.info("Path #{path} does not exist. Creating...")
      @zk.create(path: path, data: '')
    end
  rescue Zookeeper::Error::NodeExists
    # Race condition is fine, another process created it.
  rescue StandardError => e
    @logger.error("Error ensuring base paths in ZooKeeper: #{e.message}")
    raise e
  end
end

# Usage in a Rails controller or service
# ZKClient.instance.publish_event({ user_id: 123, action: 'export_report' })

2. Vercel Function Consumer

This is a Node.js Serverless Function triggered by the Cron Job.

package.json

{
  "dependencies": {
    "node-zookeeper-client": "^1.1.3"
  }
}

vercel.json

{
  "crons": [
    {
      "path": "/api/process_events",
      "schedule": "*/1 * * * *"
    }
  ]
}

api/process_events.js

const zookeeper = require('node-zookeeper-client');

const CONNECTION_STRING = process.env.ZOOKEEPER_CONNECTION_STRING || '127.0.0.1:2181';
const LEASE_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes
const CONSUMER_ID = `vercel-func-${Math.random().toString(36).substring(2, 10)}`;

const ROOT_PATH = '/event-bridge';
const TASKS_PATH = `${ROOT_PATH}/tasks`;
const LEASES_PATH = `${ROOT_PATH}/leases`;

let client;

// Helper to manage ZooKeeper connection lifecycle within a serverless function
function getClient() {
  if (!client || client.getState() === zookeeper.State.DISCONNECTED) {
    client = zookeeper.createClient(CONNECTION_STRING, {
      sessionTimeout: 30000,
      spinDelay: 1000,
      retries: 2
    });

    return new Promise((resolve, reject) => {
      client.once('connected', () => {
        console.log('ZooKeeper client connected.');
        resolve(client);
      });
      client.once('error', reject);
      client.connect();
    });
  }
  return Promise.resolve(client);
}

// Promisified ZooKeeper client methods
const zkP = {
  getChildren: (path) => new Promise((resolve, reject) => client.getChildren(path, (err, children) => err ? reject(err) : resolve(children))),
  getData: (path) => new Promise((resolve, reject) => client.getData(path, (err, data) => err ? reject(err) : resolve(data))),
  create: (path, data, mode) => new Promise((resolve, reject) => client.create(path, data, mode, (err, p) => err ? reject(err) : resolve(p))),
  remove: (path) => new Promise((resolve, reject) => client.remove(path, -1, (err) => err ? reject(err) : resolve())),
  exists: (path) => new Promise((resolve, reject) => client.exists(path, (err, stat) => err ? reject(err) : resolve(stat))),
};

async function processEvent(taskName) {
  const taskPath = `${TASKS_PATH}/${taskName}`;
  console.log(`Processing task: ${taskName}`);
  
  try {
    const taskDataBuffer = await zkP.getData(taskPath);
    const payload = JSON.parse(taskDataBuffer.toString('utf8'));
    
    // =======================================================
    // == YOUR ACTUAL BUSINESS LOGIC GOES HERE              ==
    // == e.g., call a third-party API, process data, etc.  ==
    // =======================================================
    console.log(`Executing business logic for payload:`, payload);
    await new Promise(resolve => setTimeout(resolve, 2000)); // Simulating async work
    console.log(`Business logic for ${taskName} completed.`);
    // =======================================================

    return true; // Indicate success
  } catch (error) {
    console.error(`Error processing event ${taskName}:`, error);
    // In a real project, you might want to move this task to a "dead-letter" path
    // instead of just failing. For now, we leave it for retry.
    return false;
  }
}

async function cleanupStaleLeases() {
  console.log('Checking for stale leases...');
  let staleLeaseCount = 0;
  try {
    const leasedTasks = await zkP.getChildren(LEASES_PATH);
    for (const taskName of leasedTasks) {
      const leasePath = `${LEASES_PATH}/${taskName}`;
      try {
        const leaseDataBuffer = await zkP.getData(leasePath);
        const leaseInfo = JSON.parse(leaseDataBuffer.toString('utf8'));
        
        if (Date.now() - leaseInfo.timestamp > LEASE_TIMEOUT_MS) {
          console.warn(`Found stale lease for task ${taskName}. Removing lease.`);
          await zkP.remove(leasePath);
          staleLeaseCount++;
        }
      } catch (error) {
        // If node is gone between getChildren and getData, that's fine.
        if (error.getCode() !== zookeeper.Exception.NO_NODE) {
          console.error(`Error checking lease for ${taskName}:`, error);
        }
      }
    }
  } catch (error) {
    if (error.getCode() !== zookeeper.Exception.NO_NODE) {
      console.error('Error during stale lease cleanup:', error);
    }
  }
  console.log(`Cleaned up ${staleLeaseCount} stale leases.`);
}


module.exports = async (req, res) => {
  console.log(`Event processor function triggered at ${new Date().toISOString()}`);

  try {
    client = await getClient();

    // First, run garbage collection for failed consumers
    await cleanupStaleLeases();

    // Fetch available tasks and current leases
    const [allTasks, leasedTasks] = await Promise.all([
      zkP.getChildren(TASKS_PATH),
      zkP.getChildren(LEASES_PATH)
    ]);

    const leasedTaskSet = new Set(leasedTasks);
    const availableTasks = allTasks.filter(task => !leasedTaskSet.has(task));

    console.log(`Found ${availableTasks.length} available tasks out of ${allTasks.length} total.`);

    if (availableTasks.length === 0) {
      res.status(200).send('No new events to process.');
      if (client) { client.close(); client = null; }
      return;
    }

    // Attempt to process one task per invocation to keep function execution short
    const taskToProcess = availableTasks[0]; // Simple strategy: take the oldest
    const leasePath = `${LEASES_PATH}/${taskToProcess}`;
    const taskPath = `${TASKS_PATH}/${taskToProcess}`;

    try {
      // Attempt to acquire lease (atomic operation)
      const leaseData = Buffer.from(JSON.stringify({ consumerId: CONSUMER_ID, timestamp: Date.now() }));
      await zkP.create(leasePath, leaseData, zookeeper.CreateMode.PERSISTENT);
      
      console.log(`Successfully acquired lease for task: ${taskToProcess}`);
      
      const success = await processEvent(taskToProcess);

      if (success) {
        console.log(`Task ${taskToProcess} processed successfully. Cleaning up.`);
        // Cleanup on success. A multi-op transaction would be safer here if available.
        await zkP.remove(taskPath); 
        await zkP.remove(leasePath);
      } else {
        console.error(`Failed to process ${taskToProcess}. Releasing lease for retry.`);
        // On failure, only remove the lease so another worker can try again
        await zkP.remove(leasePath);
      }
    } catch (error) {
      if (error.getCode() === zookeeper.Exception.NODE_EXISTS) {
        // This is an expected race condition: another consumer got the lease first.
        console.log(`Failed to acquire lease for ${taskToProcess}, another consumer was faster.`);
      } else {
        console.error(`An unexpected error occurred while trying to process ${taskToProcess}:`, error);
      }
    }

    res.status(200).send(`Processed one cycle. Available tasks: ${availableTasks.length}`);

  } catch (error) {
    console.error('Main handler error:', error);
    res.status(500).send('An error occurred in the event processor.');
  } finally {
    // Ensure the client is closed to prevent hanging connections
    if (client) {
      client.close();
      client = null;
      console.log('ZooKeeper client closed.');
    }
  }
};
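
The error path in processEvent() above only logs the failure and leaves the task in place for a later retry; its comment hints at moving stubborn failures to a dead-letter path instead. Below is a minimal sketch of that idea, under the assumption of a hypothetical /event-bridge/dead-letter node (not part of the layout above) created beforehand, using node-zookeeper-client's transaction() API:

// Hypothetical dead-letter path; create it once up front (e.g., with client.mkdirp).
const DEAD_LETTER_PATH = `${ROOT_PATH}/dead-letter`;

// Moves a failing task out of the queue in a single multi-op: copy its payload
// under the dead-letter node, then delete the original task node and its lease.
function deadLetterTask(client, taskName, payloadBuffer) {
  return new Promise((resolve, reject) => {
    client.transaction()
      .create(`${DEAD_LETTER_PATH}/${taskName}`, payloadBuffer,
              zookeeper.ACL.OPEN_ACL_UNSAFE, zookeeper.CreateMode.PERSISTENT)
      .remove(`${TASKS_PATH}/${taskName}`, -1)   // -1 skips the version check
      .remove(`${LEASES_PATH}/${taskName}`, -1)
      .commit((error, results) => (error ? reject(error) : resolve(results)));
  });
}

Whether to dead-letter on the first failure or only after several attempts is a policy decision; the sketch only shows the mechanics.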

Limitations and Applicability

This architecture is no silver bullet; its usefulness rests on a clear understanding of its limits.

  1. Latency and throughput: This is a polling system, so the minimum end-to-end latency is bounded by the shortest Vercel Cron interval (currently 1 minute). It is entirely unsuitable for low-latency scenarios. Moreover, since every consumer scans the full task list, ZooKeeper read load grows noticeably once tasks pile up, putting a clear ceiling on throughput.
  2. Payload limit: ZNode data is capped at 1 MB by default. Tasks that carry large files must use a "reference" pattern: store the actual data in object storage such as S3 and put only a reference to it in the ZNode.
  3. Thundering herd: Our polling model avoids the thundering-herd problem typical of ZooKeeper's watch mechanism, but with a long task list every polling round still issues a burst of getChildren requests, which is a diffuse herd of its own, spread out over time.
  4. Lack of transactionality: After a task succeeds, deleting the task node and the lease node are two independent operations. The failure window is tiny, but in theory one delete can succeed while the other leaves an orphan node behind. ZooKeeper's multi-op transactions solve this at the cost of extra code complexity; a sketch follows this list.
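
As flagged in point 4, the consumer above issues two separate remove() calls on success. A minimal sketch of an atomic alternative, reusing node-zookeeper-client's transaction() API (as in the dead-letter sketch earlier) and the taskPath/leasePath names already defined in api/process_events.js:

// Deletes the task node and its lease in one multi-op, so either both are
// removed or neither is. -1 skips the version check, as in zkP.remove above.
function removeTaskAndLease(client, taskPath, leasePath) {
  return new Promise((resolve, reject) => {
    client.transaction()
      .remove(taskPath, -1)
      .remove(leasePath, -1)
      .commit((error, results) => (error ? reject(error) : resolve(results)));
  });
}

With this helper, the two awaited zkP.remove() calls in the success branch collapse into a single await, closing the orphan-node window described above.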

The architecture fits best when tasks are latency-insensitive, volume is moderate, and you want to get maximum value out of an existing ZooKeeper deployment. During a stack-migration transition period, or in projects under tight cost control, it offers a reliable solution with minimal operational burden. When the business grows and event throughput reaches the next order of magnitude, the design and the lessons from this system carry over smoothly to a dedicated streaming platform such as Kafka or Pulsar.

