Overview

An example
Create the StreamingContext

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working threads and a batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

Connect to the source

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

Process the stream

// Split each line into words
val words = lines.flatMap(_.split(" "))

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

Start the execution

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

Feed input data from another terminal with netcat:

$ nc -lk 9999

Execution flow

From here on these notes cover Structured Streaming rather than the DStream API used in the example above.

  • a source is created through DataStreamReader
  • transformations are applied, continuously producing DataFrames
  • the write is performed through DataStreamWriter
  • StreamingQueryManager is responsible for creating the StreamExecution instance (a minimal end-to-end sketch follows this list)
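
A minimal end-to-end sketch of this flow, using the built-in rate source and console sink purely for illustration (the object name is a placeholder):

import org.apache.spark.sql.SparkSession

object MinimalStructuredStream {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("MinimalStructuredStream")
      .getOrCreate()

    // DataStreamReader builds the source
    val input = spark.readStream.format("rate").option("rowsPerSecond", "1").load()

    // transformations produce new streaming DataFrames
    val doubled = input.selectExpr("timestamp", "value * 2 AS doubled")

    // DataStreamWriter defines the sink; start() hands the plan to StreamingQueryManager,
    // which creates the StreamExecution behind the scenes
    val query = doubled.writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}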

Implementations of StreamExecution

  • MicroBatchExecution: exactly-once guarantees, roughly 100 ms latency
  • ContinuousExecution: at-least-once only (no exactly-once guarantee), roughly 1 ms latency

When a query is started, it is registered in activeQueries, and that is what triggers the execution.

Execution reads a slice of data from the source and simulates the stream with micro-batches;
each batch also goes through Spark's usual logical-plan-to-physical-plan process.

The complete execution flow

  • first obtain the start offset, which is determined by the offset of the previous completed run, then write the WAL (offset log)
  • the commit log provides the transactional guarantee; the end offset is determined at this point as well
  • check whether the source has new data and construct a slice of data
  • call getBatch to fetch the source data; execution is not actually triggered yet
  • convert the plan into newBatchesPlan and create triggerLogicalPlan
  • create an IncrementalExecution and run incrementally; like an ordinary Spark job it still goes through the various planning phases
  • write to the sink and record the commit log (a toy sketch of this loop follows the list)
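
A toy model of one micro-batch iteration, following the steps listed above. Everything here (Offset, getBatch, addBatch and the log maps) is a simplified stand-in written for this note, not Spark's actual MicroBatchExecution code:

object MicroBatchLoopSketch {
  final case class Offset(value: Long)

  private var lastCommitted = Offset(0L)                  // offset of the previous completed run
  private val available     = Offset(5L)                  // latest offset available at the source
  private val offsetLog     = scala.collection.mutable.Map.empty[Long, Offset]
  private val commitLog     = scala.collection.mutable.Set.empty[Long]

  private def getBatch(start: Offset, end: Offset): Seq[Long] =
    start.value until end.value                           // eager here; a lazy DataFrame in Spark

  private def addBatch(batchId: Long, rows: Seq[Long]): Unit =
    println(s"batch $batchId -> ${rows.mkString(",")}")   // stands in for running the incremental plan

  def runOneBatch(batchId: Long): Unit = {
    val start = lastCommitted
    val end   = available
    offsetLog(batchId) = end                              // write the offset WAL before executing
    if (end.value > start.value) {                        // is there any new data?
      val rows = getBatch(start, end)                     // fetch the slice of data
      addBatch(batchId, rows)                             // execute and write to the sink
      commitLog += batchId                                // record the commit log
      lastCommitted = end                                 // the source may now clean up this range
    }
  }

  def main(args: Array[String]): Unit = runOneBatch(0L)
}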

Source

Key classes

  • SparkDataStream, the top-level interface, representing an input stream
  • Source, representing an input source
    • getOffset returns the latest available offset
    • the last committed offset and the latest available offset together determine the batch of data to read
    • getBatch reads the data
    • commit signals that the batch has been processed, so the source can clean up and keep producing data
  • SupportsAdmissionControl, used to control the read rate (rate limiting); the user-facing options are sketched after this list
    • both FileStreamSource and KafkaSource support rate limiting
    • the file write (modification) time is used as the event time
    • supported formats: CSV, text, JSON, Parquet, ORC
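
A hedged sketch of the user-facing side of admission control: maxFilesPerTrigger for the file source and maxOffsetsPerTrigger for the Kafka source (the latter assumes the spark-sql-kafka connector is on the classpath); directories and topic names are placeholders:

import org.apache.spark.sql.SparkSession

object RateLimitOptions {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("RateLimitOptions").getOrCreate()

    // FileStreamSource: read at most 10 new files per micro-batch
    val files = spark.readStream
      .format("text")
      .option("maxFilesPerTrigger", "10")
      .load("/tmp/input-text")                        // hypothetical input directory

    // KafkaSource: cap the total number of offsets consumed per micro-batch
    val kafka = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")                  // hypothetical topic
      .option("maxOffsetsPerTrigger", "1000")
      .load()

    println(files.isStreaming && kafka.isStreaming)
  }
}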

Read flow (FileStreamSource)

  • fetchMaxOffset determines the offset of the files to read, first checking whether any files from the previous batch are still unread
  • getBatch uses the start and end offsets recorded in the metadata to fetch a batch of data
  • if there are leftover files they are read first; otherwise a new batch of files is fetched
  • if the newly fetched files exceed maxFilesPerTrigger, the list is split into two parts
  • if the second part of the split is too small, it is processed together with the next batch
  • the offsets are recorded in the metadata log

Commit and cleanup

  • after MicroBatchExecution commits a batch, it notifies the source to advance its offset
  • FileStreamSource defines the internal abstract class FileStreamSourceCleaner to perform the cleanup
  • two concrete implementations: SourceFileArchiver (archive) and SourceFileRemover (delete)
  • both run in a separate thread (the corresponding user-facing options are sketched below)
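
A hedged sketch of the options behind these cleaners: cleanSource and sourceArchiveDir are the documented FileStreamSource options (Spark 3.0+); the directories are placeholders:

import org.apache.spark.sql.SparkSession

object CleanSourceExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("CleanSourceExample").getOrCreate()

    val lines = spark.readStream
      .format("text")
      .option("cleanSource", "archive")                // "archive" (SourceFileArchiver), "delete" (SourceFileRemover) or "off"
      .option("sourceArchiveDir", "/tmp/archived")     // required when cleanSource = archive
      .load("/tmp/incoming")                           // hypothetical input directory

    lines.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/clean-ckpt") // hypothetical checkpoint directory
      .start()
      .awaitTermination()
  }
}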

Sink

Where the sink sits in the code

Sink basics

  • Sink is a trait; every implementation must provide addBatch
  • addBatch takes two parameters: a batchId and a DataFrame
  • the batchId lets the external system decide whether to overwrite or discard a batch, which is how exactly-once output is achieved (a minimal sketch follows this list)
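
A minimal sketch of a custom Sink, using the internal org.apache.spark.sql.execution.streaming.Sink trait described above (not a stable public API). It keeps the last committed batchId and skips batches it has already written, which is the idempotence idea behind exactly-once output:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

class IdempotentConsoleSink extends Sink {
  @volatile private var lastCommittedBatchId: Long = -1L

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId <= lastCommittedBatchId) {
      // this batch was already written by a previous attempt: drop it
    } else {
      data.collect().foreach(println)   // stand-in for writing to an external system
      lastCommittedBatchId = batchId
    }
  }
}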

Implementations

  • FileStreamSink, writes to Hadoop-compatible file systems and supports the usual file formats
  • KafkaSink, writes to Kafka
  • DeltaSink, writes to the Delta Lake table format
  • ForeachBatchSink and ForeachWriteTable, the most flexible ones: they let users plug in custom output logic

ForeachBatchSink
A user-defined output function is passed in here as batchWriter: (Dataset[T], Long) => Unit:

class ForeachBatchSink[T](batchWriter: (Dataset[T], Long) => Unit, encoder: ExpressionEncoder[T])
  extends Sink {

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val node = LogicalRDD.fromDataset(rdd = data.queryExecution.toRdd, originDataset = data,
      isStreaming = false)
    implicit val enc = encoder
    val ds = Dataset.ofRows(data.sparkSession, node).as[T]
    batchWriter(ds, batchId)
  }
}

Inside DataStreamWriter#startInternal(), the ForeachBatchSink instance is created:

    } else if (source == SOURCE_NAME_FOREACH_BATCH) {
      assertNotPartitioned(SOURCE_NAME_FOREACH_BATCH)
      if (trigger.isInstanceOf[ContinuousTrigger]) {
        throw QueryCompilationErrors.sourceNotSupportedWithContinuousTriggerError(source)
      }
      val sink = new ForeachBatchSink[T](foreachBatchWriter, ds.exprEnc)
      startQuery(sink, extraOptions, catalogTable = catalogTable)
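
From the user's side this code path is reached through the public foreachBatch API. A hedged usage sketch, with placeholder output and checkpoint paths; the batchId is what makes the write idempotent:

import org.apache.spark.sql.{DataFrame, SparkSession}

object ForeachBatchExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("ForeachBatchExample").getOrCreate()
    val df = spark.readStream.format("rate").option("rowsPerSecond", "3").load()

    // a typed function value matches the Scala overload of foreachBatch
    val writeBatch: (DataFrame, Long) => Unit = (batch, batchId) =>
      batch.write.mode("overwrite").parquet(s"/tmp/rate-out/batch=$batchId")   // hypothetical path

    df.writeStream
      .foreachBatch(writeBatch)
      .option("checkpointLocation", "/tmp/rate-out-ckpt")                      // hypothetical path
      .start()
      .awaitTermination()
  }
}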

ForeachWriteTable & ForeachWriter

  • ForeachBatchSink controls output at the granularity of a whole batch
  • ForeachWriteTable and ForeachWriter give control down to the level of individual rows

The three methods to implement (a usage sketch follows this list)

  • open – open connection for writing the current partition of data in the executor.
  • process – when the ‘open’ method returns ‘true’, write a row to the connection.
  • close – close the connection.
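
A hedged sketch of the public ForeachWriter API with these three methods; here the "connection" is simply standard output:

import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

object ForeachWriterExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("ForeachWriterExample").getOrCreate()
    val df = spark.readStream.format("rate").option("rowsPerSecond", "3").load()

    val writer = new ForeachWriter[Row] {
      def open(partitionId: Long, epochId: Long): Boolean = true   // return false to skip this partition
      def process(row: Row): Unit = println(row)                   // write one row to the "connection"
      def close(errorOrNull: Throwable): Unit = ()                 // release the connection
    }

    df.writeStream.foreach(writer).start().awaitTermination()
  }
}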

FileStreamSink

  • each micro-batch is written by FileFormatWriter into its own directory
  • FileStreamSinkLog records the list of files that were written successfully
  • ManifestFileCommitProtocol records the commit information
  • once the write succeeds, the pendingCommitFiles are added to the FileStreamSinkLog (a usage sketch follows this list)
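
A hedged usage sketch of the file sink (paths are placeholders): each micro-batch lands in the output directory, and the successfully committed files are tracked in the _spark_metadata log maintained by FileStreamSinkLog:

import org.apache.spark.sql.SparkSession

object FileSinkExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("FileSinkExample").getOrCreate()
    val df = spark.readStream.format("rate").option("rowsPerSecond", "3").load()

    df.writeStream
      .format("parquet")
      .option("path", "/tmp/file-sink-out")                 // hypothetical output directory
      .option("checkpointLocation", "/tmp/file-sink-ckpt")  // hypothetical checkpoint directory
      .outputMode("append")                                 // the file sink only supports append
      .start()
      .awaitTermination()
  }
}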

DeltaSink

  • this sink belongs to the delta-lake project and relies on Delta Lake's transaction log for atomic writes
  • the batchId is compared with txnVersion to decide whether the batch should be ignored
  • DeltaSink supports the Append and Complete output modes
  • Complete is implemented by deleting all existing files first, i.e. a truncate followed by an append
  • SetTransaction records the current batchId as the transaction version to prevent duplicate commits

IncrementalExecution

IncrementalExecution

  • inherits from QueryExecution, reusing Spark's whole pipeline: analysis, optimization, physical planning and so on
  • internally it keeps the state between two batches, which is what makes incremental queries possible

Reusing QueryExecution

  • Spark's generic query-execution logic is reused here
  • during incremental processing a new logical plan is generated for every batch, based on the boundary conditions and on whether new data has arrived
  • metadata such as the committed offsets is persisted as well

Physical planning currently uses the following strategies:

  • SpecialLimits
  • JoinSelection
  • StatefulAggregationStrategy
  • StreamingGlobalLimitStrategy
  • StreamingDeduplicationStrategy
  • StreamingJoinStrategy
  • WindowGroupLimit
  • InMemoryScans
  • StreamingRelationStrategy
  • FlatMapGroupsWithStateStrategy
  • FlatMapGroupsInPandasWithStateStrategy

State storage

  • mapPartitionsWithStateStore creates a StateStoreRDD, which performs the physical writes
  • mapPartitionsWithReadStateStore creates a ReadStateStoreRDD, which performs the reads




Execution Preparation

  • QueryExecution inserts the operators that are still needed, such as sort and exchange
  • IncrementalExecution performs some additional preparation of its own
  • it records statefulOperatorId and nextStatefulOperationStateInfo

Stateful

Recording stateful data

  • stateful data is written and read through StateStoreRDD and ReadStateStoreRDD
  • the stateful data is written to a key-value store
  • an anonymous function is passed to mapPartitionsWithStateStore / mapPartitionsWithReadStateStore
  • mapPartitionsWithStateStore then creates a StateStoreRDD that updates the state: for every partition on every executor it creates a state store and invokes the function
  • mapPartitionsWithReadStateStore creates a ReadStateStoreRDD that does the same job, except that its state store is read-only

StateStore

  • a StateStoreId maps 1:1 to a partition
  • a StateStoreId consists of checkpointRootLocation, operatorId, partitionId and storeName
  • the state store keeps its key-value data per version; each version represents one operator state point
  • the StateStore class encapsulates the methods for accessing the state store
  • each StateStore instance represents the state data at a specific version; the instances are supplied by a StateStoreProvider
  • StateStoreProvider.getStore(version: Long) returns the store for the given version

Implementations of StateStoreProvider

  • HDFSBackedStateStoreProvider, the first stage is a ConcurrentHashMap in memory; the second stage persists the data to an HDFS-compatible file system
  • RocksDBStateStoreProvider, uses RocksDB to optimize writes and avoid JVM heap overhead (a configuration sketch follows this list)
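
A hedged configuration sketch: spark.sql.streaming.stateStore.providerClass selects the StateStoreProvider implementation, and the RocksDB provider below ships with Spark 3.2+:

import org.apache.spark.sql.SparkSession

object RocksDBStateStoreExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("RocksDBStateStoreExample")
      .config("spark.sql.streaming.stateStore.providerClass",
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
      .getOrCreate()

    // any stateful query (aggregation, deduplication, join, ...) now keeps its state in RocksDB
    spark.readStream.format("rate").load()
      .groupBy("value").count()
      .writeStream.outputMode("update").format("console").start()
      .awaitTermination()
  }
}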

Driver and executors

  • the StateStoreCoordinator on the driver keeps a map of where each state store instance runs on the executors
  • StateStoreRDD asks the driver for the preferred locations (locality) of its partitions
  • each executor runs a StateStoreCoordinatorRef to communicate with the driver and report that its stores are active

Limit

Test code

    val df = spark.readStream.format("rate").option("rowsPerSecond", 3).load()
    println(df)
    val df2 = df.limit(3)
    df2.explain("extended")

Output:

== Analyzed Logical Plan ==
timestamp: timestamp, value: bigint
GlobalLimit 3
+- LocalLimit 3
   +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@1d3a7a1, rate, 
   org.apache.spark.sql.execution.streaming.sources.RateStreamTable@411474, [rowsPerSecond=3], [timestamp#24, value#25L]

== Optimized Logical Plan ==
GlobalLimit 3
+- LocalLimit 3
   +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@1d3a7a1, rate, 
   org.apache.spark.sql.execution.streaming.sources.RateStreamTable@411474, [rowsPerSecond=3], [timestamp#24, value#25L]

== Physical Plan ==
StreamingGlobalLimit 3, state info [ checkpoint = <unknown>, runId = 9f14f598-cc53-495a-b702-545baef62eb2, opId = 0, ver = 0, 
numPartitions = 200], Append
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=38]
   +- LocalLimit 3
      +- StreamingRelation rate, [timestamp#24, value#25L]

Explanation

  • the query generates 3 random rows per second from the rate source
  • a LocalLimit is executed first, followed by a GlobalLimit
  • StreamingLocalLimitExec takes up to the limit number of rows from each partition
  • for example, with 10 partitions and a limit of 5 per partition, up to 50 rows are collected
  • an Exchange is added to gather all partitions into a single partition
  • the rows are then read and counted; once the running total exceeds the limit, the remaining rows are no longer returned
  • when the current micro-batch completes, the updated total row count is written to the state store

Deduplication

Code

    val df = spark.readStream.format("rate").option("rowsPerSecond", 3).load()
    println(df)

    val df2 = df.withColumn("key", rand() * 10)
    val df3 = df2.dropDuplicates(("key"))
    df3.explain("extended")

Output:

== Analyzed Logical Plan ==
timestamp: timestamp, value: bigint, key: double
Deduplicate [key#28]
+- Project [timestamp#24, value#25L, (rand(7252746682121171822) * cast(10 as double)) AS key#28]
   +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@1960f17, rate, 
   org.apache.spark.sql.execution.streaming.sources.RateStreamTable@11b3620, [rowsPerSecond=3], [timestamp#24, value#25L]

== Optimized Logical Plan ==
Deduplicate [key#28]
+- Project [timestamp#24, value#25L, (rand(2403598613654465872) * 10.0) AS key#28]
   +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@1960f17, rate, 
   org.apache.spark.sql.execution.streaming.sources.RateStreamTable@11b3620, [rowsPerSecond=3], [timestamp#24, value#25L]

== Physical Plan ==
StreamingDeduplicate [key#28], state info [ checkpoint = <unknown>, runId = 41e7f9e3-cdd8-4528-afd7-2550a1411175, 
opId = 0, ver = 0, numPartitions = 200], -9223372036854775808, -9223372036854775808
+- Exchange hashpartitioning(key#28, 200), ENSURE_REQUIREMENTS, [plan_id=39]
   +- Project [timestamp#24, value#25L, (rand(2403598613654465872) * 10.0) AS key#28]
      +- StreamingRelation rate, [timestamp#24, value#25L]

Explanation

  • in this streaming query, a row whose "key" value matches an existing row is dropped
  • the data is reshuffled so that rows with the same key are handled in the same partition
  • if a key is already present in the state store it was processed before, and the row is simply ignored

Aggregate

Code

    val df = spark.readStream.format("rate").option("rowsPerSecond", 3).load()
    println(df)
    val df2 = df.withColumn("key", rand() * 10)
    val df3 = df2.groupBy("key").count()
    df3.explain("extended")

Output:

== Analyzed Logical Plan ==
key: double, count: bigint
Aggregate [key#28], [key#28, count(1) AS count#36L]
+- Project [timestamp#24, value#25L, (rand(-4643212257580472272) * cast(10 as double)) AS key#28]
   +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@15ef1c3, rate,
   org.apache.spark.sql.execution.streaming.sources.RateStreamTable@1eab2e8, [rowsPerSecond=3], [timestamp#24, value#25L]

== Optimized Logical Plan ==
Aggregate [key#28], [key#28, count(1) AS count#36L]
+- Project [(rand(-5957448139151497533) * 10.0) AS key#28]
   +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@15ef1c3, rate, 
   org.apache.spark.sql.execution.streaming.sources.RateStreamTable@1eab2e8, [rowsPerSecond=3], [timestamp#24, value#25L]

== Physical Plan ==
HashAggregate(keys=[key#28], functions=[count(1)], output=[key#28, count#36L])
+- StateStoreSave [key#28], state info [ checkpoint = <unknown>, 
runId = 19d70483-2c1c-40e2-8e8a-6f18ce6d57e1, opId = 0, ver = 0, numPartitions = 200], Append, -9223372036854775808, -9223372036854775808, 2
   +- HashAggregate(keys=[key#28], functions=[merge_count(1)], output=[key#28, count#40L])
      +- StateStoreRestore [key#28], state info [ checkpoint = <unknown>, runId = 19d70483-2c1c-40e2-8e8a-6f18ce6d57e1, 
	  opId = 0, ver = 0, numPartitions = 200], 2
         +- HashAggregate(keys=[key#28], functions=[merge_count(1)], output=[key#28, count#40L])
            +- Exchange hashpartitioning(key#28, 200), ENSURE_REQUIREMENTS, [plan_id=66]
               +- HashAggregate(keys=[knownfloatingpointnormalized(normalizenanandzero(key#28)) AS 
			   key#28], functions=[partial_count(1)], output=[key#28, count#40L])
                  +- Project [(rand(-5957448139151497533) * 10.0) AS key#28]
                     +- StreamingRelation rate, [timestamp#24, value#25L]

Explanation

  • judging from the physical plan, the streaming execution is very similar to the batch logic, with a state store added
  • the main difference is the extra StateStoreRestore and StateStoreSave operators
  • StateStoreRestoreExec restores the aggregation state produced by the previous batches
  • the updated aggregate values are written back to the state store by StateStoreSave (a windowed variant with a watermark is sketched after this list)
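
A hedged variant of the example above: the same kind of streaming aggregation, but keyed by an event-time window with a watermark, so that old window state can eventually be evicted from the state store (the checkpoint path is a placeholder):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object WindowedAggExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("WindowedAggExample").getOrCreate()

    val df = spark.readStream.format("rate").option("rowsPerSecond", "3").load()
    val counts = df
      .withWatermark("timestamp", "10 minutes")                  // bound how late data may arrive
      .groupBy(window(col("timestamp"), "5 minutes"), col("value") % 10)
      .count()

    counts.writeStream
      .outputMode("update")                                      // emit only the windows that changed
      .format("console")
      .option("checkpointLocation", "/tmp/windowed-agg-ckpt")    // hypothetical checkpoint directory
      .start()
      .awaitTermination()
  }
}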

Stream-Stream Join

Unlike an ordinary batch join, in a stream-stream join the input data on one side may have already arrived
while the matching data on the other side has not, so the rows that have arrived must be buffered as temporary state.

Test code

    val df = spark.readStream.format("rate").option("rowsPerSecond", 3).load()
    val left = df
      .withWatermark("timestamp", "1 hours")
      .withColumn("left_key", rand() * 10)
      .withColumn("left_value", rand() * 10)

    val df2 = spark.readStream.format("rate").option("rowsPerSecond", 3).load()
    val right = df2
      .withWatermark("timestamp", "2 hours")
      .withColumn("right_key", rand() * 10)
      .withColumn("right_value", rand() * 10)

    val res = left.join(right, expr(
      """
        | left_key = right_key AND
        | left_value > 5 AND
        | right_value <= 5
        |""".stripMargin),
        "inner")
    res.explain("extended")

Output:

== Analyzed Logical Plan ==
timestamp: timestamp, value: bigint, left_key: double, left_value: double, 
timestamp: timestamp, value: bigint, right_key: double, right_value: double
Join Inner, (((left_key#4 = right_key#17) AND (left_value#8 > cast(5 as double))) AND (right_value#21 <= cast(5 as double)))
:- Project [timestamp#0-T3600000ms, value#1L, left_key#4, (rand(-6247155663028336164) * cast(10 as double)) AS left_value#8]
:  +- Project [timestamp#0-T3600000ms, value#1L, (rand(-2059797804374421584) * cast(10 as double)) AS left_key#4]
:     +- EventTimeWatermark timestamp#0: timestamp, 1 hours
:        +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2dfa02c1, rate,
 org.apache.spark.sql.execution.streaming.sources.RateStreamTable@6314df3c, [rowsPerSecond=3], [timestamp#0, value#1L]
+- Project [timestamp#13-T7200000ms, value#14L, right_key#17, (rand(1974764346186854212) * cast(10 as double)) AS right_value#21]
   +- Project [timestamp#13-T7200000ms, value#14L, (rand(-3791612200140673158) * cast(10 as double)) AS right_key#17]
      +- EventTimeWatermark timestamp#13: timestamp, 2 hours
         +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@37d28938, rate,
		 org.apache.spark.sql.execution.streaming.sources.RateStreamTable@7d0cd23c, [rowsPerSecond=3], [timestamp#13, value#14L]

== Optimized Logical Plan ==
Join Inner, (knownfloatingpointnormalized(normalizenanandzero(left_key#4)) = knownfloatingpointnormalized(normalizenanandzero(right_key#17)))
:- Filter (left_value#8 > 5.0)
:  +- Project [timestamp#0-T3600000ms, value#1L, left_key#4, (rand(-99232845556501030) * 10.0) AS left_value#8]
:     +- Project [timestamp#0-T3600000ms, value#1L, (rand(-332688255858197378) * 10.0) AS left_key#4]
:        +- EventTimeWatermark timestamp#0: timestamp, 1 hours
:           +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2dfa02c1, rate,
 org.apache.spark.sql.execution.streaming.sources.RateStreamTable@6314df3c, [rowsPerSecond=3], [timestamp#0, value#1L]
+- Filter (right_value#21 <= 5.0)
   +- Project [timestamp#13-T7200000ms, value#14L, right_key#17, (rand(843538550904813538) * 10.0) AS right_value#21]
      +- Project [timestamp#13-T7200000ms, value#14L, (rand(5313924642516271519) * 10.0) AS right_key#17]
         +- EventTimeWatermark timestamp#13: timestamp, 2 hours
            +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@37d28938, rate,
			org.apache.spark.sql.execution.streaming.sources.RateStreamTable@7d0cd23c, [rowsPerSecond=3], [timestamp#13, value#14L]

== Physical Plan ==
StreamingSymmetricHashJoin [knownfloatingpointnormalized(normalizenanandzero(left_key#4))],
 [knownfloatingpointnormalized(normalizenanandzero(right_key#17))], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null
 ], state info [ checkpoint = <unknown>, runId = 60ca7e11-68bf-4ef2-999c-70807f898494, opId = 0, ver = 0, numPartitions = 1], -9223372036854775808,
 -9223372036854775808, state cleanup [ left = null, right = null ], 2
:- Exchange hashpartitioning(knownfloatingpointnormalized(normalizenanandzero(left_key#4)), 1), ENSURE_REQUIREMENTS, [plan_id=53]
:  +- *(1) Filter (left_value#8 > 5.0)
:     +- *(1) Project [timestamp#0-T3600000ms, value#1L, left_key#4, (rand(-99232845556501030) * 10.0) AS left_value#8]
:        +- *(1) Project [timestamp#0-T3600000ms, value#1L, (rand(-332688255858197378) * 10.0) AS left_key#4]
:           +- EventTimeWatermark timestamp#0: timestamp, 1 hours
:              +- StreamingRelation rate, [timestamp#0, value#1L]
+- Exchange hashpartitioning(knownfloatingpointnormalized(normalizenanandzero(right_key#17)), 1), ENSURE_REQUIREMENTS, [plan_id=59]
   +- *(2) Filter (right_value#21 <= 5.0)
      +- *(2) Project [timestamp#13-T7200000ms, value#14L, right_key#17, (rand(843538550904813538) * 10.0) AS right_value#21]
         +- *(2) Project [timestamp#13-T7200000ms, value#14L, (rand(5313924642516271519) * 10.0) AS right_key#17]
            +- EventTimeWatermark timestamp#13: timestamp, 2 hours
               +- StreamingRelation rate, [timestamp#13, value#14L]

Explanation

  • the generated physical operator is StreamingSymmetricHashJoinExec
  • EventTimeWatermark is added on both sides and acts as the watermark filter
  • HashClusteredDistribution causes both sides to be exchanged so that each partition sees the same keys from both inputs
  • before StreamingSymmetricHashJoinExec runs, StateStoreAwareZipPartitionsRDD lines up the corresponding partitions of the two sides

Compared with an ordinary join, a stream-stream join differs in two ways:

  • Use Symmetric Hash Join algorithm
  • Join with buffered rows in state store

Details

  • an ordinary hash join builds a hash table on one side and then scans the other side, probing the table for matches
  • because the data on the two sides may not arrive at the same time, a plain hash join is not suitable
  • Structured Streaming therefore uses a symmetric hash join algorithm
  • each side probes the other side's state store for matching keys; in other words, both sides of the join both build and probe
  • both sides use a OneSideHashJoiner, which encapsulates the logic of consuming its input data and joining it against the other side's stream state
  • each OneSideHashJoiner maintains a SymmetricHashJoinStateManager to manage its state store

storeAndJoinWithOtherSide

  • this is the core logic of StreamingSymmetricHashJoinExec and performs the actual join
  • the left joiner processes the left input data and the right joiner processes the right input data
  • old rows are removed based on the watermark
  • for each input row, getJoinedRows looks up the matching rows in the other side's state store
  • the matched rows from both sides are then combined into the output, and state rows below the watermark are cleaned up
  • besides inner joins, left outer, right outer and other join types are supported

From the comments in StreamingSymmetricHashJoinExec

  • some watermark handling is added here
  • when the left side falls behind the watermark by the specified amount of time, the corresponding state can be removed from the state store
 *                             /-----------------------\
 *   left side input --------->|    left side state    |------\
 *                             \-----------------------/      |
 *                                                            |--------> joined output
 *                             /-----------------------\      |
 *   right side input -------->|    right side state   |------/
 *                             \-----------------------/

Session Window

Tumbling windows and sliding windows have a fixed length and fixed window start times,
whereas the length of a session window is not fixed and its start and end times depend on the input data.
A session window groups the events of a period of activity into one window;
any event that occurs within the session gap is merged into the existing session.

Example code

    val df = spark.readStream.format("rate").option("rowsPerSecond", 3).load()
    println(df)
    val df2 = df.withColumn("key", (rand() * 10).cast("int"))

    val df3 = df2.groupBy(
        col("key"),
        session_window(col("timestamp"), "10 minutes")
    ).count() // Perform an aggregation (count) within the session window

    df3.explain("extended")

    val query = df3.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .option("truncate", "false")
      .start()
    query.awaitTermination()

Output:

== Analyzed Logical Plan ==
key: int, session_window: struct<start:timestamp,end:timestamp>, count: bigint
Aggregate [key#28, session_window#42], [key#28, session_window#42 AS session_window#32, count(1) AS count#37L]
+- Filter isnotnull(timestamp#24)
   +- Project [named_struct(start, precisetimestampconversion(precisetimestampconversion(timestamp#24, TimestampType, LongType), LongType, TimestampType), end,
   knownnullable(precisetimestampconversion(precisetimestampconversion(cast(timestamp#24 + cast(10 minutes as interval) as timestamp), TimestampType, LongType), 
   LongType, TimestampType))) AS session_window#42, timestamp#24, value#25L, key#28]
      +- Project [timestamp#24, value#25L, cast((rand(1567828180694956017) * cast(10 as double)) as int) AS key#28]
         +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@b0fd88, rate,
		 org.apache.spark.sql.execution.streaming.sources.RateStreamTable@dc3344, [rowsPerSecond=3], [timestamp#24, value#25L]

== Optimized Logical Plan ==
Aggregate [key#28, session_window#42], [key#28, session_window#42, count(1) AS count#37L]
+- Project [named_struct(start, precisetimestampconversion(precisetimestampconversion(timestamp#24, TimestampType, LongType), LongType, TimestampType), end,
 knownnullable(precisetimestampconversion(precisetimestampconversion(timestamp#24 + 10 minutes, TimestampType, LongType), LongType, 
 TimestampType))) AS session_window#42, key#28]
   +- Filter isnotnull(timestamp#24)
      +- Project [timestamp#24, cast((rand(7593491097234282634) * 10.0) as int) AS key#28]
         +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@b0fd88, rate,
		 org.apache.spark.sql.execution.streaming.sources.RateStreamTable@dc3344, [rowsPerSecond=3], [timestamp#24, value#25L]

== Physical Plan ==
HashAggregate(keys=[key#28, session_window#42], functions=[count(1)], output=[key#28, session_window#42, count#37L])
+- SessionWindowStateStoreSave [key#28], session_window#42: struct<start: timestamp, end: timestamp>, state info [ checkpoint = <unknown>, runId =
 150974bd-1b56-4fe3-b48c-9dfe10b00f98, opId = 0, ver = 0, numPartitions = 200], Append, -9223372036854775808, -9223372036854775808, 1
   +- MergingSessions ArrayBuffer(key#28), true, 200, [key#28, session_window#42], session_window#42: struct<start: timestamp, end: timestamp>, [merge_count(1)], 
   [count(1)#36L], 2, [key#28, session_window#42, count#44L]
      +- SessionWindowStateStoreRestore [key#28], session_window#42: struct<start: timestamp, end: timestamp>, state info [ checkpoint = <unknown>, runId =
	  150974bd-1b56-4fe3-b48c-9dfe10b00f98, opId = 0, ver = 0, numPartitions = 200], -9223372036854775808, -9223372036854775808, 1
         +- Sort [key#28 ASC NULLS FIRST, session_window#42 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(key#28, 200), ENSURE_REQUIREMENTS, [plan_id=72]
               +- HashAggregate(keys=[key#28, session_window#42], functions=[partial_count(1)], output=[key#28, session_window#42, count#44L])
                  +- Project [named_struct(start, precisetimestampconversion(precisetimestampconversion(timestamp#24, TimestampType, LongType), 
				  LongType, TimestampType), end,
				  knownnullable(precisetimestampconversion(precisetimestampconversion(timestamp#24 + 10 minutes, TimestampType, LongType), LongType, TimestampType))) AS
				  session_window#42, key#28]
                     +- Filter isnotnull(timestamp#24)
                        +- Project [timestamp#24, cast((rand(7593491097234282634) * 10.0) as int) AS key#28]
                           +- StreamingRelation rate, [timestamp#24, value#25L]

session_window

  • the first argument is the event timestamp column for windowing by time
  • the second argument specifies the session gap duration, which can also be viewed as the timeout of the session
  • internally Spark defines a SessionWindow expression
  • the analyzer defines the session window as:
  • ["start" = timestamp, "end" = timestamp + session gap]

Physical planning

  • StatefulAggregationStrategy calls planStreamingAggregationForSession
  • which generates the corresponding physical plan

Execution steps

  • Partial aggregation: the data in each partition is grouped by key and session_window
  • Merging sessions (optional): sessions are merged before the shuffle to reduce the amount of shuffled data
  • a Sort operator is inserted before the merge
  • Shuffle: keys with the same hash are grouped into the same partition; the session_window is dropped from the partitioning key at this point
  • SessionWindowStateStoreRestore: reads the session windows from the state store and merges them with the current micro-batch, similar to MergingSessionExec
  • another MergingSessionExec operator then merges the input stream with the state store and computes the aggregate values
  • SessionWindowStateStoreSave: saves the session windows in preparation for the next micro-batch
  • Final aggregation: produces the final output

Merging Sessions

  • the core requirement for session windows is deciding which events go into which session window and when a session window closes
  • the start time is the event timestamp of the arriving input row
  • the end time is the event timestamp + the session gap duration
  • in other words, if newly arriving data falls inside the window the window expands; otherwise the window is closed

How the MergingSessionExec physical operator works

  • requiredChildOrdering adds a Sort operator (shown below)
  • the rows can then be processed one by one in sorted order
  • for each row, check whether its key (and session_window) belongs to the current window
  • if the key is different, a new session window is started
  • otherwise, check the event timestamp against the current session end: if it does not exceed the end, the row is merged into (and extends) the current window; if it does, a new window is started
  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
    Seq((keyWithoutSessionExpressions ++ Seq(sessionExpression)).map(SortOrder(_, Ascending)))
  }

SessionWindowStateStoreRestoreExec

  • restores session windows from the state store
  • merges them with the input rows of the current micro-batch, which are sorted by aggregate key and session window
  • the actual merging and sorting logic is defined in MergingSortWithSessionWindowStateIterator

SessionWindowStateStoreSaveExec

  • a physical operator
  • updates the session state in the state store
  • session windows are stored in the state store as follows:
  • for each aggregation key, the session windows are kept in an array

References