在使用 AWS Lambda 处理 Apache Kafka 时的扩展改进 计算博客
使用 AWS Lambda 处理 Apache Kafka 的扩展改善
主要要点
AWS Lambda 正在改善其在处理 Apache Kafka 事件源时的自动扩展行为。Lambda 增加了预设初始消费者的数量,提升消费者的扩展速度,并防止消费者过快缩减。此次改进不需要额外采取任何行动,也不会产生额外费用。在 AWS 上运行 Kafka
Apache Kafka 是一个流行的开源平台,用于构建实时流数据管道和应用程序。您可以在本地或在 Amazon EC2 上部属和管理自己的 Kafka 解决方案。
Amazon 管理的 Apache Kafka 流服务 (MSK) 是一个完全管理的服务,使构建和运行使用 Kafka 处理流数据的应用程序变得更加简单。 MSK Serverless 是 Amazon MSK 的一种类型,允许您运行 Kafka 而无需管理和扩展集群容量。它会自动配置和扩展容量,同时管理主题中的分区,让您可以轻松地流式数据,而不必担心正确调整或扩展集群。MSK Serverless 提供基于吞吐量的定价模型,因此您仅需为使用的部分付费。欲了解更多信息,请参见 使用 Kafka 构建您的流应用程序。
使用 Lambda 消费 Kafka 中的记录
在传统的基于伺服器的架构下,处理流数据可能很复杂,尤其是在需要实时反应时。许多组织在管理和扩展其串流平台上花费了大量时间和成本。为了快速反应,他们必须为高峰负载进行配置,这增加了复杂性。
一元机场url配置Lambda 和无伺服器架构消除了处理 Kafka 流时的繁重工作。您无需管理基础设施,可以减少运营开销,降低成本,且按需扩展。这使您能更专注于构建流应用程序。您可以使用 多种编程语言 写 Lambda 函数,这在处理流数据时提供了灵活性。
Lambda 事件源映射
Lambda 可以本地集成到您的 Kafka 环境中,作为消费者即时处理流数据。
要从 Kafka 消费流数据,您需要在 Lambda 函数上配置 事件源映射 (ESM)。这是一个由 Lambda 服务管理的资源,与您的函数分开。它会不断从 Kafka 集群中的主题轮询记录。ESM 可以选择性地过滤记录,并将其批量处理成有效载荷。然后,它会调用 Lambda 调用 API 以同步方式将有效载荷传递给您的 Lambda 函数进行处理。
由于 Lambda 管理轮询器,您无需在多个团队间管理一组消费者。每个团队可以创建和配置自己的 ESM,而 Lambda 将处理轮询。
扩展和吞吐量
Kafka 使用分区来提高吞吐量并将记录负载分散到集群中的所有代理上。
Lambda 的 事件源映射 资源包括轮询器和处理器。轮询器具有从 Kafka 分区读取记录的消费者。轮询分配器将其发送到处理器,后者负责将记录批量处理并调用您的函数。
当您创建 Kafka 事件源映射时,Lambda 会分配消费者以处理 Kafka 主题中的所有分区。此前,Lambda 为每个消费者分配的处理器最少是一个。
通过这些扩展改善,Lambda 现在分配多个处理器来提升处理性能,减少单次调用拖慢整个处理流的可能性。
每个消费者将记录发送至多个平行运行的处理器,以应对增加的工作负载。每个分区中的记录仅分配给一个处理器,以保持顺序。
Lambda 自动根据工作负载上下调整消费者和处理器的数量。Lambda 每分钟采样主题中所有分区的消费者偏移延迟。如果延迟增加,这意味著 Lambda 无法跟上从分区处理记录的速度。
扩展算法考虑了当前的偏移延迟,以及新增消息到达主题的速率。Lambda 可以在三分钟内达到消费者的最大数量,以迅速降低偏移延迟。同时,Lambda 也在减少缩减的行为,以确保记录能更快处理且降低延迟,特别是对于突发工作负载。
所有轮询器的总处理器只能扩展到主题中的分区总数。
您可以通过消费者指标 consumerlag 和 consumeroffset 来监测 Kafka 主题的吞吐量。

要检查函数的平行调用次数,您还可以监测 并发指标。并发数大约等于所有轮询器中处理器的总数,具体取决于处理器的活动。比如,如果三个轮询器的五个处理器在某个 ESM 中运行,函数的并发数大约为 15 (5 5 5)。
看到改善的扩展效果
有许多 无伺服器模式 可以使用 Lambda 处理 Kafka 流。要设置 Amazon MSK Serverless,请参考 GitHub 仓库 中的说明:
创建一个包含 1000 个分区的示例 Amazon MSK Serverless 主题。bash /kafkatopicssh create bootstrapserver {bootstrapserver} commandconfig clientproperties replicationfactor 3 partitions 1000 topic msk1000p
使用 UUID 作为键来将记录添加到主题,以便在分区之间均匀分布记录。此范例添加了 1300 万条记录。bash for x in {113000000} do echo (uuidgen r)messagex done /kafkaconsoleproducersh brokerlist {bootstrapserver} topic msk1000p producerconfig clientproperties property parsekey=true property keyseparator= producerproperty acks=all
创建基于 此模式 的 Python 函数,记录处理的记录。修改函数代码以插入 01 秒的延迟以模拟记录处理。python import json import base64 import time
def lambdahandler(event context) i = 1 for record in event[records] for messages in event[records][record] print() print(Record number str(i)) print(Topic str(messages[topic])) print(Partition str(messages[partition])) print(Offset str(messages[offset])) print(Timestamp str(messages[timestamp])) print(TimestampType str(messages[timestampType])) decodedKey = base64b64decode(messages[key])decode(ascii) if messagesget(key) else null decodedValue = base64b64decode(messages[value])decode(ascii) if messagesget(value) else null print(Key = str(decodedKey)) print(Value = str(decodedValue)) i = 1 timesleep(01) return { statusCode 200 }
配置 ESM 指向之前创建的集群和主题。使用预设的批量大小 100。将 StartingPosition 设置为 TRIMHORIZON 以从流的开头进行处理。部署函数,这也会添加并配置 ESM。查看 Amazon CloudWatch 的 ConcurrentExecutions 和 OffsetLag 指标以查看处理情况。随著扩展改善,一旦配置好 ESM,ESM 和函数会自动扩展以处理分区数量。
提高数据处理吞吐量
您的函数必须跟上流量速率之重要性不言而喻。持续增长的偏移延迟表明函数处理无法跟上。如果延迟的年龄相对于流的保留时间过高,则可能会因记录到期而丢失数据。
此值通常不应超过流保留期的 50。当值达到流保留期的 100 时,数据将丢失。一个临时解决方案是增加流的保留时间,让您有更多时间解决问题,从而避免数据丢失。
有几种方法可以提高处理吞吐量。
通过使用 内容过滤 避免处理不必要的记录,控制 Lambda 发送到函数的记录。这有助于减少函数的流量,简化代码,降低总成本。Lambda 根据分区数量在所有轮询器之间分配处理器,每个分区最多分配一个并发的 Lambda 函数。您可以通过增加分区数量来增加处理的 Lambda 函数数量。对于计算密集型函数,您可以增加 分配给函数的内存,这也会增加可用的虚拟 CPU 数量。这可以帮助缩短处理函数的持续时间。Lambda 以可配置的批量大小从 Kafka 进行轮询。您可以增加批量大小以在单次调用中处理更多记录。这可以改善处理时间,并降低成本,特别是当您的函数有较长的初始化时间时。较大的批量大小增加了处理批中第一条记录的延迟,但可能减少了处理批中最后一条记录的延迟。对于优化分区容量,成本和延迟之间有一定的权衡,而决策则取决于您的工作负载需求。确保您的生产者使用有效的分区键策略均匀分配记录到各个分区。当单一键占主导地位时,工作负载会产生不均衡,形成热分区,进而影响吞吐量。参见 提高数据处理吞吐量 获得更多指导。
结论
如今, AWS Lambda 在处理来自 Apache Kafka 事件源的数据时,改善了自动扩展行为。Lambda 增加了预设初始消费者的数量,提升了他们的扩展速度,并确保他们不会过快缩减。这次改进不需要额外采取任何行动,也不会产生额外费用。
您可以在现有工作负载中探索这些扩展改善,或部署一个 Amazon MSK 集群,试用其中的 模式 以测量处理时间。
要探索使用 Lambda 处理 Kafka 流,请参见 学习指南。
欲获得更多无伺服器学习资源,请访问 Serverless Land。
标签 无伺服器
使用 AWS IoT 设备管理作业安排远程操作 官方博客 物联网
使用 AWS IoT 设备管理作业调度远程操作关键要点在本文中,我们将介绍如何使用 AWS IoT 设备管理作业功能,远程管理 IoT 设备的操作并调度必要的更新。您将学习如何创建调度作业,控制作业的启动和结束时间,以及在特定时间窗口内执行关键任务,以减少对正常使用的干扰。简介一旦物联网 (IoT)...
如何无缝迁移流量在 Direct Connect 网关之间 网络与内容交付
如何无缝迁移 Direct Connect 网关之间的流量关键要点在本文中,我们深入探讨了高盛如何在受控的情况下迁移其关键网络组件的所有权,特别是如何在保持端到端连接性的同时迁移 Direct Connect 网关之间的流量。这种迁移方式对于企业的合并与收购或减少技术负担等场景都可能具有实用价值。&...