APP下载

原理解析|Apache Flink结合Kafka构建端到端的 Exactly-Once 处理

消息来源:baojiabao.com 作者: 发布时间:2024-05-18

报价宝综合消息原理解析|Apache Flink结合Kafka构建端到端的 Exactly-Once 处理

文章目录:

Apache Flink 应用程序中的 Exactly-Once 语义Flink 应用程序端到端的 Exactly-Once 语义示例 Flink 应用程序启动预提交阶段在 Flink 中实现两阶段提交 Operator总结Apache Flink 自2017年12月释出的1.4.0版本开始,为流计算引入了一个重要的里程碑特性:TwoPhaseCommitSinkFunction(相关的Jira)。它提取了两阶段提交协议的通用逻辑,使得通过Flink来构建端到端的Exactly-Once程式成为可能。同时支援一些资料来源(source)和输出端(sink),包括Apache Kafka 0.11及更高版本。它提供了一个抽象层,使用者只需要实现少数方法就能实现端到端的Exactly-Once语义。

有关TwoPhaseCommitSinkFunction的使用详见文件: TwoPhaseCommitSinkFunction。或者可以直接阅读Kafka 0.11 sink的文件: kafka。

接下来会详细分析这个新功能以及Flink的实现逻辑,分为如下几点。

描述Flink checkpoint机制是如何保证Flink程式结果的Exactly-Once的显示Flink如何通过两阶段提交协议与资料来源和资料输出端互动,以提供端到端的Exactly-Once保证通过一个简单的示例,了解如何使用TwoPhaseCommitSinkFunction实现Exactly-Once的档案输出

一、Apache Flink应用程序中的Exactly-Once语义

当我们说‘Exactly-Once’时,指的是每个输入的事件只影响最终结果一次。即使机器或软件出现故障,既没有重复资料,也不会丢资料。

Flink很久之前就提供了Exactly-Once语义。在过去几年中,我们对Flink的checkpoint机制有过深入的描述,这是Flink有能力提供Exactly-Once语义的核心。Flink文件还提供了该功能的全面概述。

在继续之前,先看下对checkpoint机制的简要介绍,这对理解后面的主题至关重要。

次checkpoint是以下内容的一致性快照:应用程序的当前状态输入流的位置Flink可以配置一个固定的时间点,定期产生checkpoint,将checkpoint的资料写入持久储存系统,例如S3或HDFS。将checkpoint资料写入持久储存是异步发生的,这意味着Flink应用程序在checkpoint过程中可以继续处理资料。

如果发生机器或软件故障,重新启动后,Flink应用程序将从最新的checkpoint点恢复处理; Flink会恢复应用程序状态,将输入流回滚到上次checkpoint储存的位置,然后重新开始执行。这意味着Flink可以像从未发生过故障一样计算结果。

在Flink 1.4.0之前,Exactly-Once语义仅限于Flink应用程序内部,并没有扩充套件到Flink资料处理完后传送的大多数外部系统。Flink应用程序与各种资料输出端进行互动,开发人员需要有能力自己维护元件的上下文来保证Exactly-Once语义。

为了提供端到端的Exactly-Once语义 – 也就是说,除了Flink应用程序内部,Flink写入的外部系统也需要能满足Exactly-Once语义 – 这些外部系统必须提供提交或回滚的方法,然后通过Flink的checkpoint机制来协调。

分散式系统中,协调提交和回滚的常用方法是两阶段提交协议。在下一节中,我们将讨论Flink的TwoPhaseCommitSinkFunction是如何利用两阶段提交协议来提供端到端的Exactly-Once语义。

二、Flink应用程序端到端的Exactly-Once语义

我们将介绍两阶段提交协议,以及它如何在一个读写Kafka的Flink程式中实现端到端的Exactly-Once语义。Kafka是一个流行的讯息中介软件,经常与Flink一起使用。Kafka在最近的0.11版本中添加了对事务的支援。这意味着现在通过Flink读写Kafaka,并提供端到端的Exactly-Once语义有了必要的支援。

Flink对端到端的Exactly-Once语义的支援不仅局限于Kafka,您可以将它与任何一个提供了必要的协调机制的源/输出端一起使用。例如Pravega,来自DELL/EMC的开源流媒体储存系统,通过Flink的TwoPhaseCommitSinkFunction也能支援端到端的Exactly-Once语义。

在今天讨论的这个示例程式中,我们有:

从Kafka读取的资料来源(Flink内建的KafkaConsumer)视窗聚合将资料写回Kafka的资料输出端(Flink内建的KafkaProducer)要使资料输出端提供Exactly-Once保证,它必须将所有资料通过一个事务提交给Kafka。提交捆绑了两个checkpoint之间的所有要写入的资料。这可确保在发生故障时能回滚写入的资料。但是在分散式系统中,通常会有多个并发执行的写入任务的,简单的提交或回滚是不够的,因为所有元件必须在提交或回滚时“一致”才能确保一致的结果。Flink使用两阶段提交协议及预提交阶段来解决这个问题。

在checkpoint开始的时候,即两阶段提交协议的“预提交”阶段。当checkpoint开始时,Flink的JobManager会将checkpoint barrier(将资料流中的记录分为进入当前checkpoint与进入下一个checkpoint)注入资料流。

brarrier在operator之间传递。对于每一个operator,它触发operator的状态快照写入到state backend。

资料来源储存了消费Kafka的偏移量(offset),之后将checkpoint barrier传递给下一个operator。

这种方式仅适用于operator具有‘内部’状态。所谓内部状态,是指Flink state backend储存和管理的 -例如,第二个operator中window聚合算出来的sum值。当一个程序有它的内部状态的时候,除了在checkpoint之前需要将资料变更写入到state backend,不需要在预提交阶段执行任何其他操作。Flink负责在checkpoint成功的情况下正确提交这些写入,或者在出现故障时中止这些写入。

三、示例Flink应用程序启动预提交阶段

但是,当程序具有‘外部’状态时,需要作些额外的处理。外部状态通常以写入外部系统(如Kafka)的形式出现。在这种情况下,为了提供Exactly-Once保证,外部系统必须支援事务,这样才能和两阶段提交协议整合。

在本文示例中的资料需要写入Kafka,因此资料输出端(Data Sink)有外部状态。在这种情况下,在预提交阶段,除了将其状态写入state backend之外,资料输出端还必须预先提交其外部事务。

当checkpoint barrier在所有operator都传递了一遍,并且触发的checkpoint回拨成功完成时,预提交阶段就结束了。所有触发的状态快照都被视为该checkpoint的一部分。checkpoint是整个应用程序状态的快照,包括预先提交的外部状态。如果发生故障,我们可以回滚到上次成功完成快照的时间点。

下一步是通知所有operator,checkpoint已经成功了。这是两阶段提交协议的提交阶段,JobManager为应用程序中的每个operator发出checkpoint已完成的回拨。

资料来源和 widnow operator没有外部状态,因此在提交阶段,这些operator不必执行任何操作。但是,资料输出端(Data Sink)拥有外部状态,此时应该提交外部事务。

我们对上述知识点总结下:

旦所有operator完成预提交,就提交一个commit。如果至少有一个预提交失败,则所有其他提交都将中止,我们将回滚到上一个成功完成的checkpoint。在预提交成功之后,提交的commit需要保证最终成功 – operator和外部系统都需要保障这点。如果commit失败(例如,由于间歇性网络问题),整个Flink应用程序将失败,应用程序将根据使用者的重启策略重新启动,还会尝试再提交。这个过程至关重要,因为如果commit最终没有成功,将会导致资料丢失。因此,我们可以确定所有operator都同意checkpoint的最终结果:所有operator都同意资料已提交,或提交被中止并回滚。

四、在Flink中实现两阶段提交Operator

完整的实现两阶段提交协议可能有点复杂,这就是为什么Flink将它的通用逻辑提取到抽象类TwoPhaseCommitSinkFunction中的原因。

接下来基于输出到档案的简单示例,说明如何使用TwoPhaseCommitSinkFunction。使用者只需要实现四个函式,就能为资料输出端实现Exactly-Once语义:

beginTransaction – 在事务开始前,我们在目标档案系统的临时目录中建立一个临时档案。随后,我们可以在处理资料时将资料写入此档案。preCommit – 在预提交阶段,我们重新整理档案到储存,关闭档案,不再重新写入。我们还将为属于下一个checkpoint的任何后续档案写入启动一个新的事务。commit – 在提交阶段,我们将预提交阶段的档案原子地移动到真正的目标目录。需要注意的是,这会增加输出资料可见性的延迟。abort – 在中止阶段,我们删除临时档案。我们知道,如果发生任何故障,Flink会将应用程序的状态恢复到最新的一次checkpoint点。一种极端的情况是,预提交成功了,但在这次commit的通知到达operator之前发生了故障。在这种情况下,Flink会将operator的状态恢复到已经预提交,但尚未真正提交的状态。

我们需要在预提交阶段储存足够多的资讯到checkpoint状态中,以便在重启后能正确的中止或提交事务。在这个例子中,这些资讯是临时档案和目标目录的路径。

TwoPhaseCommitSinkFunction已经把这种情况考虑在内了,并且在从checkpoint点恢复状态时,会优先发出一个commit。我们需要以幂等方式实现提交,一般来说,这并不难。在这个示例中,我们可以识别出这样的情况:临时档案不在临时目录中,但已经移动到目标目录了。

在TwoPhaseCommitSinkFunction中,还有一些其他边界情况也会考虑在内,请参考Flink文件了解更多资讯。

总结

总结下本文涉及的一些要点:

Flink的checkpoint机制是支援两阶段提交协议并提供端到端的Exactly-Once语义的基础。这个方案的优点是: Flink不像其他一些系统那样,通过网络传输储存资料 – 不需要像大多数批处理程式那样将计算的每个阶段写入磁盘。Flink的TwoPhaseCommitSinkFunction提取了两阶段提交协议的通用逻辑,基于此将Flink和支援事务的外部系统结合,构建端到端的Exactly-Once成为可能。从Flink 1.4.0开始,Pravega和Kafka 0.11 producer都提供了Exactly-Once语义;Kafka在0.11版本首次引入了事务,为在Flink程式中使用Kafka producer提供Exactly-Once语义提供了可能性。Kafaka 0.11 producer的事务是在TwoPhaseCommitSinkFunction基础上实现的,和at-least-once producer相比只增加了非常低的开销。这是个令人兴奋的功能,期待Flink TwoPhaseCommitSinkFunction在未来支援更多的资料接收端。

via:

https://www.ververica.com/blog/end-to-end-exactly-once-processing-apache-flink-apache-kafka

作者:Piotr Nowojski

翻译| 周凯波

周凯波,阿里巴巴技术专家,四川大学硕士,2010年毕业后加入阿里搜寻事业部,从事搜寻离线平台的研发工作,参与将搜寻后台资料处理架构从MapReduce到Flink的重构。目前在阿里计算平台事业部,专注于基于Flink的一站式计算平台的建设。

2019-10-31 05:55:00

相关文章