实现 Apache Flink 实时数据增强模式 大数据博客
实现 Apache Flink 实时数据丰富模式
作者:Luis Morales 和 Lorenzo Nicora 日期:2023年11月15日分类: 高级, Amazon 管理的 Apache Flink 服务, 分析, 思想领导力原文链接 评论
关键要点
实时数据处理使企业能够对数据做出及时响应,从而提升客户体验。数据流工作负载常常需要通过外部源如数据库或其他数据流对数据进行丰富。本文讨论了三种数据丰富模式:同步丰富、异步丰富和缓存丰富,并使用合成数据进行了性能测试。不同模型的吞吐量存在显著差异,缓存丰富模式表现最佳。数据流处理让您能够实时对数据进行操作。实时数据分析可以帮助您实现及时的优化响应,同时提升整体客户体验。
流数据工作负载通常需要通过外部源例如数据库或其他数据流对流中的数据进行丰富。预加载参考数据可以提供低延迟和高吞吐量。然而,这种模式可能不适合某些类型的工作负载,例如:
参考数据频繁更新流应用需要进行外部调用以计算业务逻辑输出的准确性很重要,不应使用过时的数据参考数据的基数非常高,参考数据集过于庞大,无法保持在流应用的状态中
例如,如果您从传感器网络接收温度数据,并需要获取额外的传感器元数据以分析这些传感器与物理地理位置的关系,则需要用传感器元数据丰富温度数据。
Apache Flink 是一个分布式计算框架,提供有状态的实时数据处理。它为批处理和流处理构建了统一的 API,使开发人员轻松处理有界和无界数据。 Amazon 管理的 Apache Flink 服务Amazon Kinesis 数据分析的继任者是 AWS 提供的一项服务,可以为运行 Apache Flink 应用程序提供无服务器、全托管的基础设施。开发人员可以轻松构建高可用、容错性强和可扩展的 Apache Flink 应用程序,而无需成为 AWS 上构建、配置和维护 Apache Flink 集群的专家。
针对实时数据丰富,您可以在 Amazon 管理的 Apache Flink 服务中使用几种不同的方法,具体取决于使用案例和 Apache Flink 抽象层级。每种方法对吞吐量、网络流量及 CPU或内存使用率有不同的影响。如需了解数据丰富模式的概述,请参考 Amazon 管理的 Apache Flink 中的常见流数据丰富模式。
本文将介绍如何使用 Apache Flink 实现实时流事件的数据丰富以及如何优化性能。为了比较不同丰富模式的性能,我们基于合成数据进行了性能测试。该测试结果可用作一般参考。需要注意的是,您实际的 Flink 工作负载性能将取决于多种因素,例如 API 延迟、吞吐量、事件大小和缓存命中率。
我们将讨论三种丰富模式,详情见下表:
同步丰富异步丰富缓存丰富丰富方法同步、每条记录请求外部端点非阻塞并行请求外部端点,使用异步 I/O经常访问的信息缓存到 Flink 应用状态中,具有固定的 TTL数据新鲜度始终最新的丰富数据始终最新的丰富数据丰富数据可能过时,取决于 TTL开发复杂性模型简单难以调试,因涉及多线程难以调试,依赖于 Flink 状态错误处理简单更复杂,使用回调简单对丰富 API 的影响最大:每条消息一个请求最大:每条消息一个请求减少对丰富 API 的 I/O取决于缓存 TTL应用延迟对丰富 API 延迟敏感对丰富 API 延迟不太敏感减少应用延迟取决于缓存命中率其他注意事项无无可定制 TTL,自 117 版本以来仅支持同步实现比较测试结果吞吐量350 事件每秒2000 事件每秒28000 事件每秒解决方案概述
在本篇文章中,我们使用一个温度传感器网络的示例下文架构图中的组件 1,该网络发出传感器信息,例如温度、传感器 ID、状态以及该事件生成的时间戳。这些温度事件被输入到 Amazon Kinesis 数据流2。下游系统还需要传感器的品牌和国家代码信息,以便分析,例如每个品牌的可靠性和每个工厂的温度。
基于传感器 ID,我们从传感器信息 API3丰富传感器信息,该 API 提供品牌、位置和图像信息。最终得到的丰富流被发送到另一个 Kinesis 数据流,并可以在 Amazon 管理的 Apache Flink Studio 笔记本中进行分析4。
前提条件
要开始实施实时数据丰富模式,您可以复制或下载 GitHub 仓库 中的代码。该仓库实现了我们描述的 Flink 流应用程序。您可以在 READMEmd 文件中找到有关如何在 Amazon 管理的 Apache Flink 或其他可用的 Flink 部署选项中设置 Flink 的说明。
如果您想了解如何实现这些模式以及如何优化 Flink 应用程序的性能,可以直接按照本文的描述进行,无需部署示例。
项目概述
该项目的结构如下:
docs/ 包含项目文档src/ main/java/ 包含所有 Flink 应用程序代码 ProcessTemperatureStream 决定丰富策略的主类 enrichment 包含各种丰富策略同步、异步和缓存 event 事件 POJOs serialize 序列化工具 utils 参数解析工具 test/ 包含所有 Flink 测试代码
ProcessTemperatureStream 类中的 main 方法设置运行环境,并根据参数从命令行读取参数如果是本地环境,或者使用 Amazon 管理的 Apache Flink 的应用属性。根据 EnrichmentStrategy 参数,它决定选择哪种实现:同步丰富默认、异步丰富或基于 Flink 概念的缓存丰富 KeyedState。
javapublic static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironmentgetExecutionEnvironment() ParameterTool parameter = ParameterToolUtilsgetParameters(args env)
String strategy = parameterget(EnrichmentStrategy SYNC)switch (strategy) { case SYNC new SyncProcessTemperatureStreamStrategy()run(env parameter) break case ASYNC new AsyncProcessTemperatureStreamStrategy()run(env parameter) break case CACHED new CachedProcessTemperatureStreamStrategy()run(env parameter) break default throw new InvalidParameterException(请选择现有丰富策略之一 (SYNCASYNCCACHED))}}
我们将在以下部分详细讨论这三种方法。
同步数据丰富
当您希望从外部提供者丰富数据时,可以使用同步每条记录查找。当 Flink 应用处理传入事件时,它会进行外部 HTTP 调用,并在发送每个请求后,必须等待直到收到响应。
由于 Flink 是同步处理事件,因此执行丰富的线程会被阻塞,直到收到 HTTP 响应。这导致处理器在处理时间中闲置了较长时间。另一方面,同步模型在设计、调试和追踪上更简单。此外,它也允许您始终拥有最新数据。
可以这样将其集成进您的流应用程序:
javaDataStreamltEnrichedTemperaturegt enrichedTemperatureDataStream = temperatureDataStream map(new SyncEnrichmentFunction(parameterget(SensorApiUrl DEFAULTAPIURL)))
丰富功能的实现代码如下:
javapublic class SyncEnrichmentFunction extends RichMapFunction {
// HTTP 客户端和 ObjectMapper 的设置@Overridepublic EnrichedTemperature map(Temperature temperature) throws Exception { String url = thisgetRequestUrl temperaturegetSensorId() // 从传感器信息 API 获取响应 Response response = client prepareGet(url) execute() toCompletableFuture() get() // 解析传感器信息 SensorInfo sensorInfo = parseSensorInfo(responsegetResponseBody()) // 合并温度传感器数据和传感器信息数据 return getEnrichedTemperature(temperature sensorInfo)}// }
为优化同步丰富的性能,可以使用 KeepAlive 标志,因为 HTTP 客户端将用于多个事件的重用。
对于 I/O 绑定的操作符如外部数据丰富,提高应用程序的并行性是有意义的,而不会增加分配给应用的资源。这可以通过增加 Amazon 管理的 Apache Flink 应用程序的 ParallelismPerKPU 设置来实现。该配置描述了每个 Kinesis 处理单元KPU应用程序可以执行的并行子任务数量,较高的 ParallelismPerKPU 值可以实现 KPU 资源的充分利用。但请记住,增加并行性并不总是有效的,例如,当您 从具有少量分片或分区的源中消费。
在 Amazon 管理的 Apache Flink 的合成测试中,我们看到在单个 KPU 上,吞吐量约为每秒 350 个事件,使用每 KPU 4 的并行度和默认设置。
异步数据丰富
同步丰富并未充分利用计算资源。这是因为 Flink 等待 HTTP 响应。但 Flink 提供了用于外部数据访问的 异步 I/O。这允许您异步丰富流事件,因此可以在等待第一个元素的响应时发送其他元素的请求,并可以批处理请求以提高效率。
使用此模式时,您需要在 unorderedWait接收到响应后立即将结果发往下一个操作符,不考虑流中元素的顺序与 orderedWait在所有未完成的 I/O 操作完成后,将结果按原先放置在流中的顺序发送给下一个操作符之间进行选择。当您的用例不需要事件顺序时,unorderedWait 提供更好的吞吐量和更少的闲置时间。有关该模式的更多信息,请参考 使用 Amazon 管理的 Apache Flink 异步丰富您的数据流。
鲸鱼加速器最新版异步丰富可以如下添加:
javaSingleOutputStreamOperatorltEnrichedTemperaturegt asyncEnrichedTemperatureSingleOutputStream = AsyncDataStream unorderedWait( temperatureDataStream new AsyncEnrichmentFunction(parameterget(SensorApiUrl DEFAULTAPIURL)) ASYNCOPERATORTIMEOUT TimeUnitMILLISECONDS ASYNCOPERATORCAPACITY)
丰富功能的实现与同步实现相似。它首先作为 Java Future 获取传感器信息,表示异步计算的结果。信息一旦可用,它会解析该信息,然后将两个对象合并为 EnrichedTemperature:
javapublic class AsyncEnrichmentFunction extends RichAsyncFunction {
// HTTP 客户端和 ObjectMapper 的设置@Overridepublic void asyncInvoke(final Temperature temperature final ResultFutureltEnrichedTemperaturegt resultFuture) { String url = thisgetRequestUrl temperaturegetSensorId() // 从传感器信息 API 获取响应 FutureltResponsegt future = client prepareGet(url) execute() CompletableFuture supplyAsync(() gt { try { Response response = futureget() // 一旦传感器信息可用,解析它 return parseSensorInfo(responsegetResponseBody()) } catch (Exception e) { return null } }) thenAccept((SensorInfo sensorInfo) gt // 合并温度传感器数据和传感器信息数据 resultFuturecomplete(getEnrichedTemperature(temperature sensorInfo)))}// }
在 Amazon 管理的 Apache Flink 的测试中,我们看到在单个 KPU 上,吞吐量达到每秒 2000 个事件,使用每 KPU 2 的并行度和默认设置。
同步缓存数据丰富
尽管数据流中的许多操作侧重于独立处理单个事件例如事件解析,但某些操作会跨多个事件保留信息。这些操作,例如窗口操作,被称为有状态的,因为它们能够维护状态。
键控状态存储在内嵌的键值存储中,构想为 Flink 架构的一部分。此状态与状态操作符所消耗的流一起分区和分布。因此,访问键值状态被限制在键控流中,只能在键控或分区数据交换后访问,并且仅限于与当前事件的键相关的值。有关概念的更多信息,请参考 有状态流处理。
您可以使用键控状态来存储不经常变化的频繁访问信息,例如传感器信息。这不仅可以减少对下游资源的负载,还可以提高数据丰富的效率,因为无需为已获取的键重新请求外部资源也不需要重新计算信息。不过,请注意,Amazon 管理的 Apache Flink 将临时数据存储在 RocksDB 后端,这会增加检索信息的延迟。然而,由于 RocksDB 是本地节点处理数据的,因此这一过程比向外部资源请求信息要快,下面给出的示例就是证明。
要使用键控流,您必须使用 keyBy() 方法对流进行分区,这样确保同一键在此案例中为传感器 ID的事件会路由到同一工作节点。您可以如下实现:
javaSingleOutputStreamOperatorltEnrichedTemperaturegt cachedEnrichedTemperatureSingleOutputStream = temperatureDataStream keyBy(TemperaturegetSensorId) process(new CachedEnrichmentFunction( parameterget(SensorApiUrl DEFAULTAPIURL) parameterget(CachedItemsTTL StringvalueOf(CACHEDITEMSTTL))))
我们使用传感