Spark-Core 相关的模块

  • 执行环境,Env和SparkContext
  • RPC 相关
  • 存储体系
  • 计算
  • scheduler
  • deploy
  • API 相关

存储体系

整体架构

BlockManager 相关的架构图

BlockId 相关

BlockId Implementation Purpose
RDDBlockId Identifies a partition of an RDD.
ShuffleBlockId Identifies shuffle data output by a shuffle stage.
BroadcastBlockId Identifies broadcast variables for tasks.
TaskResultBlockId Identifies task result blocks stored temporarily.

一些核心组件

  • BlockManager
  • BlockInfoManager
  • BlockManagerMaster
  • MemoryManager
  • MemoryStore
  • DiskBlockManager
  • DiskStore
  • BlockManagerMasterEndpoint
  • BlockManagerSlaveEndpoint
  • BlockTransferService

内存相关类

Component Layer Scope Responsibilities
MemoryManager Low-Level Executor-specific Manages memory allocation between execution and storage pools.
MemoryStore High-Level Executor-specific Manages in-memory storage of blocks, relies on MemoryManager for memory allocation.

MemoryManager 将内存分为

  • execution memory
  • storage memory

而 MemoryStore 则作为一个更高层的组件,负责 block 的管理

  • 块的创建
  • 从内存中驱逐块,LRU 算法

磁盘存储相关

Component Layer Scope Responsibilities
DiskBlockManager Low-Level Executor-specific Handles physical file operations for block storage on disk.
DiskStore High-Level Executor-specific Provides a block-level interface for disk storage, relies on DiskBlockManager.

DiskStore 和 DiskBlockManager 的关系,跟 内存的类似,只是读写的是磁盘


主从节点 相关的几个类

  • BlockManagerMaster,这个是中央类,包含了:block_id –> 具体 block 所在的节点
  • BlockManager,每个executor 上都有一个,负责跨集群的块管理,block 的生命周期管理等
  • BlockInfoManager,每个executor 一个,存储:block id -> 大小,存储级别等元信息

RPC 相关的

  • BlockTransferService,当本地内存、磁盘都没有,则向远端 其他节点的 block-manager获取数据
  • BlockManagerMasterEndpoint,主节点 RPC,负责接受 block-manager的一些查询,以及元数据更新请求
  • BlockManagerSlaveEndpoint,从节点 RPC,向 master 发送元数据更新等信息

执行流程

另一个视角

一个完整的流程
存储一个block

  • executor 创建一个 分区
  • 存储这些block,发送给 BlockManager
  • 存储的 block 放在 RDD_0_0,如果内存不足就放磁盘
  • BlockInfoManager,记录元信息
  • 全局注册,发送给 BlockManagerMaster 记录 block_id -> 块的具体位置信息

检索块信息

  • executor 上的 task,检索:RDD_0_0
  • 调用:BlockManager.getBlockData(BlockId) 获取块信息
  • 这里会使用 BlockInfoManager 做一些读、写加锁操作,防止并发问题
  • 调用 MemoryStore 、DiskStore 从内存或者磁盘读取
  • 如果本地没有,则向 BlockManagerMaster 通讯,查询 RDD_0_0 的具体位置
  • 通过 BlockTransferService 向远端机器真正的获取数据
  • 之后将这个 block 存储在本地内存/磁盘

shuffle 操作

  • map task 写中间结果到磁盘,这里会委托 BlockManager 完成 block 的写磁盘操作
  • BlockManager 将 block 的信息发送给BlockManagerMaster,记录block 的元信息
  • reduce task 请求这些 shuffle 数据
  • executor 上的 BlockManager 从 BlockManagerMaster 上获取 shuffle 的 block 具体位置
  • 通过 BlockTransferService 从远程节点上获取数据

block 存储的内容一般为:

  • RDD partitions: Each RDD partition is stored as a block.
  • Shuffle data: Intermediate data generated during shuffle operations.
  • Broadcast variables: Data shared across tasks.
  • Task results: Intermediate task results temporarily stored for fault tolerance.

block 大小和内容

  • 一般跟分区是 1 对 1 映射的,多少分区,多少 block
  • 可以配置 shuffle 的文件大小,也就是 shuffle block 的大小
  • 内存中一般是 raw 的java对象,磁盘中是序列化后的内容
  • 广播变量等也会存储为 block,但是大小不固定

测试代码,观察本地 block 的情况

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SparkSession

object BlockStorageTest {
  def main(args: Array[String]): Unit = {
    // Step 1: Create Spark Session
    val spark = SparkSession.builder()
      .appName("Block Storage Test")
      .master("local[*]")
      .config("spark.local.dir", "D:\\tmp\\spark_data")
      .getOrCreate()

    val sc = spark.sparkContext

    // Step 2: Create an RDD
    val data = 1 to 1000 // Sample data
    val rdd = sc.parallelize(data, numSlices = 4)

    // Step 3: Persist the RDD to memory and disk
    rdd.persist(StorageLevel.DISK_ONLY)

    // Step 4: Trigger an action to store the blocks
    rdd.count() // Action to compute and store the blocks

    // Step 5: Log the disk storage location
    println("Disk storage location: " + sc.getConf.get("spark.local.dir"))

    // Keep the application alive to inspect the disk manually
    Thread.sleep(60000)

    // Stop SparkSession
    spark.stop()
  }
}

在 spark.local.dir 目录下,生成了

  • blockmgr-f733139a-820e-4ef9-9b1d-cae27c6debfa

这里目录又包含了

  • 0e
  • 11
  • 14,里面有 rdd_0_0 文件
  • 15,里面有 rdd_0_1 文件
  • 16,有 rdd_0_2
  • 17,有 rdd_0_3

本地可以观察 spark-ui

BoradcastManager 执行流程

Map 监控体系

Tracker 的几个类

  • MapOutputTracker,父类
  • MapOutputTrackerMaster
  • MapOutputTrackerWorker

这些类还引用了:ShuffleStatus

相关的类

1
private[spark] sealed trait MapOutputTrackerMessage

MapOutputTrackerMessage 的实现类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
private[spark] case class GetMapOutputStatuses(shuffleId: Int)
  extends MapOutputTrackerMessage
private[spark] case class GetMapAndMergeResultStatuses(shuffleId: Int)
  extends MapOutputTrackerMessage
private[spark] case class GetShufflePushMergerLocations(shuffleId: Int)
  extends MapOutputTrackerMessage
private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage

private[spark] sealed trait MapOutputTrackerMasterMessage
private[spark] case class GetMapOutputMessage(shuffleId: Int,
  context: RpcCallContext) extends MapOutputTrackerMasterMessage
private[spark] case class GetMapAndMergeOutputMessage(shuffleId: Int,
  context: RpcCallContext) extends MapOutputTrackerMasterMessage
private[spark] case class GetShufflePushMergersMessage(shuffleId: Int,
  context: RpcCallContext) extends MapOutputTrackerMasterMessage
private[spark] case class MapSizesByExecutorId(
  iter: Iterator[(BlockManagerId, 

调用解释

  • (1)表示某个Executor调用MapOutputTrackerWorker的getStatuses方法获取某个shuffle的map任务状态信息
  • (2)MessageLoop线程从mapOutputRequests队头取出GetMapOutputMessage
  • (3)从shuffleIdLocks数组中取出与当前GetMapOutputMessage携带的shuffleId相对应的锁
  • (4)首先从cachedSerializedStatuses缓存中获取shuffleId对应的序列化任务状态信息
  • (5)第四步没找到,从 mapStatuses中缓存的shuffleId对应的任务状态数组
  • (6)序列化后广播
  • (7)将序列化的任务状态放入cachedSerializedStatuses缓存中
  • (8)将广播对象放入cachedSerializedBroadcast缓存中
  • (9)将获得的序列化任务状态信息,通过回调GetMapOutputMessage消息携带的RpcCallContext的reply方法回复客户端

WEB-UI

整体结构

入口类: org.apache.spark.ui.SparkUI

层次体系

  • SparkUI,该类继承子WebUI,中枢类,负责启动jetty,保存页面和URL Path之间的关系等
  • SparkUITab(继承自WebUITab) ,就是首页的标签栏
  • WebUIPage,这个是具体的页面。
  • SparkUI 负责整个Spark UI构建是,同时它是一切页面的根对象。
  • 对应的层级结构为: SparkUI -> WebUITab -> WebUIPage

初始化类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
  def initialize(): Unit = {
    val jobsTab = new JobsTab(this, store)
    attachTab(jobsTab)
    val stagesTab = new StagesTab(this, store)
    attachTab(stagesTab)
    attachTab(new StorageTab(this, store))
    attachTab(new EnvironmentTab(this, store))
    attachTab(new ExecutorsTab(this))
    addStaticHandler(SparkUI.STATIC_RESOURCE_DIR)
    attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
    attachHandler(ApiRootResource.getServletHandler(this))
    if (sc.map(_.conf.get(UI_PROMETHEUS_ENABLED)).getOrElse(false)) {
      attachHandler(PrometheusResource.getServletHandler(this))
    }

    // These should be POST only, but, the YARN AM proxy won't proxy POSTs
    attachHandler(createRedirectHandler(
      "/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST")))
    attachHandler(createRedirectHandler(
      "/stages/stage/kill", "/stages/", stagesTab.handleKillRequest,
      httpMethods = Set("GET", "POST")))
  }

attachPage 的核心逻辑:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
  def attachPage(page: WebUIPage): Unit = {
    val pagePath = "/" + page.prefix
    val renderHandler = createServletHandler(pagePath,
      (request: HttpServletRequest) => page.render(request), conf, basePath)
    val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json",
      (request: HttpServletRequest) => page.renderJson(request), conf, basePath)
    attachHandler(renderHandler)
    attachHandler(renderJsonHandler)
    val handlers = pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
    handlers += renderHandler
    handlers += renderJsonHandler
  }

创建 createServletHandler 的逻辑
就是 创建一个 HttpServlet,然后加入到 jetty 的体系中

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
  def createServletHandler(
      path: String,
      servlet: HttpServlet,
      basePath: String): ServletContextHandler = {
    val prefixedPath = if (basePath == "" && path == "/") {
      path
    } else {
      (basePath + path).stripSuffix("/")
    }
    val contextHandler = new ServletContextHandler
    val holder = new ServletHolder(servlet)
    contextHandler.setContextPath(prefixedPath)
    contextHandler.addServlet(holder, "/")
    contextHandler
  }

Web table 继承关系

WEB-ui page 继承关系

而 WebUI 会创建 HttpServlet,并加入到 Jetty 的 Hanlder 体系中

自定义 UI

增加一个自定义的 页面

1
2
3
4
5
6
class MyTab(parent: SparkUI, store: AppStatusStore)
  extends SparkUITab(parent, "my_tab") {

  attachPage(new MyStoragePage(this, store))

}

MyStoragePage 内容内容拷贝自 StoragePage
注意要将这个类中的 [storage] 删除,不然编译会出错

启动类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
  def go(): Unit = {
    val spark = SparkSession.builder()
      .appName("Block Storage Test")
      .master("local[*]")
      .getOrCreate()

    val sc = spark.sparkContext
    val ui: SparkUI = sc.ui.getOrElse(throw new RuntimeException("Not support web ui."))
    val store = ui.store
    ui.attachTab(new MyTab(ui, store))
    Thread.sleep(Long.MaxValue)
  }

启动后,spark-ui 中就多了一个 page

也可以增加其他类型

1
2
3
    ui.attachHandler()
    ui.addStaticHandler()
    ui.attachPage()

执行环境

env

安全体系

  • SecurityManager
  • 用于设置 yarn,hadoop 的 secret key

SparkContext 会附带初始化

  • Metrics 体系
  • Listener
  • SparkUI
  • RPC 整套体系
  • BlockManager,storage 体系
  • Executor 体系
  • Heartbeater
  • KVStore
  • SerializerManager

度量类

  • MetricsSystem
  • 提供了 source、sink

日志体系

  • org.apache.spark.internal.Logging
  • 整合了 org.slf4j
  • 提供了 info,warn 等函数

PRC 体系

客户端服务端例子

一个服务端的类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.apache.spark.{SparkConf, SparkContext}

object RpcServer {

  class HelloWorldEndpoint(val rpcEnv: RpcEnv) extends RpcEndpoint {
    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      case "Hello" =>
        println("Received 'Hello' message")
        context.reply("Hello from the server!")
      case _ =>
        context.reply("Unknown message")
    }

    override def receive: PartialFunction[Any, Unit] = {
      case msg => println(s"Received: $msg")
    }

    override def onStop(): Unit = {
      println("Stopping the server endpoint")
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark RPC Hello World").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val rpcEnv: RpcEnv = RpcEnv.create("HelloWorldServer", "localhost",
      9999, conf, sc.env.securityManager)

    // Register the HelloWorld endpoint
    val helloEndpoint = rpcEnv.setupEndpoint("helloEndpoint", new HelloWorldEndpoint(rpcEnv))

    println("Server is running and waiting for messages...")
    Thread.sleep(Long.MaxValue)  // Keep the server running
    rpcEnv.stop(helloEndpoint)  // Shutdown the server
    sc.stop()  // Stop SparkContext
  }
}

客户端类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.apache.spark.{SparkConf, SparkContext}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object RpcClient {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark RPC Hello World Client").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val rpcEnv: RpcEnv = RpcEnv.create("HelloWorldClient", "localhost",
      9999, conf, sc.env.securityManager)

    // Connect to the server endpoint
    val address = new RpcAddress("localhost", 9999)
    val helloEndpoint = rpcEnv.setupEndpointRef(address, "helloEndpoint")

    // Send a message to the server
    helloEndpoint.ask[String]("Hello").onComplete {
      case Success(response) =>
        println(s"Received from server: $response")
      case Failure(e) =>
        println(s"Failed to receive response from server: ${e.getMessage}")
    }

    // Let the client wait for the response
    Thread.sleep(2000)

    sc.stop()
  }
}

network 包

org.apache.spark.network 包的结构

解释

  • 这个包里面的主要是对 netty 的封装,还有一些 server 和 client 类
  • 以及 安全传输方面的,sasl 的
  • shuffle db,包括了 LevelDB、RocksDB
  • protocl 是 spark继承了 netty 体系,发送时封装的包协议
  • buffer 是 发送的 protocol 中用到的 一些buffer

初始化过程

发送过程

RpcEndpoint 和 RpcEndpointRef 之间的关系

发送过程

NettyRpcEnv 中维护了 Outbox 的 map

1
val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()

而 Outbox 内部为了 OutboxMessage 的 list

1
val messages = new java.util.LinkedList[OutboxMessage]

发送体系的结构如下图,所以一个节点是可以发送给多个接收方的

接收过程

接收的体系

解释

  • MessageLoop 中维护了 Inbox 的链表
  • 每个Inbox 中为了 InboxMessage 的链表
  • InboxMessage 是抽象类,实现类如上图
  • 如果是 RpcMessage 还需要回复响应,调用 endpoint.receiveAndReply 回复
  • 这里的 endpoint 类型非常多,比如有 HeartbeatReceiver 这种的,他的 函数大致如下:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    // Messages sent and received locally
    case ExecutorRegistered(executorId) =>
      executorLastSeen(executorId) = clock.getTimeMillis()
      context.reply(true)
    case ExecutorRemoved(executorId) =>
      executorLastSeen.remove(executorId)
      context.reply(true)
    case TaskSchedulerIsSet =>
      scheduler = sc.taskScheduler
      context.reply(true)
    case ExpireDeadHosts =>
      expireDeadHosts()
      context.reply(true)
...
...
}	  

读取的时序图

  • netty 的线程接收到请求,交给 Dispatcher
  • Dispatcher 中定义了 endpoint 名字 和 MessageLoop 的关系
  • 获取到 MessageLoop 后,交给 MessageLoop 去处理
  • 在 MessageLoop#post 中,将消息放到 Inbox 中,然后往阻塞队列中方一个消息
  • 后面阻塞队列会被唤醒,然后由 MessageLoop 线程再去处理这些具体的消息

下面是 Dispatcher 中定义的映射关系

1
2
val endpoints: ConcurrentMap[String, MessageLoop] =
    new ConcurrentHashMap[String, MessageLoop]

MessageLoop 线程处理的时序图

  • MessageLoop 中接收到消息,转给 Inbox 处理
  • Inbox 则交给 具体的 Endpoint#receiveAndReply响应
  • 具体的类型有很多,比如 HeartbeatReceiver 会接受到请求,再回复响应

RpcEndpoint 相关类图

ThreadSafeRpcEndpoint 相关类图

InboxMessage 继承关系

下图简述了一个发送关系

  • (1)表示本地发送
  • (2)源端发送
  • (3)每个Outbox的drainOutbox 通过循环,不断从messages列表中取得OutboxMessage
  • (4)和(5)表示真实发送

参考