Basic Cluster Concepts

Cluster creation flowchart
flink_2/30.jpg

ClusterDescriptor

  • First, a ClusterClientFactory creates a ClusterDescriptor
  • Then, the ClusterDescriptor creates a ClusterClientProvider
  • Finally, the ClusterClient is obtained from the ClusterClientProvider instance
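
The three-step chain above can be sketched with toy interfaces. This is a minimal sketch, assuming heavily simplified single-method versions of ClusterClientFactory, ClusterDescriptor, ClusterClientProvider and ClusterClient; the real Flink interfaces are generic, take a Configuration and declare exceptions, so only the order of calls carries over. The in-memory factory is hypothetical.

```java
// Toy stand-ins named after Flink's roles. Real Flink signatures differ
// (generics, Configuration parameters, checked exceptions).
interface ClusterClient {
    String getClusterId();
}

interface ClusterClientProvider {
    ClusterClient getClusterClient();
}

interface ClusterDescriptor {
    // Deploy a cluster and return a provider that hands out clients for it
    ClusterClientProvider deploySessionCluster();
}

interface ClusterClientFactory {
    ClusterDescriptor createClusterDescriptor(String clusterId);
}

class ClusterChain {
    // factory -> descriptor -> provider -> client
    static ClusterClient connect(ClusterClientFactory factory, String clusterId) {
        ClusterDescriptor descriptor = factory.createClusterDescriptor(clusterId);
        ClusterClientProvider provider = descriptor.deploySessionCluster();
        return provider.getClusterClient();
    }

    // Hypothetical in-memory factory: every object is a lambda bound to clusterId
    static ClusterClientFactory inMemoryFactory() {
        return clusterId -> {
            ClusterClient client = () -> clusterId;
            ClusterClientProvider provider = () -> client;
            return () -> provider; // this lambda is the ClusterDescriptor
        };
    }

    public static void main(String[] args) {
        ClusterClient client = connect(inMemoryFactory(), "demo-cluster");
        System.out.println(client.getClusterId()); // prints demo-cluster
    }
}
```

The point of the indirection is that the caller only depends on the factory; which descriptor (YARN, Kubernetes, standalone) sits behind it is decided by the factory that was selected.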

ClusterClientFactory comes in two main types

  • AbstractContainerizedClusterClientFactory, which supports containerized deployment
  • StandaloneClientFactory, which does not support containerized deployment

ClusterDescriptor class diagram

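  • The ClusterDescriptor interface is mainly used to create and stop a given Flink cluster
  • By implementing its deploySessionCluster() and deployJobCluster() methods, a descriptor can remotely deploy a session cluster or a job cluster onto the specified cluster resource manager
  • KubernetesClusterDescriptor uses the fabric8io Kubernetes client to communicate with the Kubernetes cluster over the network
flink_2/31.jpg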

Loading the different ClusterClientFactory types through SPI

  • DefaultClusterClientServiceLoader uses the Java SPI mechanism to load the different ClusterClientFactory implementations into the client process
  • To extend it, copy the JAR of the corresponding module into ${FLINK_HOME}/lib
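
The SPI lookup can be sketched with java.util.ServiceLoader, which is the standard Java SPI entry point. This is a sketch under assumptions: ClientFactory and TargetFactory are hypothetical simplifications (Flink's ClusterClientFactory takes a Configuration, not a Map), and the "exactly one compatible factory" rule is modeled on how DefaultClusterClientServiceLoader behaves, not copied from it. The "execution.target" key is Flink's deployment-target option.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;

// Simplified factory contract; Flink's real ClusterClientFactory takes a Configuration.
interface ClientFactory {
    boolean isCompatibleWith(Map<String, String> config);
    String name();
}

// Example implementation keyed on the deployment target (illustrative only).
class TargetFactory implements ClientFactory {
    private final String name;
    private final String target;

    TargetFactory(String name, String target) {
        this.name = name;
        this.target = target;
    }

    public boolean isCompatibleWith(Map<String, String> config) {
        return target.equals(config.get("execution.target"));
    }

    public String name() {
        return name;
    }
}

class FactoryLoader {
    // Discover implementations registered under META-INF/services in JARs on
    // the classpath; a module's JAR must be on the classpath to be found.
    static ClientFactory load(Map<String, String> config) {
        return select(ServiceLoader.load(ClientFactory.class), config);
    }

    // Keep the factories that accept the config and require exactly one match.
    static ClientFactory select(Iterable<? extends ClientFactory> candidates,
                                Map<String, String> config) {
        List<ClientFactory> compatible = new ArrayList<>();
        for (ClientFactory factory : candidates) {
            if (factory.isCompatibleWith(config)) {
                compatible.add(factory);
            }
        }
        if (compatible.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one compatible factory, found " + compatible.size());
        }
        return compatible.get(0);
    }
}
```

Splitting discovery (load) from selection (select) keeps the selection rule testable without any META-INF/services files.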

PipelineExecutor class diagram

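  • The PipelineExecutor executes the pipeline and generates the JobGraph data structure
  • The JobGraph object is then submitted to the cluster runtime through a ClusterClient
  • AbstractJobClusterExecutor supports submitting a single job
  • AbstractSessionClusterExecutor supports submitting multiple jobs
flink_2/32.jpg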
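
The difference between the two executor families can be illustrated with a toy model. This is a hypothetical sketch: clusters are plain ids and JobGraphs are strings, and none of these classes exist in Flink. It only shows that a session-style executor reuses one existing cluster for every job, while a per-job executor deploys a new cluster per submission.

```java
// Toy model only: clusters are ids, JobGraphs are strings.
interface ToyPipelineExecutor {
    // Submits the job and returns the id of the cluster it ran on
    String execute(String jobGraph);
}

// Session style: all jobs go to one long-running cluster that already exists.
class ToySessionExecutor implements ToyPipelineExecutor {
    private final String sessionClusterId;

    ToySessionExecutor(String sessionClusterId) {
        this.sessionClusterId = sessionClusterId;
    }

    public String execute(String jobGraph) {
        return sessionClusterId; // retrieve the running cluster and submit to it
    }
}

// Per-job style: each execute() deploys a dedicated cluster for that job.
class ToyJobExecutor implements ToyPipelineExecutor {
    private int deployedClusters = 0;

    public String execute(String jobGraph) {
        deployedClusters++;
        return "job-cluster-" + deployedClusters;
    }

    int getDeployedClusters() {
        return deployedClusters;
    }
}
```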

ClusterEntrypoint class diagram

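  • Some entrypoints are job-based and others are session-based
  • YARN mode supports both job and session clusters
  • Kubernetes has no job (per-job) entrypoint: it supports session mode, plus application mode through KubernetesApplicationClusterEntrypoint
flink_2/33.jpg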

FlinkYarnSessionCli

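  • A session cluster is started with the bin/yarn-session.sh script
  • While the script runs, it calls FlinkYarnSessionCli's main() method to start the cluster client
  • The client creates a YarnClusterClientFactory
  • Depending on whether an applicationId is given, it either retrieves the existing cluster or creates a new one
  • In detached mode it just prints the cluster information; in attached mode it starts a ScheduledExecutorService and uses YarnApplicationStatusMonitor to periodically fetch the status of the corresponding application on YARN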
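
The periodic status polling can be sketched with a ScheduledExecutorService. This is a simplified, hypothetical stand-in for YarnApplicationStatusMonitor: the status source is a plain Supplier<String> instead of a YarnClient call, and statuses are strings. It shows the pattern of a single background thread refreshing a cached value that the CLI loop reads without blocking.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Simplified stand-in for YarnApplicationStatusMonitor: a single-threaded
// scheduler polls a status source at a fixed rate and caches the last value.
class StatusMonitor implements AutoCloseable {
    private final AtomicReference<String> latest = new AtomicReference<>("UNKNOWN");
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    StatusMonitor(Supplier<String> statusSource, long periodMillis) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                latest.set(statusSource.get());
            } catch (RuntimeException e) {
                // An uncaught exception would cancel future runs, so record UNKNOWN instead
                latest.set("UNKNOWN");
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Non-blocking read used by the interactive CLI loop
    String getApplicationStatusNow() {
        return latest.get();
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Catching exceptions inside the scheduled task matters: with scheduleAtFixedRate, a task that throws is silently never run again.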

YARN architecture
flink_2/34.jpg

Flink on YARN architecture

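  • YarnClusterDescriptor creates the cluster and returns a YarnClusterClient to the client
  • YarnClusterClient submits the JobGraph object to the runtime for execution
  • FlinkYarnSessionCli parses the job submission parameters and uploads the resulting JARs to HDFS
flink_2/35.jpg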

YarnClusterDescriptor

  • Starts per-job or session clusters
  • Internally, it uses the Hadoop YARN client (YarnClient) to request YARN resources

Creating the ApplicationMaster

    private ApplicationReport startAppMaster(
            Configuration configuration,
            String applicationName,
            String yarnClusterEntrypoint,
            JobGraph jobGraph,
            YarnClient yarnClient,
            YarnClientApplication yarnApplication,
            ClusterSpecification clusterSpecification)

Main logic

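  • Build the Hadoop FileSystem and set up admission control
  • Use YarnApplicationFileUploader to upload resource files such as JARs, including the user JARs the JobGraph depends on; the NodeManagers later download them and cache them locally
  • Set up the classpath, etc.
  • Configure JobGraph-related parameters, output directories, etc.
  • Write the passed-in configuration to a local file saved as flink-conf.yaml
  • Set up Kerberos information and the UserGroupInformation
  • Fill in the ApplicationSubmissionContext: application type, Resource, priority, queue, etc.
  • Submit with yarnClient.submitApplication(appContext)
  • Once the submission succeeds, the AM can be monitored
  • Fetch the ApplicationReport with yarnClient.getApplicationReport(appId)
  • Keep polling it in a while(true) loop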
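
The while(true) monitoring step can be sketched as a polling loop. This is a minimal sketch, assuming a Supplier<AppState> in place of yarnClient.getApplicationReport(appId).getYarnApplicationState(); AppState mirrors the values of YARN's YarnApplicationState enum, and the exit rules (fail on FAILED/KILLED, return on RUNNING/FINISHED, otherwise sleep and retry) are this sketch's assumptions rather than a copy of Flink's loop.

```java
import java.util.function.Supplier;

// Mirrors the values of YARN's YarnApplicationState enum.
enum AppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

class AmDeploymentWaiter {
    // Poll until the ApplicationMaster is up, or the application has ended.
    // statusSource stands in for the YARN application report lookup.
    static AppState waitForDeployment(Supplier<AppState> statusSource, long pollMillis)
            throws InterruptedException {
        while (true) {
            AppState state = statusSource.get();
            switch (state) {
                case FAILED:
                case KILLED:
                    throw new IllegalStateException("Application ended in state " + state);
                case RUNNING:
                case FINISHED:
                    return state;
                default:
                    // Still NEW/SUBMITTED/ACCEPTED: wait, then ask again
                    Thread.sleep(pollMillis);
            }
        }
    }
}
```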

YarnResourceManager class diagram
flink_2/36.jpg

When Flink's ResourceManager is created, a different subclass is instantiated depending on the deployment:

  • ActiveResourceManager
  • StandaloneResourceManager

YARN's YarnResourceManager in turn creates:

  • resourceManagerClient
  • nodeManagerClient

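Resource management startup process

  • Create and start the RM (ResourceManager) client
  • Create and start the NM (NodeManager) client
  • Dynamically launch TaskManager nodes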

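Creating a TaskExecutor

  • Create a ResourceID for each TaskManager; the ResourceID identifies the TaskManager resource instance
  • Create the context used to launch the TaskExecutor
  • Start the container that hosts the TaskExecutor
  • If startup fails, release the resources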
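
The four steps above can be sketched as one method with a try/catch that releases the container on failure. This is a hypothetical sketch: ContainerLauncher stands in for YARN's NodeManager client, the resource id is simply the container id, and the launch command string is made up (in YARN it would be a ContainerLaunchContext).

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical container launcher, standing in for YARN's NodeManager client.
interface ContainerLauncher {
    void start(String containerId, String launchCommand) throws Exception;
}

class TaskExecutorStarter {
    private final Set<String> allocatedContainers = new HashSet<>();

    // Called when the RM hands us a container; returns the TaskManager's id.
    String startTaskExecutor(String containerId, ContainerLauncher launcher) {
        allocatedContainers.add(containerId);
        // 1. Identify the TaskManager; here the id is simply the container id
        String resourceId = containerId;
        // 2. Build a made-up launch command (in YARN: a ContainerLaunchContext)
        String launchCommand = "start-taskexecutor --resource-id " + resourceId;
        try {
            // 3. Ask the node to start the container running the TaskExecutor
            launcher.start(containerId, launchCommand);
            return resourceId;
        } catch (Exception e) {
            // 4. Startup failed: release the container so it is not leaked
            allocatedContainers.remove(containerId);
            throw new IllegalStateException("Could not start TaskExecutor in " + containerId, e);
        }
    }

    boolean holds(String containerId) {
        return allocatedContainers.contains(containerId);
    }
}
```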

TaskManager and TaskExecutor

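  • A TaskManager is a JVM-based worker node that holds multiple slots and communicates with the ResourceManager
  • The TaskExecutor is the internal component that does the actual execution and manages state, checkpoints, slots, etc.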

On Kubernetes

Kubernetes architecture
flink_2/37.jpg

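Flink on Kubernetes architecture

  • The Flink client connects to the Kubernetes API Server and creates resources such as the ConfigMap, the JobManager Service and the Deployment
  • The kubelet pulls the image from the image registry, mounts the volumes and runs the JobManager command
  • Once the JobManager pod has started, the Dispatcher, KubernetesResourceManager and the other components start with it
  • KubernetesResourceManager requests and launches the container resources the TaskManagers need
  • Kubernetes allocates a new pod for each TaskManager
  • After starting, a TaskManager registers itself with the ResourceManager's SlotManager
  • The SlotManager requests slots from the TaskManager and assigns them to the JobMaster
  • The TaskManager offers its slots to the JobMaster, which deploys tasks onto those slots
flink_2/38.jpg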
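
The register-then-allocate flow can be illustrated with a toy slot pool. This is a hypothetical sketch that is not Flink's SlotManager API: it collapses the request/offer round trips between SlotManager, TaskManager and JobMaster into a single allocateSlot() call, and slot ids are made-up strings.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Toy SlotManager: TaskManagers register their free slots, and slot requests
// from a JobMaster are served from that pool. Not Flink's API.
class ToySlotManager {
    private final Deque<String> freeSlots = new ArrayDeque<>();

    // Called when a TaskManager pod starts and registers itself
    void registerTaskManager(String taskManagerId, int numberOfSlots) {
        for (int i = 0; i < numberOfSlots; i++) {
            freeSlots.add(taskManagerId + "/slot-" + i);
        }
    }

    // Hand one slot to the JobMaster, or empty if more TaskManagers are needed
    Optional<String> allocateSlot() {
        return Optional.ofNullable(freeSlots.poll());
    }
}
```

An empty result is the point where, in native Kubernetes mode, the resource manager would request another TaskManager pod.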

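Deploying and starting a session cluster

  1. Deploy the KubernetesSession cluster through the KubernetesSessionCli client
    • Create a FlinkKubeClient, which talks to Kubernetes and creates the corresponding pods
    • Based on the given clusterId, decide whether to retrieve an existing cluster or create a new one
  2. Deploy the cluster with KubernetesClusterDescriptor
    • The entrypoint class is KubernetesSessionClusterEntrypoint
    • Check the high-availability configuration
    • Call FlinkKubeClient#createJobManagerComponent()
    • which creates the resources with internalClient.resourceList(accompanyingResources).createOrReplace()
    • Ultimately, the fabric8 API is called to create the pod
  3. Create the resources
    • Create the FlinkMasterDeployment through FlinkKubeClient
    • Create the Kubernetes Service through FlinkKubeClient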
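
The "retrieve or create" decision based on clusterId can be sketched as follows. This is a hypothetical sketch: SessionDeployer and its in-memory map of running clusters stand in for querying Kubernetes for an existing cluster, and the endpoint format is made up. It shows that repeated calls with the same clusterId attach to the existing cluster instead of deploying again.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the session CLI decision: reuse a running cluster
// with this id if there is one, otherwise deploy a new one.
class SessionDeployer {
    private final Map<String, String> runningClusters = new HashMap<>(); // clusterId -> endpoint
    private int deployments = 0;

    String retrieveOrDeploy(String clusterId) {
        String endpoint = runningClusters.get(clusterId);
        if (endpoint != null) {
            return endpoint; // retrieve: attach to the existing cluster
        }
        // deploy: in Flink this path goes through KubernetesClusterDescriptor
        deployments++;
        endpoint = "rest://" + clusterId; // made-up endpoint format
        runningClusters.put(clusterId, endpoint);
        return endpoint;
    }

    int deployments() {
        return deployments;
    }
}
```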

The resources created include:

  • ConfigMap
  • Service
  • Deployment
  • Pod

KubernetesResourceManager

  • Handles tasks such as requesting and launching TaskManager pods from the Kubernetes cluster
  • Internally uses FlinkKubeClient as the client for interacting with Kubernetes

The PodCallbackHandler interface

  • Provides callback methods such as onAdded(), onModified(), onDeleted() and onError()
  • Lets KubernetesResourceManager notice changes to Kubernetes resources and handle them
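
A handler for these callbacks can be sketched like this. This is a simplified sketch: PodCallbacks is modeled on PodCallbackHandler, but pods are plain names here (in Flink the callbacks receive KubernetesPod objects), and requesting a replacement for every lost pod is just one plausible reaction chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified callback contract modeled on PodCallbackHandler; pods are plain names.
interface PodCallbacks {
    void onAdded(List<String> pods);
    void onModified(List<String> pods);
    void onDeleted(List<String> pods);
    void onError(List<String> pods);
}

// Resource-manager-like handler that asks for a replacement TaskManager pod
// whenever a known pod is deleted or reports an error.
class ReplacingHandler implements PodCallbacks {
    final List<String> running = new ArrayList<>();
    final List<String> replacementRequests = new ArrayList<>();

    public void onAdded(List<String> pods) {
        running.addAll(pods);
    }

    public void onModified(List<String> pods) {
        // A real handler would inspect container status here
    }

    public void onDeleted(List<String> pods) {
        handleLost(pods);
    }

    public void onError(List<String> pods) {
        handleLost(pods);
    }

    private void handleLost(List<String> pods) {
        for (String pod : pods) {
            if (running.remove(pod)) { // ignore pods we never tracked
                replacementRequests.add(pod);
            }
        }
    }
}
```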

KubernetesResourceManager class diagram
flink_2/39.jpg

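Dynamically launching TaskManager nodes

  • After a user submits a job to the session cluster, TaskManagers are started
  • The image is pulled from the image registry and the TaskManager is started
  • The method called is FlinkKubeClient#createTaskManagerPod
  • The TaskExecutor is started through KubernetesTaskExecutorRunner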

Fabric8FlinkKubeClient

  • This implementation class is used to create the various Kubernetes resources
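// Selected methods (signatures only) for creating Kubernetes resources
CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap);
KubernetesConfigMapSharedWatcher createConfigMapSharedWatcher(Map<String, String> labels);
void createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec);
KubernetesLeaderElector createLeaderElector(KubernetesLeaderElectionConfiguration 
	leaderElectionConfiguration, LeaderCallbackHandler leaderCallbackHandler);

// Creates the TaskManager pod asynchronously and returns a future for the result
// (owner-reference handling omitted)
public CompletableFuture<Void> createTaskManagerPod(KubernetesPod kubernetesPod) {
	return CompletableFuture.runAsync(
		() -> this.internalClient.pods().create(kubernetesPod.getInternalResource()),
		kubeClientExecutorService);
}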

KubernetesSessionClusterEntrypoint.java

public static void main(String[] args) {
	// Log environment info, register signal handlers and install a JVM shutdown safeguard
	EnvironmentInformation.logEnvironmentInfo(LOG, KubernetesSessionClusterEntrypoint.class.getSimpleName(), args);
	SignalHandler.register(LOG);
	JvmShutdownSafeguard.installAsShutdownHook(LOG);
	// Parse the dynamic parameters (-D key=value) passed on the command line
	Configuration dynamicParameters = ClusterEntrypointUtils.parseParametersOrExit(args, 
		new DynamicParametersConfigurationParserFactory(), KubernetesSessionClusterEntrypoint.class);
	// Load the configuration together with the dynamic parameters, then run the session entrypoint
	ClusterEntrypoint entrypoint = new KubernetesSessionClusterEntrypoint(KubernetesEntrypointUtils.loadConfiguration(dynamicParameters));
	ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}

KubernetesTaskExecutorRunner.java

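    public static void main(String[] args) {
        // Same bootstrap steps as the entrypoint: environment logging, signal handler, shutdown safeguard
        EnvironmentInformation.logEnvironmentInfo(LOG, "Kubernetes TaskExecutor runner", args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);
        // Load the configuration and run the TaskManager inside the configured security context
        runTaskManagerSecurely(args);
    }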

Roles

Apache Flink Component Classes Overview
(The classes below are taken from the $FLINK_HOME/bin/flink-console.sh script.)

| Class Name | Functionality | Deployment Mode | Key Differences |
|---|---|---|---|
| TaskManagerRunner | Manages task execution, slot allocation, and data exchange between tasks. | All modes | Core component for task execution; runs in both standalone and Kubernetes modes. |
| HistoryServer | Provides a web UI to view historical job metrics and completed job details. | All modes | Post-execution monitoring tool; not part of the runtime cluster. |
| FlinkZooKeeperQuorumPeer | Runs a ZooKeeper quorum peer (server) shipped with Flink, which can host the ZooKeeper ensemble used for leader election and high availability. | Standalone HA setups | Only needed when Flink hosts the ZooKeeper ensemble itself instead of using an external one; not used by Kubernetes HA. |
| StandaloneSessionClusterEntrypoint | Starts a session cluster in standalone mode (long-lived, multi-job). | Standalone | Session clusters accept multiple jobs; jobs must be submitted explicitly. |
| StandaloneApplicationClusterEntryPoint | Starts an application cluster in standalone mode (single job). | Standalone | Dedicated to a single job; embeds the job JAR in the cluster. |
| KubernetesSessionClusterEntrypoint | Starts a session cluster on Kubernetes (long-lived, multi-job). | Kubernetes | Similar to standalone session mode but optimized for Kubernetes orchestration. |
| KubernetesApplicationClusterEntrypoint | Starts an application cluster on Kubernetes (single job). | Kubernetes | Embeds a single job; the cluster lifecycle is tied to the job execution. |
| KubernetesTaskExecutorRunner | Runs TaskManager pods in Kubernetes clusters. | Kubernetes | Kubernetes-specific TaskManager entry point for dynamically allocated pods. |

Key Differences Summary

  1. Session vs. Application Clusters:

    • Session clusters accept multiple jobs and persist until manually stopped.
    • Application clusters run a single job and terminate when the job finishes.
  2. Standalone vs. Kubernetes:

    • Standalone classes (e.g., StandaloneSessionClusterEntrypoint) assume manual VM/container management.
    • Kubernetes classes (e.g., KubernetesSessionClusterEntrypoint) integrate with the Kubernetes API to allocate TaskManager pods on demand and manage their resources.
  3. ZooKeeper Dependency:

    • FlinkZooKeeperQuorumPeer is only needed for ZooKeeper-based HA when Flink runs the ZooKeeper quorum itself instead of using an external ensemble. Kubernetes HA uses native Kubernetes mechanisms (e.g., ConfigMaps).
  4. TaskManager Execution:

    • TaskManagerRunner is generic for all modes, while KubernetesTaskExecutorRunner adds Kubernetes-specific lifecycle hooks.

References