论文:
Apache Flink™: Stream and Batch Processing in a Single Engine
官网

介绍

首先介绍了Flink是个啥东西,是一个流批一体的处理系统
传统情况下要同时处理流、批,一般需要部署两套系统,这需要两套API模型
其开发、部署、维护成本都比较高,相当于高两套
论文也提到了,以前对实时数据处理要求没那么多,基本上都是批的为主,不过现在流处理,实时处理的要求也越来越多了

lambda 架构是混合了流、批系统的
流系统可能会返回大致数据,然后批系统不断迭代处理,最终纠正出一个精确结果

相关的流系统:

  • Apache Storm
  • IBM Infosphere Streams
  • Microsoft StreamInsight
  • Streambase

相关的批系统

  • Apache Spark
  • Apache Drill

Flink可以处理:

  • 实时分析
  • 连续的数据管道
  • 历史数据处理,批处理
  • 交互式算法,机器学习、图分析

如果假设 持久化的消息对了,那么数据流可以任意重放
API模型都是统一的,对于流批都是一样的,区别就是流的处理的开始点上
对于批处理,Flink是把它当做流处理的一个特例,有固定窗口的,使用了特定的数据结构和算法以及调度策略,这样其上层就可以支持不同的批处理模型,如:机器学习、图分析,但是底层还是基于流系统的

支持不用的事件事件

  • event-time
  • ingestion-time
  • processing-time

不过论文中也提到了,有两种情况,还是需要批处理

  • 比如传统遗留的流处理实现
  • 某些特定的分析算法,这些在流处理模型上没有 高效的处理算法

架构

从其软件栈来看分为四层

  • 部署层,本地模式,集群模式,云环境模式
  • 核心层,Distributed Streaming Dataflow
  • API层,包括:DataSet API(批处理)、DataStream API(流处理)
  • 库函数层,包括:Flink ML(机器学习)、GElly(图)、Table API(批);CEP(复杂事件处理)、Talbe API(流)

Flink的核心层是分布式的数据流引擎,用来执行数据流程序
Flink的运行程序是连接数据流的 有状态算子DAG,其核心层API包含两种

  • 用于批处理的 DataSet API,处理有边界数据
  • 用于流吹了的 DataStream API,处理无边界数据

Flink的核心层是被当做一个数据流引擎,所以可以同时处理这种情况 在往上就是一些DSL库,它们有DataSet 和 DataStream API生成
当前有支持机器学习的Flink ML,图计算的Gelly,以及处理SQL的 Table API

1

集群架构包含三个组件

  • 客户端
    • 将程序代码转换为数据流图,提交到JobManager
    • 转换阶段还包含了一些数据类型校验
    • DataSet程序还有CBO优化
  • JobManager
    • 协调分布式数据流的执行,跟踪每个算子和流的状态,发布新的算子
    • 协调checkpoint和恢复
    • 每个checkpoint包含了最小化的元数据集,这样备份的JobManager可以用来重建和恢复数据流执行
  • TaskManagers
    • 真正做数据处理的
    • 每个集群至少要有一个TaskManagers,用来执行流产生的算子
    • 定期向JobManager汇报自身状态
    • 内部维护了一个buffer pool,也会将流做物化
    • 会跟其他TaskManager通过网络交互,交互算子之间的数据流

2

Streaming Dataflows

这里介绍 DataSet和DataStream API的下层,数据流
不管上层如果编写代码,最终由Flink执行的就是数据流图

Dataflow Graphs

数据流图就是 DAG,由两部分组成

  • 有状态的算子
  • 数据流:被一个算子产生,可以被另一个算子消费

数据流图可以被并行执行的,算子可以被并行化到一个、或者多个:subtasks
流会被拆分到 一个、或者多个流分区中,每个产生的 subtask 一个分区
所谓的有状态算子,就是一堆逻辑操作:如filter,hash-join,stream window functions等等
流在:生产者 和 消费者 之间分发数据,这里包含各种模式:

  • 点对点
  • 广播
  • 重分区
  • 扇出
  • 合并

Data Exchange through Intermediate Data Streams

3

上图中包含了:中间数据,这个中间数据是由一个算子的逻辑操作产生的
它可以被其他一个、多个算子消费,中间数据可能会被物化、也可能没有
数据流的特殊行为参数,是由上层的API,如DataSet API控制的

相关概念 Pipelined intermediate streams

  • 在并非运行的 生产者 消费者之间交换数据,从而实现流水线执行
  • 这里可能会存在消费端处理不过来,背压生产者,使用了buffer pool,弥补了短期波动
  • Flink使用流水线来处理 流数据传播,以及处理大部分批数据
  • blocking streams 用来处理有界数据,在消费端可用之前,会缓存所有产生的算子数据
  • 所以 blocking streams 会将产生和消费的算子隔离到不同的阶段,它需要更多内存,会溢出到磁盘,但不会产生背压
  • 对于sort-merge、joins,会将流水线计划中断,于是用来隔离这种连续算子操作的

Balancing Latency and Throughput

  • Flink的数据交换是根据buffer交换实现的
  • 当数据在生产者端可用时,会写入buffer,然后发送给消费者
  • 这里可以实现两种场景: 高吞吐、高性能
  • 如果buffer满了再发送就是 高吞吐场景;如果给buffer设置了超时时间则是 高性能场景
  • 图4中是buffer超时的效果,30个机器,120core,99%的延迟为20ms时,吞吐量为1.5M/s
  • 如果超时时间调大,99%延迟为50ms,吞吐量为80M/s,随着延迟的增加吞吐量也在增加

4

Control Events

  • 它被当做交换数据的一部分发送,接受者收到这些数据后就会执行对应的控制命令,控制事件包括
  • checkpoint barriers,将流 拆分为 pre-checkpoint 和 postcheckpoint 来协调checkpoint
  • watermarks,流区分中的事件时间进展信号
  • iteration barriers,流分区已经到达superstep的信号,在循环数据流之上的Bulk/StaleSynchronous并行迭代算法
  • 控制事件假设是按顺序达到的,为此使用一元运算符(unary operator)消费单个流分区,这样实现了FIFO
  • 算子以先来后到的方式,接受多余一个流分区合并流,为保持流的速率,避免背压
  • 在没有任何形式的重分区、广播、留给算子实现的无序记录处理时,Flink中的 streaming dataflows不提供顺序保证
  • 所以就不提供任务顺序保证,因为大多数算子 hash-join,map也不需要顺序
  • 为弥补这一点,可以通过 event-time来实现

Fault Tolerance

Flink提供了exactly-once-processing保证,通过checkpoint机制来实现,对于失败场景只需要部分重执行即可为此需要源端

  • 提供持久化、可重放的保证,比如Apache Kafka
  • 对于没有持久化保证的,其实也能做到,那就是源算子支持WAL就行了

Flink的exactly-once-processing是通过分布式一致性快照实现的
Flink获取了算子状态的快照,每隔一段时间包含当前输入流的位置
这种实现的挑战是:不能把正在执行的任务给停住,所以要在保持延迟/吞吐量的前提下,实现快照
Flink使用的技术是:Asynchronous Barrier Snapshotting,论文的引用中也提到了这篇论文

大致实现

  • Barriers是 将控制流注入到 逻辑时间相对应的输入流中

  • 之后将这个流逻辑上分割为两部分:当前时间快照的部分,快照之后的部分

  • 一个算子从上游接收到 barriers后,就执行了一个校准阶段,确保所有输入的barriers都被接受

  • 之后算子会将它的状态,比如 滑动窗口的内容、自定义的数据结构等等写入持久存储中(HDFS)

  • 一旦这个备份完成后,算子就将barrier转发给下游,最终所有的算子都会将自身的状态注册到快照中,于是全局的快照也完成了

  • 图5中, 快照 t2包含了 在 t2-barrier之前的消费所有记录结构的,所有算子状态

  • ABS有点类似 Chandy-Lamport 的异步的分布式快照算法

  • 这里并不需要checkpoint新产生的记录,只需要将当前状态写入持久存储即可,这样也能保持存储的最小化

  • 恢复:所有算子返回的失败状态 –> 最后成功的快照中这些算子对应的状态

  • 从最新barrier的快照中,重启输入流

  • 恢复时重计算的最大数量: 两个连续barriers中间的输入记录总数,也就是 t2 barrier 到 t1 barrier这段

  • 重放上游subtask 中的未处理记录buffer,就可以实现部分恢复

  • ABS这种机制的好处:不需要暂停计算就能实现exactly-once

  • 跟其他形式的控制流完全解耦(通过触发窗口计算事件,不用将窗口机制限制为checkpoint的倍数)

  • 跟存储设备也完全解耦(可以存到文件系统、数据中)

5

Iterative Dataflows

像机器学习、图处理这种的,都需要增量、迭代模式
一般是通过如下几种实现的

  • 每次迭代时提交新job
  • 增加一个额外node到DAG中
  • feedback edges,反馈边缘

Flink中的迭代模式实现叫做:iteration steps
这其中有一些特殊的算子,他们本身是包含了一个执行图的,如下图6所示 主要实现如下:

  • Flink中的迭代 head、tail这些任务会隐式的连接到 feeback edges
  • head、tail这些任务会建立一个主动的feedback channel 反馈频道
  • 也会为 反馈频道内传输的从处理数据记录 提供协调
  • 对于实现了任何结构化并行迭代模式的类型,都需要协调
  • 比如:Bulk Synchronous Parallel (BSP)这种模型,它是用控制事件实现的

6

Stream Analytics on Top of Dataflows

Flink的 DataStream API基于运行时环境,提供了一个完全的流分析框架,包括

  • 管理无序的事件事件
  • 定义窗口
  • 维护和更新 用户定义的状态

而DataStream API也是基于 DataStream这个概念来的,它就是一个无边界的,给定类型的不可变元素集合
Flink的运行时环境已经提供了 流水线数据传输、连续的有状态算子、容错机制
而在运行时环境之上覆盖了一层 流处理器,其实就是实现了 窗口系统 和 状态接口,这两个东西对于运行时环境来时是不可见的,只当他两是 有状态算子实现而已

The Notion of Time

Flink有两种时间概念

  • 事件事件,一个事件源产生的时间,如传感器信号管理的时间戳、移动设备的时间等
  • 处理时间,用于处理数据机器的墙上时间

因为分布式系统存在时钟偏移,事件事件和处理时间之间的偏差可能会无限大,如果是基于墙上时钟来处理事件事件可能会有任意长的延迟
Flink为避免这种情况,会定期插入了一个特殊的事件:low watermarks,用来标记全局的进展
水印中包含了一个 事件属性t,表示所有 低于t的事件都已经进入算子了
水印可以帮助 执行引擎 以正确的事件顺序处理事件,并序列化算子,比如:通过统一的进度度量做窗口计算

大致原理

  • 水印来自于于拓扑图的源头,这样就可以确定未来元素的固有事件
  • 水印从源头,通过其他数据流的算子进行传播
  • 算子也决定了如何跟水印交互,比如简单的map、filter就直接转发接收到的水印
  • 而复杂的算子会基于水印(事件时间)首先计算出通过水印触发的结果,然后再转发
  • 如果算子的输入超过一个,系统只转发最小的输入水印,以确保正确结果
  • 基于处理时间的Flink程序依靠机器时间,这样会导致时间不可靠,也就是恢复时的回放会不一致
  • 而基于事件时间就能提供更高可靠性语义,但是event-time-processing-time可能会导致延迟
  • Flink为此提供了第三种时间概念,作为事件时间点一种特例,ingestion-time 摄入时间
  • 这是事件进入Flink的时间,它提供了比事件时间更低的延迟,也比处理时间 有更准确的结果

Stateful Stream Processing

大多数DataStream API算子看起来像是函数式的,无副作用的算子,不过他们提供了高效的有状态计算
比如

  • 机器学习的模型建立
  • 图分析
  • 用户session处理
  • 窗口聚合

这里有很多种类型,简单的比如 counter、或者 sum 这样的,也有复杂的,比如:机器学习应用中的分类树、或者大型稀疏矩阵
流窗口 是一个有状态的算子,将记录分配给内存中不断更新的buckets,作为算子状态的一部分

Flink中的状态是通过API显示提供的

  • 算子接口、注释,用于在一个算子的作用域内 静态注册显示的局部变量
  • 算子状态抽象,用于声明key-value状态分区,以及他们关联的算子

Flink的状态也可以自定义的,这样就实现了流应用中的状态的高度灵活性管理
使用 StateBackend 来配置自定义的状态存储、以及checkpoint,而Flink的exactly-once语义也保证了这些状态的一致性

Stream Windows

无边界的数据没法直接计算,是通过不断的在小范围内迭代计算出来的,这种不断演化的逻辑视图就是 窗口
Flink的有状态算子中集成了窗口操作,包含了三个核心功能

  • 一个窗口的分配器 assigner
  • 一个可选的触发器 trigger
  • 一个驱逐器 evictor

这三个功能内置就已经提供了,比如 滑动窗口,当然用户也可以自定义
分配器的责任是将每个记录分配给逻辑窗口,比如:

  • 当涉及到事件事件窗口时,它是基于记录的时间戳
  • 而对于滑动窗口,一个元素可以属于多个逻辑窗口

触发器,当算子关联的窗口定义被执行时,就会调用到触发器 驱逐器,也是一个可选的,它决定每个窗口内哪些记录可以被保留

Flink的窗口能覆盖各种范围,比如:周期时间、计数、标点符号、地标、session、增量窗口等
Flink的窗口机制整合了乱序处理,跟谷歌的Dataflow类似

如下是一个 6秒范围的窗口定义,每2秒滑动一次(分配器),一旦水印通过窗口(触发器)末端就计算结果

1
2
3
stream
  .window(SlidingTimeWindows.of(Time.of(6, SECONDS), Time.of(2, SECONDS))
  .trigger(EventTimeTrigger.create())

下面的例子是定义了一个全局窗口(分配器),每1000个事件会调用算子(触发器),保留最后100个元素(驱逐器)

1
2
3
4
stream
  .window(GlobalWindow.create())
  .trigger(Count.of(1000))
  .evict(Count.of(100))

注意,如果在窗口执行期间,上面的流基于key做了分区,那么上面的窗口算子是本地的,不需要worker做协调;这种机制可以实现各种窗口功能

Asynchronous Stream Iterations

流中的循环对某些应用来说是必不可少的,比如增量构建、训练机器学习模型,强化学习、近似图等
大多数情况下,反馈循环不是必须的
异步迭代覆盖了流应用通讯需求,它跟 基于有限数据集上的结构化迭代的并行优化问题不同

像图6那样,Flink的执行模型已经覆盖了 没有开启迭代控制机制下的异步迭代
反馈流被被视为 隐式迭代头算子中的算子状态,也是全局快照的一部分
DataStream API允许显式的反馈流定义,并可以简单地包含对流和进度跟踪上的结构化循环的支持

Batch Analytics on Top of Dataflows

因为批处理其实是流处理的特例,一个流程序,如果将其所有的数据都插入到一个窗口中了,那么就可以当做批处理来对待
但是还需要一些额外的工作

  • 批处理计算的API语法可以被简化(比如不需要手动设置全局窗口定义)
  • 对于批处理还需要一些额外的优化,对于容错,阶段调度也需要更有效

Flink的批处理工作方式如下:

  • 批处理在同一个时刻可以按照流计算的方式执行,可以通过阻塞数据流的方式,将一个大的计算拆分多个 可以被连续调度的独立阶段
  • 当负载较高时候会关闭周期性的快照,失败恢复通过重放最近的物化中间结果(也可能是源头),来恢复丢失的流分区
  • 阻塞的算子比如sort,实现很简单就是一直阻塞,直到将所有输入数据都处理完,运行时不用关系是否阻塞,这些算子的内存都是Flink分配的(JVM堆或非堆),并且也能溢出到磁盘
  • DataSet API提供了批处理计算的抽象,包括带容错的DataSet数据结构,并可以在DataSet之上(如join、聚合、迭代等)实现各种转换
  • 查询优化层会将DataSet程序转换为一个更高效的可执行程序

Query Optimization

Flink的优化是基于并行数据库系统技术的,比如:例如计划等价、成本建模和感兴趣的属性传播
但是 DAG上的UDF使的Flink没法用传统技术来优化,因为这些算子隐藏了他们的语义,所以基础、以及成本估算方式很难被使用
Flink运行时支持各种执行策略,如:重分区、广播数据转换,以及基于排序的分区,基于sort、hash的join实现等等
Flink的优化器会基于感兴趣的属性传播概念,来枚举不同的物理计划,并时代基于代价的方式从多个物理计划中选择一个合适的计划,这些代价包括:

  • 网络
  • 磁盘I/O
  • CPU开销

为了UDF存在的克服基数评估问题,Flink使用程序员的hints

Memory Management

基于数据库技术,Flink序列化数据到内存segment中
Flink没有在JVM堆上分配动态数据记录的buffer,对于像排序和join这样的算子,尽可能的直接使用二进制数据,这样可以保持 序列化和反序列化的开销最小化,并在需要的时候将部分数据溢出到磁盘
为处理任意对象,Flink使用了类型接口,以及自定义的序列化机制
为了在二进制表示的非堆上做数据处理,Flink管理了高效的缓存和鲁棒性的算法,减少了垃圾收集的开销,并能在内存压力的情况下优雅的扩展

Batch Iterations

迭代图分析、并行梯度下降、以及优化技术,已经在 Bulk Synchronous Parallel (BSP) 和 Stale Synchronous Parallel (SSP)模式之上实现了
Flink执行模式允许在这之上实现, 任何结构化迭代逻辑类型,这些是使用迭代控制事件 实现的
比如,BSP执行时,迭代控制事件会在迭代计算中标记 开始、结束
Flink还引入了更新的优化技术:增量迭代概念,它可以利用稀疏的计算依赖实现增量迭代,已经在Flink的图计算API Gelly中实现了

Conclusion

这里总结下批处理相关的事情

  • Hadoop这个不用说了,最流行的开源系统,基于了Map-Reduce范式
  • Dryad引入了 UDF,生成了基于DAG的数据流图
  • SCOPE对其做了增强,并在其上增加了SQL优化
  • Tez可以看做Dryad的开源实现
  • MPP数据库,Drill 和 Impala限制了他们的API,使用了SQL变体
  • Spark,基于DAG的数据处理框架,提供了SQL优化,执行基于驱动的迭代,处理无边界数据是当做微批来做的
  • 相比之下,Flink是唯一的系统因为其整合了:分布式数据流,利用了流水线执行 批和流工作
  • 通过轻量级checkpoint实现精确一次,原生迭代处理,支持各种复杂的窗口语义,支持乱序处理

批处理,之前学术和工业界都有大量先驱者

  • SEEP、Naiad、Microsoft StreamInsight、IBM Streams.
  • 这些系统中的许多都是基于数据库社区的研究
  • 而上面这些系统都有一些问题:学术界的原型、不开源的商业产品、不能在普通商用机器集群上做水平扩展
  • 数据流处理领域出现了一些可以支持水平扩展、组合数据流操作、带有弱一致性保证的系统,如:Strom、Samza
  • 乱序处理的概念获得了很大吸引力,并被MillWhell 采用,这是后来提供的Apache Beam/谷歌商业执行程序的内部版本数据流
  • Millwhell提供了精确一次的、低延迟的数据流处理、以及乱序处理,对Flink的发展产生了重大影响
  • Flink是目前唯一开源支持:事件事件和乱序事件处理,提供带精确一次保证的一致性管理状态,实现高吞吐和低延迟,同时支持批和流

Flink是实现了统一数据流引擎的平台,可以执行流处理和批分析
Flink数据流引擎将算子状态、逻辑中间结果 当做一等公民,通过使用不同参数,可以被批、流API同时使用
流API是基于Flink数据流引擎之上的,提供了状态恢复、分区、转换、聚合数据流窗口等功能
由于批处理只是流的一个特例,Flink会特殊对待,使用一个查询优化器来优化他们的执行;并实现了一个阻塞的算子,这个算子在内存不足时会溢出到磁盘

References