Overview
With Spark + Delta Lake, writing Parquet files follows the overall flow below:
Image source -> here
```scala
df.write.format("delta").save(path)

def write: DataFrameWriter[T] = {
  if (isStreaming) {
    logicalPlan.failAnalysis(
      "'write' can not be called on streaming Dataset/DataFrame")
  }
  new DataFrameWriter[T](this)
}
```
The format("delta") alias is registered by DeltaDataSource, which implements Spark's DataSourceRegister interface and is injected into Spark as a plugin, as sketched below.
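The sketch shows the same plugin mechanism for a hypothetical source; MyDataSource and the short name "mysource" are made-up names, but DeltaDataSource registers its "delta" alias the same way, via shortName and a META-INF/services entry.

```scala
import org.apache.spark.sql.sources.DataSourceRegister

// Hypothetical data source: the short name it returns becomes usable in .format("mysource").
class MyDataSource extends DataSourceRegister {
  override def shortName(): String = "mysource"
}
// The class is then listed in
// META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
// so Spark's DataSource.lookupDataSource can resolve the alias through ServiceLoader.
```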
DataFrameWriter then hands execution over to the saveToV1Source method, which in turn calls planForWriting:
```scala
def planForWriting(mode: SaveMode, data: LogicalPlan): LogicalPlan = {
  providingInstance() match {
    case dataSource: CreatableRelationProvider =>
      disallowWritingIntervals(data.schema.map(_.dataType), forbidAnsiIntervals = true)
      SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
    case format: FileFormat =>
      disallowWritingIntervals(data.schema.map(_.dataType), forbidAnsiIntervals = false)
      DataSource.validateSchema(data.schema)
      planForWritingFileFormat(format, mode, data)
    case _ => throw new IllegalStateException(
      s"${providingClass.getCanonicalName} does not allow create table as select.")
  }
}
```
Then SaveIntoDataSourceCommand is executed:
```scala
case class SaveIntoDataSourceCommand(
    query: LogicalPlan,
    dataSource: CreatableRelationProvider,
    options: Map[String, String],
    mode: SaveMode) extends LeafRunnableCommand {

  override def innerChildren: Seq[QueryPlan[_]] = Seq(query)

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val relation = dataSource.createRelation(
      sparkSession.sqlContext, mode, options, Dataset.ofRows(sparkSession, query))

    try {
      val logicalRelation =
        LogicalRelation(relation, relation.schema.toAttributes, None, false)
      sparkSession.sharedState.cacheManager.recacheByPlan(sparkSession, logicalRelation)
    } catch {
      case NonFatal(_) =>
        // some data source can not support return a valid relation, e.g. `KafkaSourceProvider`
    }

    Seq.empty[Row]
  }
}
```
Here createRelation is invoked on the data source, which is a DeltaDataSource. This is where the transaction handling, the writing of the data files, the writing of the metadata, and so on take place:
```scala
override def createRelation(
    sqlContext: SQLContext,
    mode: SaveMode,
    parameters: Map[String, String],
    data: DataFrame): BaseRelation = {
  val path = parameters.getOrElse("path", {
    throw DeltaErrors.pathNotSpecifiedException
  })
  val partitionColumns = parameters.get(DeltaSourceUtils.PARTITIONING_COLUMNS_KEY)
    .map(DeltaDataSource.decodePartitioningColumns)
    .getOrElse(Nil)

  val deltaLog = DeltaLog.forTable(sqlContext.sparkSession, new Path(path), parameters)
  WriteIntoDelta(
    deltaLog = deltaLog,
    mode = mode,
    new DeltaOptions(parameters, sqlContext.sparkSession.sessionState.conf),
    partitionColumns = partitionColumns,
    configuration = DeltaConfigs.validateConfigurations(
      parameters.filterKeys(_.startsWith("delta.")).toMap),
    data = data,
    // empty catalogTable is acceptable as the code path is only for path based writes
    // (df.write.save("path")) which does not need to use/update catalog
    catalogTableOpt = None
  ).run(sqlContext.sparkSession)

  deltaLog.createRelation()
}
```
Reference code: here
Finally, FileFormatWriter is invoked to write the data to the storage system.
Multiple tasks are created and write in parallel.
If the write succeeds, some statistics are collected at the end; otherwise the output is discarded.
Inside each task, ParquetOutputWriter is called to start writing Parquet. A minimal end-to-end example is sketched below.
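As a concrete anchor for the flow above, here is a minimal path-based Delta write; the path and the partition column are illustrative only.

```scala
import org.apache.spark.sql.functions.col

val df = spark.range(0, 1000).withColumn("part", col("id") % 10)
df.write
  .format("delta")        // resolved to DeltaDataSource
  .mode("overwrite")
  .partitionBy("part")
  .save("/tmp/delta-demo")
// The target directory ends up with the Parquet data files produced by
// FileFormatWriter / ParquetOutputWriter, plus the _delta_log transaction log.
```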
Inside Parquet
Some concepts (a sketch for inspecting them on a real file follows this list):
- Magic Number and File Signature
- Row Group: the data is split into multiple chunks by row
- Column Chunk: within each row group, the data is stored column by column
- Page: the smallest unit; each column chunk consists of multiple pages. There are different page types, and each page carries a header describing its structure:
  - Data Pages store the actual column data.
  - Index Pages store the index of column data.
  - Dictionary Pages store dictionary-encoded data (if applicable).
- Footer: contains the FileMetaData and the Row Group MetaData; the latter includes:
  - Column paths and encodings
  - Number of values
  - Offsets for the first data and index pages
  - Compressed/uncompressed sizes
  - Additional key/value metadata.
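To see these pieces on a real file, the parquet-hadoop API can dump the footer, row groups, and column chunks; a small sketch (the file path is illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import scala.collection.JavaConverters._

val in = HadoopInputFile.fromPath(new Path("/tmp/delta-demo/part-00000.parquet"), new Configuration())
val reader = ParquetFileReader.open(in)
try {
  val footer = reader.getFooter                    // FileMetaData + row group metadata
  footer.getBlocks.asScala.zipWithIndex.foreach { case (rg, i) =>
    println(s"row group $i: rows=${rg.getRowCount}, compressed bytes=${rg.getCompressedSize}")
    rg.getColumns.asScala.foreach { c =>
      println(s"  column chunk ${c.getPath}: encodings=${c.getEncodings}, size=${c.getTotalSize}")
    }
  }
} finally {
  reader.close()
}
```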
Internally, Spark runs ParquetOutputWriter, which creates a ParquetOutputFormat.
It then converts Spark SQL Catalyst InternalRows into Parquet messages.
ParquetFileWriter encapsulates the file-level operations and writes the magic number.
Although Parquet stores data by column, the DataFrame is processed row by row,
i.e. by continuously iterating over Iterator[InternalRow]
and calling org.apache.parquet.hadoop.InternalParquetRecordWriter:
```java
public void write(T value) throws IOException, InterruptedException {
  writeSupport.write(value);
  ++recordCount;
  checkBlockSizeReached();
}
```
which in turn calls the implementing class org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport:
```scala
override def write(row: InternalRow): Unit = {
  consumeMessage {
    writeFields(row, schema, rootFieldWriters)
  }
}

private def writeFields(
    row: InternalRow, schema: StructType, fieldWriters: Array[ValueWriter]): Unit = {
  var i = 0
  while (i < row.numFields) {
    if (!row.isNullAt(i)) {
      consumeField(schema(i).name, i) {
        fieldWriters(i).apply(row, i)
      }
    }
    i += 1
  }
}
```
On the Parquet side, a ValuesWriter is called to encode the incoming values.
The relevant encoder implementation classes are covered in the Parquet Encoding section below.
When the page threshold is reached, the data is compressed and the page statistics are recorded, including:
- value count
- null value count
- min/max value
Internally, Parquet keeps the encoded bytes as an array list of byte[] (List<byte[]>).
All of this happens in memory; the actual write to disk is triggered only when the row-group threshold is reached:
```java
private void flushRowGroupToStore() throws IOException {
  recordConsumer.flush();
  LOG.debug("Flushing mem columnStore to file. allocated memory: {}", columnStore.getAllocatedSize());
  if (columnStore.getAllocatedSize() > (3 * rowGroupSizeThreshold)) {
    LOG.warn("Too much memory used: {}", columnStore.memUsageString());
  }

  if (recordCount > 0) {
    rowGroupOrdinal++;
    parquetFileWriter.startBlock(recordCount);
    columnStore.flush();
    pageStore.flushToFileWriter(parquetFileWriter);
    recordCount = 0;
    parquetFileWriter.endBlock();
    this.nextRowGroupSize = Math.min(
        parquetFileWriter.getNextRowGroupSize(),
        rowGroupSizeThreshold);
  }

  columnStore = null;
  pageStore = null;
}
```
Afterwards, Parquet drops the current column store and page store, re-initializes fresh ones, and continues consuming the remaining DataFrame rows, repeating the process. The thresholds involved can be tuned as sketched below.
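The row-group and page thresholds that drive this cycle are ordinary Hadoop configuration keys read by ParquetOutputFormat; a sketch of setting them before a write (the sizes shown are Parquet's usual defaults, not a recommendation):

```scala
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.setInt("parquet.block.size", 128 * 1024 * 1024)       // row-group flush threshold
hadoopConf.setInt("parquet.page.size", 1024 * 1024)              // data page threshold
hadoopConf.setInt("parquet.dictionary.page.size", 1024 * 1024)   // dictionary page threshold

df.write.format("delta").save("/tmp/delta-demo")
```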
FileMetaData Write
At the very end, the metadata of all row groups is written out.
Parquet Encoding
Dictionary Encoding
Run Length Encoding (RLE)
- Can be seen as a complement to dictionary encoding
- If a value occurs many times in a row, record [number of occurrences] - [the value itself]
- If the column contains mostly distinct values, the gain is small (toy sketch after this list)
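A toy sketch of the idea (not Parquet's actual bit-packed on-disk layout): collapse consecutive repeats into (value, run length) pairs.

```scala
def rle[T](values: Seq[T]): Seq[(T, Int)] =
  values.foldLeft(List.empty[(T, Int)]) {
    case ((v, n) :: tail, x) if v == x => (v, n + 1) :: tail   // extend the current run
    case (acc, x)                      => (x, 1) :: acc        // start a new run
  }.reverse

rle(Seq(7, 7, 7, 7, 2, 2, 9))   // List((7,4), (2,2), (9,1))
// With mostly distinct values every element still pays for a count, hence the small gain.
```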
Delta Encoding
- Well suited for values like timestamps, where each timestamp is represented as a long
- The starting value is large, but each subsequent value changes only slightly
- Only a few bits are needed to store each incremental change (toy sketch after this list)
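A toy sketch of the idea: millisecond timestamps stored as longs, where only the base value is large and the deltas are tiny.

```scala
val ts = Seq(1700000000000L, 1700000000040L, 1700000000075L, 1700000000110L)
val base = ts.head
val deltas = ts.sliding(2).map { case Seq(a, b) => b - a }.toSeq   // Seq(40, 35, 35)
// Store `base` once plus the small deltas; each delta fits in a handful of bits
// instead of a full 8-byte long.
```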
How Parquet implements encoding:
- the root class is ValuesWriter
- DictionaryValuesWriter is the abstract implementation, with concrete subclasses for each type
Below is PlainBinaryDictionaryValuesWriter:
```java
public static class PlainBinaryDictionaryValuesWriter extends DictionaryValuesWriter {

  /* type specific dictionary content */
  protected Object2IntMap<Binary> binaryDictionaryContent = new Object2IntLinkedOpenHashMap<>();

  public PlainBinaryDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage,
      Encoding encodingForDictionaryPage, ByteBufferAllocator allocator) {
    super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage, allocator);
    binaryDictionaryContent.defaultReturnValue(-1);
  }

  @Override
  public void writeBytes(Binary v) {
    int id = binaryDictionaryContent.getInt(v);
    if (id == -1) {
      id = binaryDictionaryContent.size();
      binaryDictionaryContent.put(v.copy(), id);
      // length as int (4 bytes) + actual bytes
      dictionaryByteSize += 4L + v.length();
    }
    encodedValues.add(id);
  }

  // ...
}
```
Internally a hash map assigns the IDs; once the threshold is exceeded, this ID-encoded data is flushed.
The RLE implementation is RunLengthBitPackingHybridValuesWriter.
If a value repeats more than 8 times, the following scheme is used:
```
rle-run := <rle-header> <repeated-value>
rle-header := varint-encode((rle-run-len) << 1)
repeated-value := value that is repeated, using a fixed-width of round-up-to-next-byte(bit-width)

*varint-encode is ULEB-128 coding
```
Otherwise, the following scheme is used:
```
bit-packed-run := <bit-packed-header> <bit-packed-values>
bit-packed-header := varint-encode(<bit-pack-scaled-run-len> << 1 | 1)
bit-pack-scaled-run-len := (bit-packed-run-len) / 8
```
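A worked example of the rle-header above, assuming a run of 12 identical values: the header is varint-encode(12 << 1), where varint-encode is unsigned LEB128 (7 payload bits per byte, high bit set while more bytes follow).

```scala
def uleb128(n: Int): Seq[Int] = {
  val out = scala.collection.mutable.ArrayBuffer[Int]()
  var v = n
  var more = true
  while (more) {
    var b = v & 0x7f          // low 7 bits
    v = v >>> 7
    more = v != 0
    if (more) b |= 0x80       // continuation bit
    out += b
  }
  out.toSeq
}

uleb128(12 << 1)   // Seq(0x18): run length 12; LSB 0 marks an RLE run (not a bit-packed run)
// The repeated value itself follows, padded to whole bytes of the column's bit-width.
```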
There are several delta implementations, including:
- DeltaBinaryPackingValuesWriter (with Int32 and Int64 variants)
- DeltaLengthByteArrayValuesWriter
- DeltaByteArrayWriter
They all write a value as the difference between the current and the previous value.
Parquet creates the different writers through a factory. The interface:
```java
public interface ValuesWriterFactory {
  void initialize(ParquetProperties parquetProperties);

  ValuesWriter newValuesWriter(ColumnDescriptor descriptor);
}
```
The main implementation in DefaultValuesWriterFactory:
```java
public ValuesWriter newValuesWriter(ColumnDescriptor descriptor) {
  return this.delegateFactory.newValuesWriter(descriptor);
}

static DictionaryValuesWriter dictionaryWriter(ColumnDescriptor path, ParquetProperties properties,
    Encoding dictPageEncoding, Encoding dataPageEncoding) {
  switch (path.getType()) {
    case BOOLEAN:
      throw new IllegalArgumentException("no dictionary encoding for BOOLEAN");
    case BINARY:
      return new PlainBinaryDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(),
          dataPageEncoding, dictPageEncoding, properties.getAllocator());
    case INT32:
      return new PlainIntegerDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(),
          dataPageEncoding, dictPageEncoding, properties.getAllocator());
    case INT64:
      return new PlainLongDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(),
          dataPageEncoding, dictPageEncoding, properties.getAllocator());
    case INT96:
      return new PlainFixedLenArrayDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(),
          12, dataPageEncoding, dictPageEncoding, properties.getAllocator());
    case DOUBLE:
      return new PlainDoubleDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(),
          dataPageEncoding, dictPageEncoding, properties.getAllocator());
    case FLOAT:
      return new PlainFloatDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(),
          dataPageEncoding, dictPageEncoding, properties.getAllocator());
    case FIXED_LEN_BYTE_ARRAY:
      return new PlainFixedLenArrayDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(),
          path.getTypeLength(), dataPageEncoding, dictPageEncoding, properties.getAllocator());
    default:
      throw new IllegalArgumentException("Unknown type " + path.getType());
  }
}
```
You can also implement a custom encoding class by implementing ValuesWriterFactory,
and then wire it in via org.apache.parquet.column.ParquetProperties.Builder#withValuesWriterFactory, as sketched below.
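A minimal sketch of such a custom factory, assuming parquet-column is on the classpath; it only delegates to DefaultValuesWriterFactory and logs which column a writer is created for.

```scala
import org.apache.parquet.column.{ColumnDescriptor, ParquetProperties}
import org.apache.parquet.column.values.ValuesWriter
import org.apache.parquet.column.values.factory.{DefaultValuesWriterFactory, ValuesWriterFactory}

class LoggingValuesWriterFactory extends ValuesWriterFactory {
  private val delegate = new DefaultValuesWriterFactory

  override def initialize(props: ParquetProperties): Unit = delegate.initialize(props)

  override def newValuesWriter(descriptor: ColumnDescriptor): ValuesWriter = {
    println(s"creating values writer for column ${descriptor.getPath.mkString(".")}")
    delegate.newValuesWriter(descriptor)
  }
}

// Wire it in when building ParquetProperties:
val props = ParquetProperties.builder()
  .withValuesWriterFactory(new LoggingValuesWriterFactory)
  .build()
```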
Vectorised Parquet Reading
Early versions read one row at a time; now reading is done one batch at a time.
A ColumnarBatch wraps a number of ColumnVectors, and each ColumnVector holds a number of values of a single column.
ColumnarBatch also provides a row-oriented iterator view,
as well as accessors for the various data types (a small sketch below).
When reading, Spark reuses the ColumnVector instances, which keeps the overhead very small.
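A small sketch of the batch/vector relationship using Spark's internal on-heap vectors; it is only meant to illustrate the structure, not how the reader actually builds batches.

```scala
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

val schema  = StructType(Seq(StructField("id", IntegerType), StructField("score", DoubleType)))
val vectors = OnHeapColumnVector.allocateColumns(4096, schema)   // one vector per column

(0 until 3).foreach { i =>
  vectors(0).putInt(i, i)
  vectors(1).putDouble(i, i * 0.5)
}

val batch = new ColumnarBatch(Array[ColumnVector](vectors(0), vectors(1)))
batch.setNumRows(3)

// Row-oriented iterator view over the columnar data:
val it = batch.rowIterator()
while (it.hasNext) {
  val row = it.next()
  println((row.getInt(0), row.getDouble(1)))
}
```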
Spark's Parquet implementation mainly consists of:
- VectorizedParquetRecordReader
- VectorizedColumnReader
- VectorizedValuesReader
Execution flow:
- VectorizedParquetRecordReader is initialized and nextKeyValue is called
- Multiple VectorizedColumnReaders are created, one per column
- A VectorizedValuesReader is then used to read and decode the data
  - there are two variants: VectorizedRleValuesReader (RLE / Bit-Packing Hybrid)
  - and VectorizedPlainValuesReader (for plain encoding)
- Finally the data is handed back to VectorizedParquetRecordReader for further consumption by the upper layers
VectorizedParquetRecordReader
- extends the abstract class RecordReader and overrides nextKeyValue
- before reading, the capacity and the memory mode (on-heap or off-heap) are determined (see the config sketch after this list)
- first a ColumnarBatch is created, then one ColumnVector per column
- each batch of a column is then read through VectorizedColumnReader
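The capacity and memory mode mentioned above are controlled by session configs; a sketch with Spark's default values:

```scala
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")   // use the vectorized read path
spark.conf.set("spark.sql.parquet.columnarReaderBatchSize", "4096")  // rows per ColumnarBatch (capacity)
spark.conf.set("spark.sql.columnVector.offheap.enabled", "false")    // on-heap vs off-heap vectors
```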
The main logic of VectorizedParquetRecordReader#nextBatch:
```java
public boolean nextBatch() throws IOException {
  for (ParquetColumnVector vector : columnVectors) {
    vector.reset();
  }
  columnarBatch.setNumRows(0);
  if (rowsReturned >= totalRowCount) return false;
  checkEndOfRowGroup();

  int num = (int) Math.min(capacity, totalCountLoadedSoFar - rowsReturned);
  for (ParquetColumnVector cv : columnVectors) {
    for (ParquetColumnVector leafCv : cv.getLeaves()) {
      VectorizedColumnReader columnReader = leafCv.getColumnReader();
      if (columnReader != null) {
        columnReader.readBatch(num, leafCv.getValueVector(),
          leafCv.getRepetitionLevelVector(), leafCv.getDefinitionLevelVector());
      }
    }
    cv.assemble();
  }

  rowsReturned += num;
  columnarBatch.setNumRows(num);
  numBatched = num;
  batchIdx = 0;
  return true;
}
```
VectorizedColumnReader & VectorizedValuesReader
- VectorizedColumnReader is created for the corresponding ColumnDescriptor
- readBatch takes two arguments: total (how many values to read in this batch) and column (the vector to fill)
If isCurrentPageDictionaryEncoded is on, the dictionary-encoded column values are decoded;
if it is off, the values are decoded according to their type.
The actual reading and decoding is done through VectorizedValuesReader.
References