构建基于 Linkerd mTLS、Redis Streams 与 ASP.NET Core 的零信任事件驱动架构


我们团队最近在重构一个核心业务系统,目标是将其从一个紧耦合的单体应用拆分为一组基于事件驱动的微服务。技术栈选型很快确定下来:ASP.NET Core 提供高性能的服务运行时,Redis Streams 作为轻量级、支持持久化和消费组的消息代理。然而,当讨论进入到安全层面时,一个棘手的问题浮出水面:如何在一个动态的 Kubernetes 环境中,以一种统一且透明的方式,确保从生产者到消费者、包括与 Redis Broker 之间所有通信的绝对安全?

传统的做法是为 Redis 启用 TLS,然后在每个微服务中配置复杂的证书、密钥和连接字符串。这不仅增加了运维负担,而且在证书轮换、服务身份验证等环节极易出错。更重要的是,它与我们正在推行的服务网格(Service Mesh)理念背道而驰。我们希望安全策略是平台层面的,而不是应用层面的,应用应该专注于业务逻辑。我们的目标是:能否将 Redis 也无缝地纳入服务网格的零信任安全体系中,让 Linkerd 自动为所有进出 Redis 的流量注入 mTLS,就像它为服务间 HTTP/gRPC 调用所做的那样?

这便是我们这次实践的起点:构建一个架构,其中 ASP.NET Core 服务通过 Redis Streams 进行异步通信,而 Linkerd 服务网格则负责为整个通信链路(包括应用到 Redis 的 TCP 连接)提供强制性的、自动化的 mTLS 加密和授权策略。

初步构想与基础服务搭建

在引入 Linkerd 之前,我们首先需要一个能正常工作的事件驱动系统。这个系统包含三个组件:

  1. EventProducer: 一个 ASP.NET Core Web API,接收外部请求,然后向 Redis Stream 推送事件。
  2. EventConsumer: 一个 ASP.NET Core 后台服务,作为消费组的一员,持续从 Redis Stream 读取并处理事件。
  3. Redis: 标准的 Redis 实例,作为我们的消息代理。

这里的关键在于代码的健壮性。即使是在最简单的场景下,生产级的代码也必须考虑连接管理、错误处理和日志记录。

生产者服务的核心逻辑

生产者使用 StackExchange.Redis 库,并通过依赖注入管理 IConnectionMultiplexer 的单例实例,这是该库的最佳实践。

// In ProducerService/Program.cs

using StackExchange.Redis;

var builder = WebApplication.CreateBuilder(args);

// ... services registration ...

// Redis ConnectionMultiplexer should be a singleton
var redisConnectionString = builder.Configuration.GetValue<string>("Redis:ConnectionString");
if (string.IsNullOrEmpty(redisConnectionString))
{
    throw new InvalidOperationException("Redis connection string is not configured.");
}
builder.Services.AddSingleton<IConnectionMultiplexer>(sp => 
    ConnectionMultiplexer.Connect(redisConnectionString));

var app = builder.Build();

// API Endpoint
app.MapPost("/events", async (IConnectionMultiplexer redis, ILogger<Program> logger) =>
{
    var db = redis.GetDatabase();
    var streamName = "my-event-stream";
    var eventId = Guid.NewGuid().ToString();
    var eventData = new NameValueEntry[]
    {
        new("eventId", eventId),
        new("timestamp", DateTime.UtcNow.ToString("o")),
        new("payload", "Some critical business data")
    };

    try
    {
        // XADD command to append a new entry to the stream
        var redisId = await db.StreamAddAsync(streamName, eventData);
        logger.LogInformation("Successfully produced event {EventId} with Redis ID {RedisId}", eventId, redisId);
        return Results.Ok(new { EventId = eventId, RedisId = redisId.ToString() });
    }
    catch (RedisConnectionException ex)
    {
        logger.LogError(ex, "Failed to connect to Redis while producing event.");
        return Results.StatusCode(503); // Service Unavailable
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "An unexpected error occurred while producing event {EventId}", eventId);
        return Results.StatusCode(500);
    }
});

app.Run();

消费者服务的后台工作者

消费者是一个 BackgroundService,它在应用启动后便开始循环拉取消息。它会创建消费组,确保即使服务重启或横向扩展,消息也能被不重不漏地处理。

// In ConsumerService/EventConsumerWorker.cs

public class EventConsumerWorker : BackgroundService
{
    private readonly ILogger<EventConsumerWorker> _logger;
    private readonly IConnectionMultiplexer _redis;
    private const string StreamName = "my-event-stream";
    private const string GroupName = "my-consumer-group";
    private const string ConsumerName = "consumer-"; // Unique name per instance

    public EventConsumerWorker(IConnectionMultiplexer redis, ILogger<EventConsumerWorker> logger)
    {
        _redis = redis;
        _logger = logger;
        ConsumerName += Environment.MachineName;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var db = _redis.GetDatabase();

        // Ensure the consumer group exists. This is idempotent.
        try
        {
            if (!(await db.KeyExistsAsync(StreamName)) || 
                (await db.StreamGroupInfoAsync(StreamName)).All(g => g.Name != GroupName))
            {
                // Create stream and group starting from the beginning
                await db.StreamCreateConsumerGroupAsync(StreamName, GroupName, "0-0", createStream: true);
                _logger.LogInformation("Created Redis Stream '{StreamName}' and consumer group '{GroupName}'", StreamName, GroupName);
            }
        }
        catch(Exception ex)
        {
            _logger.LogError(ex, "Failed to create consumer group. Worker will not start.");
            return;
        }

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // XREADGROUP command to read from the stream as part of a group
                // ">" means only new messages that arrive after we start listening
                var entries = await db.StreamReadGroupAsync(StreamName, GroupName, ConsumerName, ">", 10, 10000);

                if (!entries.Any())
                {
                    _logger.LogTrace("No new events received. Waiting...");
                    continue;
                }

                foreach (var entry in entries)
                {
                    _logger.LogInformation("Processing event {RedisId}", entry.Id);
                    // Simulate work
                    await Task.Delay(100, stoppingToken);

                    // Acknowledge the message so it's not redelivered
                    await db.StreamAcknowledgeAsync(StreamName, GroupName, entry.Id);
                }
            }
            catch (RedisConnectionException ex)
            {
                _logger.LogError(ex, "Redis connection lost. Retrying in 5 seconds...");
                await Task.Delay(5000, stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "An unexpected error occurred in consumer loop. Retrying in 5 seconds...");
                await Task.Delay(5000, stoppingToken);
            }
        }
    }
}

这些代码本身不包含任何安全相关的逻辑,连接字符串也只是一个简单的 redis-master:6379。这正是我们想要的效果。

部署到 Kubernetes 并引入 Linkerd

接下来,我们将这些服务容器化并部署到 Kubernetes。

Kubernetes 清单

我们为生产者、消费者和 Redis 分别创建 DeploymentService

redis-deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis-master
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
    spec:
      containers:
      - name: redis
        image: redis:7.0-alpine
        ports:
        - containerPort: 6379
---
apiVersion: v1
kind: Service
metadata:
  name: redis-master
spec:
  ports:
  - port: 6379
    targetPort: 6379
  selector:
    app: redis

producer-deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer-service
spec:
  replicas: 1
  selector:
    matchLabels:
      app: producer-service
  template:
    metadata:
      labels:
        app: producer-service
    spec:
      containers:
      - name: producer
        image: your-repo/producer-service:latest # Replace with your image
        ports:
        - containerPort: 8080
        env:
        - name: Redis__ConnectionString
          value: "redis-master:6379"
---
apiVersion: v1
kind: Service
metadata:
  name: producer-service
spec:
  ports:
  - name: http
    port: 80
    targetPort: 8080
  selector:
    app: producer-service

消费者服务的清单与生产者类似,此不赘述。

部署完成后,系统可以正常工作。现在,到了关键一步:注入 Linkerd。我们通过 Linkerd CLI 将代理注入到我们的部署中:

# Inject Linkerd proxy into our services
kubectl get deploy -l app -o yaml \
  | linkerd inject - \
  | kubectl apply -f -

注入完成后,我们立刻遇到了第一个问题。消费者服务的日志开始疯狂报错:

fail: ConsumerService.EventConsumerWorker[0]
      Failed to create consumer group. Worker will not start.
      StackExchange.Redis.RedisConnectionException: It was not possible to connect to the redis server(s). Error connecting right now. To allow this multiplexer to continue retrying until it's able to connect, use AbortOnConnectFail=false.

生产者服务在尝试连接 Redis 时也报同样的错误。服务之间的通信被 Linkerd 代理了,但它们无法与 Redis 建立连接。

解决协议检测的陷阱

这里的坑在于 Linkerd 的自动协议检测机制。默认情况下,Linkerd 会检查流经代理的流量,以确定它是 HTTP/1、HTTP/2 还是 gRPC。如果协议无法识别,它会将其作为不透明的 TCP 流量来处理。然而,Redis 的协议是一种自定义的二进制协议,Linkerd 的探测可能会失败或超时,导致连接建立失败。

解决方案是明确地告诉 Linkerd:对于发往 Redis 6379 端口的流量,不要尝试协议检测,直接作为不透明 TCP 流量进行代理。我们通过在 Redis 的 Service 资源上添加一个 annotation 来实现这一点。

修改 redis-deployment.yaml 中的 Service:

apiVersion: v1
kind: Service
metadata:
  name: redis-master
  # This annotation is the key to solving the problem
  annotations:
    config.linkerd.io/opaque-ports: "6379"
spec:
  ports:
  - port: 6379
    targetPort: 6379
  selector:
    app: redis

重新应用这个清单后,奇迹发生了。消费者和生产者服务立即成功连接到 Redis,事件流开始正常流动。

现在,我们可以使用 linkerd viz 来验证我们的成果。

# Check TCP connections and their security status
linkerd viz edges deployment

输出会清晰地显示 producer-serviceconsumer-serviceredis-master 的连接,并且 TLS 列的值为 true

SRC                 DST               SRC_NS      DST_NS      TLS
...
producer-service    redis-master      default     default     true
consumer-service    redis-master      default     default     true
...

这证明了我们的核心构想是可行的:Linkerd 成功地为应用到 Redis 的 TCP 连接提供了透明的 mTLS 加密。应用程序代码一行未改,却获得了与服务间调用同等级别的传输层安全。

实施零信任:网络授权策略

仅仅加密流量是不够的。零信任架构的核心是“从不信任,总是验证”。这意味着我们需要明确的策略,规定“谁”可以访问“什么”。在我们的场景中,只有生产者和消费者服务应该能够访问 Redis 服务。

Linkerd 通过 AuthorizationPolicyServer 两种 CRD 来实现这一点。

  1. Server: 定义了一个或多个 Pod 上的一个端口,代表一个逻辑服务,可以成为授权策略的目标。
  2. AuthorizationPolicy: 选择一个或多个 Server,并定义允许访问这些 Server 的客户端身份。

首先,我们为 Redis 服务创建一个 Server 资源。

redis-server.yaml:

apiVersion: policy.linkerd.io/v1beta1
kind: Server
metadata:
  name: redis-server
  namespace: default
spec:
  podSelector:
    matchLabels:
      app: redis
  port: 6379

这个 Server 选择了所有带有 app: redis 标签的 Pod,并暴露了它们的 6379 端口。

接下来,我们创建 AuthorizationPolicy。Linkerd 中的身份是基于 Kubernetes 的 ServiceAccount。因此,我们需要确保生产者和消费者 Pod 都有各自的 ServiceAccount

假设我们已经为它们创建了名为 producer-saconsumer-saServiceAccount,并更新了 Deployment 以使用它们。现在,我们可以定义策略。

redis-auth-policy.yaml:

apiVersion: policy.linkerd.io/v1alpha1
kind: AuthorizationPolicy
metadata:
  name: redis-access-policy
  namespace: default
spec:
  targetRef:
    group: policy.linkerd.io
    kind: Server
    name: redis-server
  requiredAuthenticationRefs:
    - name: producer-auth
      kind: MeshTLSAuthentication
    - name: consumer-auth
      kind: MeshTLSAuthentication
---
apiVersion: policy.linkerd.io/v1alpha1
kind: MeshTLSAuthentication
metadata:
  name: producer-auth
  namespace: default
spec:
  identities:
    # This identity string format is: <serviceAccountName>.<namespace>.serviceaccount.identity.<clusterDomain>
    - "producer-sa.default.serviceaccount.identity.cluster.local"
---
apiVersion: policy.linkerd.io/v1alpha1
kind: MeshTLSAuthentication
metadata:
  name: consumer-auth
  namespace: default
spec:
  identities:
    - "consumer-sa.default.serviceaccount.identity.cluster.local"

这个策略非常明确:

  • 它作用于我们刚刚定义的 redis-server
  • 它要求客户端必须通过 MeshTLSAuthentication 进行认证。
  • 允许的身份(identities)只有两个:producer-saconsumer-sa

应用这些策略后,我们来做一个实验。创建一个没有 ServiceAccount 或者使用不同 ServiceAccount 的临时 Pod,尝试用 redis-cli 连接 redis-master:6379。连接将会被 Linkerd 代理拒绝。这验证了我们的零信任网络策略已经生效。

最终架构与代码层面的不变性

最终,我们的架构看起来是这样的:

graph TD
    subgraph Kubernetes Cluster
        subgraph "Producer Pod"
            P_App[ASP.NET Core Producer]
            P_Proxy[Linkerd Proxy]
            P_App -- "redis-master:6379" --> P_Proxy
        end

        subgraph "Consumer Pod"
            C_App[ASP.NET Core Consumer]
            C_Proxy[Linkerd Proxy]
            C_App -- "redis-master:6379" --> C_Proxy
        end

        subgraph "Redis Pod"
            R_App[Redis Server]
            R_Proxy[Linkerd Proxy]
            R_Proxy -- "localhost:6379" --> R_App
        end

        P_Proxy -- "mTLS over TCP" --> R_Proxy
        C_Proxy -- "mTLS over TCP" --> R_Proxy
    end

    ExternalClient --> |HTTP POST /events| P_Proxy

    linkStyle 0 stroke-width:0px;

这个架构最优雅的地方在于,ASP.NET Core 层的代码完全不需要感知 mTLSAuthorizationPolicy 的存在。ConnectionMultiplexer 依然连接到 redis-master:6379,它认为自己正在建立一个普通的、未加密的 TCP 连接。实际上,这个连接被 Pod 内的 Linkerd 代理拦截,代理代表应用与 Redis Pod 的代理建立了 mTLS 加密的隧道,并通过了身份验证。

这种平台层面的安全抽象,极大地降低了开发者的心智负担,提升了系统的整体安全性。安全不再是一个需要每个开发者都去关心的应用层问题,而是由平台统一提供和强制执行的保障。

局限性与未来展望

这个方案并非没有边界。首先,它解决的是传输层安全(in-transit),数据在 Redis 中仍然是明文存储的(at-rest)。如果需要对落盘数据加密,还需要依赖 Redis 本身或存储层的加密能力。

其次,引入代理不可避免地会带来微小的性能开销。Linkerd 的代理(linkerd-proxy)是用 Rust 编写的,性能极高,通常这种开销在微秒级别,对于大多数应用可以忽略不计。但在每秒处理数十万甚至上百万消息的极端场景下,必须进行详尽的性能压测来评估其影响。

未来的一个演进方向是,将这种模式扩展到其他类型的 TCP 后端服务,如 PostgreSQL、Elasticsearch 等。通过为它们的服务配置 opaque-ports annotation 并应用相应的 AuthorizationPolicy,我们可以用一套统一的、基于服务网格的零信任模型来保护所有内部基础设施的访问,构建一个真正意义上的安全内网。


  目录