Scheduling Process

Architecture

Scheduling Process

Web UI

Key metrics to watch in the Web UI

  • Executors tab
    • Failed tasks, data read, shuffle, cache
    • CPU, memory usage, number of RDDs, GC time
  • SQL tab: focus on the execution plan
    • Skim the overall execution plan and pay particular attention to the shuffle parts
    • Shuffle read and write volume and time
    • Shuffle local reads vs. data transferred across the network: volume and time
    • Sort: sorting time and spill volume
    • Aggregate: aggregation time and spill volume
  • Stage analysis
    • Look at the DAG to get an overall picture
    • Look at the Event Timeline, i.e. roughly how long each step of the stage took
    • Scheduler delay, deserialization, shuffle read/write, executor compute time
    • Ideally, executor compute time should dominate
    • If shuffle time is too large, consider a broadcast join (see the sketch after this list)
    • If scheduling time is large, tune the parallelism to reduce scheduling overhead
  • Task metrics
    • Inspect task execution details at a finer granularity
    • Min, 25th percentile, median (50th), 75th percentile, max, locality level
    • Shuffle read/write, spill, duration, serialization, memory, scheduler delay, GC time
  • Other
    • Storage tab
    • Environment tab
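
A minimal sketch of the broadcast tweak mentioned in the Stage analysis bullets, assuming a hypothetical large table big, a small dimension table small that fits in executor memory, and an assumed join column "id":

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("broadcast-hint-demo").getOrCreate()
val big   = spark.table("big")     // hypothetical large fact table
val small = spark.table("small")   // hypothetical small dimension table

// Hint Spark to broadcast the small side, replacing the shuffle-based join
// whose shuffle read/write time showed up in the Web UI.
val joined = big.join(broadcast(small), Seq("id"))
joined.explain()  // the plan should show BroadcastHashJoin with no Exchange on `big`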

Shuffle Optimization

Increase the parallelism of shuffle operations

  • The simplest approach
  • More tasks are created, so each task has less data to process
  • But if a single key maps to 1,000,000 records, the task handling that key still processes all of them even with higher parallelism (a minimal config sketch follows)
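
A minimal sketch of the two knobs involved, assuming a word-count style pair RDD, a hypothetical input path, and an illustrative partition count of 400:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("shuffle-parallelism-demo").getOrCreate()
val sc: SparkContext = spark.sparkContext

// RDD API: pass an explicit reduce-task count to the shuffle operator.
val pairs  = sc.textFile("/tmp/input")             // hypothetical input path
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1L))
val counts = pairs.reduceByKey(_ + _, 400)          // 400 reduce tasks instead of the default

// DataFrame / SQL API: raise the number of shuffle partitions globally.
spark.conf.set("spark.sql.shuffle.partitions", "400")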

Two-stage aggregation (local aggregation + global aggregation)

  • Suitable for group-by style operations
  • Add a random prefix to each key (e.g. bounded by the number of executors) and aggregate on these scattered, prefixed keys
  • Strip the random prefix from the locally aggregated data and aggregate again; that is the final result

Implementation

  • Step 1: add a random prefix to every key in the RDD.
  • Step 2: perform a local aggregation on the prefixed keys.
  • Step 3: strip the random prefix from every key in the RDD.
  • Step 4: perform the global aggregation on the de-prefixed RDD.
// Step 1: add a random prefix to every key in the RDD.
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
        new PairFunction<Tuple2<Long,Long>, String, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(10);
                return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
            }
        });
  
// Step 2: perform a local aggregation on the keys with random prefixes.
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
  
// Step 3: strip the random prefix from every key in the RDD.
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
        new PairFunction<Tuple2<String,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
                    throws Exception {
                long originalKey = Long.valueOf(tuple._1.split("_")[1]);
                return new Tuple2<Long, Long>(originalKey, tuple._2);
            }
        });
  
// Step 4: perform the global aggregation on the RDD whose prefixes have been removed.
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });

Convert the reduce-side join into a map-side join

  • This is the BHJ (broadcast hash join) approach: optimize the join with a broadcast variable
  • Only suitable when a large table is joined with a small table
// First, collect the data of the smaller RDD to the driver.
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect();
// Then use Spark's broadcast feature to turn the small RDD's data into a broadcast variable,
// so each executor holds only one copy of it. This saves memory and reduces network transfer overhead.
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);

// Run a map-style operation on the other RDD instead of a join-style operation.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
        new PairFunction<Tuple2<Long, String>, Long, Tuple2<String, Row>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Tuple2<String, Row>> call(Tuple2<Long, String> tuple)
                    throws Exception {
                // Inside the function, read rdd1's data from the broadcast variable on the local executor.
                List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
                // Convert rdd1's data into a Map to make the "join" lookup easy.
                Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
                for (Tuple2<Long, Row> data : rdd1Data) {
                    rdd1DataMap.put(data._1, data._2);
                }
                // Get the key and value of the current record.
                Long key = tuple._1;
                String value = tuple._2;
                // Look up the matching rdd1 record for this key.
                Row rdd1Value = rdd1DataMap.get(key);
                return new Tuple2<Long, Tuple2<String, Row>>(key, new Tuple2<String, Row>(value, rdd1Value));
            }
        });

// One caveat:
// the approach above only works when the keys in rdd1 are unique.
// If rdd1 contains multiple records for the same key, a flatMap-style operation is needed:
// instead of a plain map, iterate over all matching rdd1 records when joining,
// and each record of rdd2 may then produce multiple joined records.

Sample the skewed keys and split the join

// First, sample 10% of rdd1, the RDD that contains the handful of keys causing the data skew.
JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);
  
// Count how many times each key appears in the sample and sort by count in descending order.
// From the sorted result, take the top 1 or top 100 keys, i.e. the keys with the most records.
// How many of the most frequent keys to take is up to you; here we take one as an example.
JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._1, 1L);
            }     
        });
JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
        new Function2<Long, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair( 
        new PairFunction<Tuple2<Long,Long>, Long, Long>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
                    throws Exception {
                return new Tuple2<Long, Long>(tuple._2, tuple._1);
            }
        });
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;
  
// Split the records with the skew-causing key out of rdd1 into a separate RDD.
JavaPairRDD<Long, String> skewedRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        });
// Split the records with ordinary, non-skewed keys out of rdd1 into a separate RDD.
JavaPairRDD<Long, String> commonRDD = rdd1.filter(
        new Function<Tuple2<Long,String>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                return !tuple._1.equals(skewedUserid);
            } 
        });
  
// rdd2 is the RDD whose keys are relatively evenly distributed.
// Filter the records of rdd2 that match the skewed key obtained above into a separate RDD,
// then use flatMap to expand each of those records 100x, prefixing the copies with 0 through 99.
JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(
         new Function<Tuple2<Long,Row>, Boolean>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
                return tuple._1.equals(skewedUserid);
            }
        }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<Tuple2<String, Row>> call(
                    Tuple2<Long, Row> tuple) throws Exception {
                Random random = new Random();
                List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
                for(int i = 0; i < 100; i++) {
                    list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));
                }
                return list;
            }
              
        });
 
// Tag every record of the skewed-key RDD split out of rdd1 with a random prefix below 100.
// Then join this salted RDD with the expanded RDD split out of rdd2 above.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(
        new PairFunction<Tuple2<Long,String>, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                    throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
            }
        })
        .join(skewedRdd2)
        .mapToPair(new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<Long, Tuple2<String, Row>> call(
                            Tuple2<String, Tuple2<String, Row>> tuple)
                            throws Exception {
                            long key = Long.valueOf(tuple._1.split("_")[1]);
                            return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);
                        }
                    });
 
// Join the RDD of ordinary keys split out of rdd1 directly with rdd2.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);
 
// Union the join result of the skewed key with the join result of the ordinary keys.
// That union is the final join result.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);

Join using a random prefix and an expanded RDD
If a table contains many skewed keys, the approach is similar to the salted join above:

  • First, expand the RDD whose keys are relatively evenly distributed by a factor of 100.
  • Then tag every record of the other RDD (the one with skewed keys) with a random prefix below 100.
  • Finally, join the two processed RDDs.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import scala.collection.mutable.ListBuffer
import scala.util.Random

// Assume rdd1 and rdd2 are already created with types as follows:
// rdd1: RDD[(Long, Row)]
// rdd2: RDD[(Long, String)]

// Step 1: Expanding skewed keys in rdd1
val expandedRDD: RDD[(String, Row)] = rdd1.flatMap { case (key, row) =>
  val list = ListBuffer[(String, Row)]()
  for (i <- 0 until 100) {
    list += ((s"${i}_$key", row))
  }
  list
}

// Step 2: Adding random prefix (salt) to keys in rdd2
val mappedRDD: RDD[(String, String)] = rdd2.map { case (key, value) =>
  val random = new Random()
  val prefix = random.nextInt(100) // Random prefix between 0 and 99
  (s"${prefix}_$key", value)
}

// Step 3: Joining the two RDDs
val joinedRDD: RDD[(String, (String, Row))] = mappedRDD.join(expandedRDD)

// Step 4: Removing the salt (prefix) from the keys to restore original keys
val finalResult: RDD[(Long, (String, Row))] = joinedRDD.map { case (saltedKey, (value, row)) =>
  // Remove the prefix to get the original key
  val originalKey = saltedKey.split("_")(1).toLong
  (originalKey, (value, row))
}

// Collect and print the final result
finalResult.collect().foreach(println)

Another example
Code

// Split both the outer and inner tables into two parts, based on whether the join keys are skewed.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array_contains, col, concat, lit, sum, udf}
import spark.implicits._   // assumes a SparkSession named `spark` is in scope

// Split the join keys into two groups: the skewed ones and the evenly distributed ones.
val skewOrderIds: Array[Int] = _   // placeholder: the skewed order ids
val evenOrderIds: Array[Int] = _   // placeholder: the evenly distributed order ids

val skewTx: DataFrame = transactions.filter(array_contains(lit(skewOrderIds), $"orderId"))
val evenTx: DataFrame = transactions.filter(array_contains(lit(evenOrderIds), $"orderId"))

val skewOrders: DataFrame = orders.filter(array_contains(lit(skewOrderIds), $"orderId"))
val evenOrders: DataFrame = orders.filter(array_contains(lit(evenOrderIds), $"orderId"))


// Register the evenly distributed parts as temporary views.
evenTx.createOrReplaceTempView("evenTx")
evenOrders.createOrReplaceTempView("evenOrders")

val evenQuery: String = """
  select /*+ shuffle_hash(orders) */ sum(tx.price * tx.quantity) as revenue, o.orderId
  from evenTx as tx inner join evenOrders as o
  on tx.orderId = o.orderId
  where o.status = 'COMPLETE' and o.date between '2020-01-01' and '2020-03-31'
  group by o.orderId
"""
val evenResults: DataFrame = spark.sql(evenQuery)


// Salting

// Define a UDF that returns a random salt value.
val numExecutors: Int = _   // placeholder: the number of executors
val rand = () => scala.util.Random.nextInt(numExecutors)
val randUdf = udf(rand)

// Stage 1: salting. Note: keep the orderId column so the salt can be removed in stage 2.

// Outer table: add a random salt.
val saltedSkewTx = skewTx.withColumn("joinKey", concat($"orderId", lit("_"), randUdf()))

// Inner table: replicate the data once per salt value.
var saltedskewOrders = skewOrders.withColumn("joinKey", concat($"orderId", lit("_"), lit(1)))
for (i <- 2 to numExecutors) {
  saltedskewOrders = saltedskewOrders union skewOrders.withColumn("joinKey", concat($"orderId", lit("_"), lit(i)))
}


// Register the salted parts as temporary views.
saltedSkewTx.createOrReplaceTempView("saltedSkewTx")
saltedskewOrders.createOrReplaceTempView("saltedskewOrders")

val skewQuery: String = """
  select /*+ shuffle_hash(orders) */ sum(tx.price * tx.quantity) as initialRevenue, o.orderId, o.joinKey
  from saltedSkewTx as tx inner join saltedskewOrders as o
  on tx.joinKey = o.joinKey
  where o.status = 'COMPLETE' and o.date between '2020-01-01' and '2020-03-31'
  group by o.joinKey, o.orderId
"""
// Preliminary result after stage 1: salt, shuffle, join and aggregate.
val skewInitialResults: DataFrame = spark.sql(skewQuery)


// Stage 2: remove the salt.
val skewResults: DataFrame = skewInitialResults.select("initialRevenue", "orderId")
  .groupBy(col("orderId")).agg(sum(col("initialRevenue")).alias("revenue"))

How shuffle works

HashShuffleManager no longer exists in current Spark versions.
In its unoptimized form, the map tasks produced a very large number of disk files.

After the consolidation optimization

Today the ShuffleManager interface has only one implementation:

  • SortShuffleManager

In the normal path, data is sorted first, then written to files, and finally merged, so far fewer files are produced.
To let downstream tasks read the shuffle data, there is also an index file recording the offset of each partition inside the merged data file.
The total number of shuffle data files is then just the number of map tasks on each executor (one merged file per task, plus its index file).

With the bypass mechanism, the output is still merged into a single file at the end, but no sorting is done, which saves some time (see the config sketch below).
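
A minimal sketch of the knob that controls the bypass path (the value here is illustrative). The bypass writer is chosen when the shuffle has no map-side aggregation and the number of reduce partitions is at or below spark.shuffle.sort.bypassMergeThreshold (default 200):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("bypass-demo")
  // Raise or lower the threshold to control when the non-sorting bypass writer is used.
  .set("spark.shuffle.sort.bypassMergeThreshold", "400")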

Source-level optimizations

Join optimization

  • Bucket join
  • Colocate join

Optimizing WITH (CTE)

  • Create a custom logical plan node for WITH
  • Optimize the queries inside that logical plan
  • Convert it into a custom physical plan
  • Cache the physical plan
// Sketch of a custom rule: record and cache the plan of each CTE definition so that
// repeated references to the same CTE reuse the cached result instead of recomputing it.
object CTECachingRule extends Rule[LogicalPlan] {
  // Hypothetical cache keyed by the CTE definition id.
  private val cteMap = scala.collection.mutable.Map[Long, LogicalPlan]()

  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan transform {
      case w @ WithCTE(_, cteDefs) =>
        // A real rule would materialize/cache each CTE plan here.
        cteDefs.foreach(cteDef => cteMap(cteDef.id) = cteDef.child)
        w
    }
  }
}


// Physical plan side
// Custom rule to avoid redundant shuffles on cached CTEs
object CTEPhysicalPlanReuseRule extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan transform {
    case exchange: ReusedExchangeExec =>
      // Ensure cached CTE result is reused directly
      if (isCached(exchange)) {
        exchange.withNewChildren(Seq.empty)
      } else {
        exchange
      }
  }

  def isCached(plan: SparkPlan): Boolean = {
    // Logic to check if plan is cached in memory
    plan.isInstanceOf[ReusedExchangeExec]
  }
}

Query pushdown

  • Example: mysql_table_a JOIN mysql_table_b JOIN pg_table_c
  • The two MySQL tables a and b can be merged into one SQL statement and pushed down to the data source
  • Translate the Spark logical plan into a Calcite logical plan, and then into the data source's SQL dialect (a simplified sketch without Calcite follows)
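
A simplified sketch of the idea without the Calcite step, using hypothetical hosts, columns, and credentials: the MySQL-side join is expressed as a single pushed-down query, and only its result is joined with the Postgres table in Spark.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("pushdown-demo").getOrCreate()

// Push the a-join-b part down to MySQL as one query (the dbtable option accepts a subquery alias).
val mysqlJoined = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://mysql-host:3306/db")   // hypothetical host and database
  .option("dbtable",
    "(select a.id, a.x, b.y from mysql_table_a a join mysql_table_b b on a.id = b.id) t")
  .option("user", "user").option("password", "pass")  // hypothetical credentials
  .load()

// Read the Postgres table separately and do the remaining join in Spark.
val pgTableC = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://pg-host:5432/db") // hypothetical host and database
  .option("dbtable", "pg_table_c")
  .option("user", "user").option("password", "pass")
  .load()

val result = mysqlJoined.join(pgTableC, Seq("id"))    // "id" is an assumed join column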

Automatic skew-join optimization

  • Requires a metadata center from which statistics about the data sources can be fetched
  • Extend the CBO so the join order can be adjusted based on table sizes
  • Extend the optimizer rules, and possibly add some physical plan nodes as well
object SkewedJoinSaltingRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case join @ Join(left, right, _, _) =>
      // Detect skewed keys (example: assume keys with high frequency)
      val skewedKeys = detectSkewedKeys(left)  // Custom logic to identify skewed keys

      if (skewedKeys.nonEmpty) {
        applySalting(join, skewedKeys)
      } else {
        join  // No modification if no skew detected
      }
  }

  def detectSkewedKeys(left: LogicalPlan): Set[Any] = {
    // Pseudo-code for detecting skewed keys
    // Example: Scan the data or use metadata to detect skew
    Set(1, 2, 3)  // Replace with dynamic skew detection logic
  }

  def applySalting(join: Join, skewedKeys: Set[Any]): LogicalPlan = {
    join transformExpressions {
      case key if skewedKeys.contains(key) =>
        // Add salt to the skewed key (sketch; Concat takes a Seq of expressions)
        val salt = Literal(Random.nextInt(100).toString)
        Concat(Seq(salt, key.asInstanceOf[AttributeReference]))
    }
  }
}

LIMIT with a huge row count
Fetching 100 million rows with a single LIMIT is very slow.
The LIMIT-related physical operators include

  • CollectLimitExec
  • CollectTailExec
  • BaseLimitExec
  • LocalLimitExec
  • GlobalLimitExec

Execution process

  • On the child (upstream) RDD, use mapPartitionsInternal to take the first 100 million rows of every partition
  • Shuffle the output of that step into a single-partition RDD (SinglePartition)
  • Take the first 100 million rows again on that single-partition RDD
  • Suppose there are 100 partitions: each takes 100 million rows, everything is shuffled into one partition, and then 100 million rows are taken again from that one partition

Optimization

  • Don't fetch that many rows in one go
  • Use sampling (sample) to get the data
  • Take a small number of rows from each partition, then merge and apply the final limit (see the sketch below)
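
A minimal sketch of the per-partition approach, using a hypothetical helper approxLimit and an illustrative row target; it takes an equal slice from every partition instead of funneling everything through a single partition:

import org.apache.spark.sql.{DataFrame, Row}

// Hypothetical helper: take roughly `totalRows` rows by taking an equal slice from each partition.
def approxLimit(df: DataFrame, totalRows: Long): Array[Row] = {
  val numPartitions = df.rdd.getNumPartitions
  val perPartition  = math.max(1L, totalRows / numPartitions)
  df.rdd
    .mapPartitions(_.take(perPartition.toInt))  // small take inside each partition
    .take(totalRows.toInt)                      // final cap, gathered on the driver
}

// Or trade exactness for speed with sampling:
// val sampled = df.sample(withReplacement = false, fraction = 0.01)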

JOIN Optimization

Bucket join

test("bucket join") {
    import org.apache.spark.sql.{SparkSession, Row}
    import org.apache.spark.sql.types._

    // Create the schema for people DataFrame
    val peopleSchema = StructType(Array(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true)
    ))

    // Create the data as Seq[Row]
    val peopleData = Seq(
      Row(1, "Michael", 30),
      Row(2, "Andy", 29),
      Row(3, "Justin", 19)
    )

    // Create people DataFrame
    val peopleDF = spark.createDataFrame(
      spark.sparkContext.parallelize(peopleData),
      peopleSchema
    )

    // Create the schema for employee DataFrame
    val employeeSchema = StructType(Array(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = true),
      StructField("salary", IntegerType, nullable = true)
    ))

    // Create the data as Seq[Row]
    val employeeData = Seq(
      Row(1, "Michael", 3000),
      Row(2, "Andy", 4500),
      Row(3, "Justin", 3500),
      Row(4, "Berta", 4000)
    )

    // Create employee DataFrame
    val employeeDF = spark.createDataFrame(
      spark.sparkContext.parallelize(employeeData),
      employeeSchema
    )

    // Show the DataFrames
    peopleDF.show()
    employeeDF.show()


    // Save the DataFrames with bucketing by 'id'
    peopleDF.write
      .format("parquet")
      .bucketBy(3, "id")  // Bucketing by 'id' into 3 buckets
      .sortBy("id")  // Sort within each bucket by 'id'
      .saveAsTable("bucketed_people")

    employeeDF.write
      .format("parquet")
      .bucketBy(3, "id")  // Bucketing by 'id' into 3 buckets
      .sortBy("id")  // Sort within each bucket by 'id'
      .saveAsTable("bucketed_employees")

    // ==========================================
    // Load the bucketed tables
    val bucketedPeopleDF = spark.table("bucketed_people")
    val bucketedEmployeesDF = spark.table("bucketed_employees")

    // Perform the join on bucketed tables without shuffle
    val resultDF = bucketedPeopleDF.join(bucketedEmployeesDF, Seq("id"))

    // Show the result
    resultDF.show()

    // Check the physical plan to confirm bucketed join
    resultDF.explain("extended")


  }

In the resulting physical plan, there is no longer any Exchange (shuffle) operator.

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#50, name#51, age#52, name#57, salary#58]
   +- SortMergeJoin [id#50], [id#56], Inner
      :- Sort [id#50 ASC NULLS FIRST], false, 0
      :  +- Filter isnotnull(id#50)
      :     +- FileScan parquet spark_catalog.default.bucketed_people[id#50,name#51,age#52] Batched: false, Bucketed: true, DataFilters: 
	  [isnotnull(id#50)], Format: Parquet, Location: InMemoryFileIndex(1 
	  paths)[file:/data0/test/spark-warehouse/bucketed_people], PartitionFilters: [], PushedFilters: [IsNotNull(id)], 
	  ReadSchema: struct<id:int,name:string,age:int>, SelectedBucketsCount: 3 out of 3
      +- Sort [id#56 ASC NULLS FIRST], false, 0
         +- Filter isnotnull(id#56)
            +- FileScan parquet spark_catalog.default.bucketed_employees[id#56,name#57,salary#58] 
			Batched: false, Bucketed: true, DataFilters: [isnotnull(id#56)], Format: Parquet, Location: InMemoryFileIndex(1 
			paths)[file:/data0/test/spark-warehouse/bucketed_employees], PartitionFilters: [], 
			PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:int,name:string,salary:int>, SelectedBucketsCount: 3 out of 3

Joining four large tables
The SQL below executes as: the first two tables are joined with an SMJ, the result is joined with the third table by another SMJ, and then with the fourth by yet another SMJ.
The overall DAG is serial.

SELECT e.emp_id, e.emp_name, d.dept_name, p.project_name
FROM employees e
  INNER JOIN departments d ON e.dept_id = d.dept_id
  INNER JOIN assignments a ON e.emp_id = a.emp_id
  INNER JOIN projects p ON a.project_id = p.project_id
ORDER BY e.emp_id

Optimization: the data volume stays the same, but the parallelism is higher.

-- Step 1: Join employees and departments
WITH t12 AS (
  SELECT e.emp_id, e.emp_name, d.dept_name, e.dept_id
  FROM employees e
  INNER JOIN departments d ON e.dept_id = d.dept_id
),

-- Step 2: Join assignments and projects
t34 AS (
  SELECT a.emp_id, p.project_name, a.project_id
  FROM assignments a
  INNER JOIN projects p ON a.project_id = p.project_id
)

-- Step 3: Join the two intermediate results t12 and t34
SELECT t12.emp_id, t12.emp_name, t12.dept_name, t34.project_name
FROM t12
LEFT JOIN t34 ON t12.emp_id = t34.emp_id
ORDER BY t12.emp_id;

The printed physical plan shows execution is no longer serial: table 1 joins table 2 while table 3 joins table 4, in parallel,
and finally the result of 1-2 is joined with the result of 3-4.

AdaptiveSparkPlan isFinalPlan=false
+- Sort [emp_id#3 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(emp_id#3 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=446]
      +- Project [emp_id#3, emp_name#4, dept_name#12, project_name#18]
         +- SortMergeJoin [emp_id#3], [emp_id#23], LeftOuter
            :- Sort [emp_id#3 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(emp_id#3, 200), ENSURE_REQUIREMENTS, [plan_id=439]
            :     +- Project [emp_id#3, emp_name#4, dept_name#12]
            :        +- SortMergeJoin [dept_id#5], [dept_id#11], Inner
            :           :- Sort [dept_id#5 ASC NULLS FIRST], false, 0
            :           :  +- Exchange hashpartitioning(dept_id#5, 200), ENSURE_REQUIREMENTS, [plan_id=424]
            :           :     +- Filter isnotnull(dept_id#5)
            :           :        +- Scan ExistingRDD[emp_id#3,emp_name#4,dept_id#5]
            :           +- Sort [dept_id#11 ASC NULLS FIRST], false, 0
            :              +- Exchange hashpartitioning(dept_id#11, 200), ENSURE_REQUIREMENTS, [plan_id=425]
            :                 +- Scan ExistingRDD[dept_id#11,dept_name#12]
            +- Sort [emp_id#23 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(emp_id#23, 200), ENSURE_REQUIREMENTS, [plan_id=440]
                  +- Project [emp_id#23, project_name#18]
                     +- SortMergeJoin [project_id#24], [project_id#17], Inner
                        :- Sort [project_id#24 ASC NULLS FIRST], false, 0
                        :  +- Exchange hashpartitioning(project_id#24, 200), ENSURE_REQUIREMENTS, [plan_id=431]
                        :     +- Scan ExistingRDD[emp_id#23,project_id#24]
                        +- Sort [project_id#17 ASC NULLS FIRST], false, 0
                           +- Exchange hashpartitioning(project_id#17, 200), ENSURE_REQUIREMENTS, [plan_id=432]
                              +- Scan ExistingRDD[project_id#17,project_name#18]





RSS

Problems with Uber's original shuffle

  • Hardware reliability: because of the large volume of shuffle data written to SSDs every day, Uber's disks wore out faster than designed. Instead of lasting three years, a disk at Uber wore out in about 6 months.
  • Shuffle failures: when many reducers fetch data from the mapper tasks on the same machine, the shuffle service can become unavailable, which causes a lot of shuffle failures.
  • Noisy neighbors: an application that writes a large amount of shuffle data can take up all of the disk space on a machine, causing other applications on that machine to fail with disk-full exceptions.
  • Shuffle service unreliability: Uber used the external shuffle service in YARN and Mesos for Spark, and often saw the shuffle service become unavailable on a set of nodes.

The ordinary shuffle call path

Uber's RSS architecture

  • RSS servers register themselves in ZooKeeper; the driver chooses RSS servers based on their load
  • Mappers write records of the same partition directly to a fixed RSS server, and reducers fetch that partition from the same designated server
  • As a result, each reducer only needs to talk to a single RSS server
  • For high availability, a mapper can write to multiple RSS servers

References