Daily Upserts and Queries


标题: “每日Upserts和查询” 权重: 32


实验2 - 每日Upserts和查询 中, 在对Hudi表执行upserts和读取操作时, 做出了几个配置选择以优化性能。在本节中, 我们将深入解释一些关键的Hudi概念和操作, 以帮助我们完全理解实验笔记本中的调优和配置选择。

请注意, 实验中使用的配置在笔记本本身中有解释。本节将重点解释Hudi的内部工作原理, 以便更深入地理解实验中的优化。

Upsert如何工作

Hudi upsert操作包括以下步骤:

  1. 对upsert数据批次进行重复数据删除。 这会触发对要upsert的数据框进行reduceByKey操作, 这会引发Spark shuffle, 从而创建一个数据框, 其中:

    • hoodie.upsert.shuffle.parallelism个分区(Hudi < 0.13.0)
    • MIN(dataframe.numPartitions(), hoodie.upsert.shuffle.parallelism)个分区(Hudi >= 0.13.0)
  2. 检索每个受影响的表分区的最新文件切片(即表的最新版本)。

  3. 将更新分配给文件,通过标记记录为更新来实现。

    更新是通过索引查找来识别的, 这一步的具体操作取决于配置的索引类型。

    ::::tabs{variant="container” groupId="index” activeTabId="simple”}

    :::tab{id="simple” label="SIMPLE INDEX”}

    1. 执行精简连接,将传入的upsert记录与从存储中提取的所有键进行连接。
    2. 标记匹配的记录为更新。
    3. 对更新进行分桶,为每个受影响的文件创建一个虚拟桶, 并将针对同一文件的更新记录分配到同一个桶中。 :::

    :::tab{id="bloom” label="BLOOM INDEX”}

    1. 范围剪枝(如果启用了hoodie.bloom.index.prune.by.ranges)通过将要upsert的记录键与文件中_hoodie_record_key列的最小值最大值进行比较:

      • 如果启用了列统计(hoodie.metadata.index.column.stats.enable=true)且hoodie.bloom.index.use.metadata=true, 则通过元数据表查找来执行此检查
      • 如果禁用了列统计(hoodie.metadata.index.column.stats.enable=false)或hoodie.bloom.index.use.metadata=false, Hudi使用Parquet文件页脚中的hoodie_min_record_keyhoodie_max_record_key值。

      此操作过滤掉了不包含要upsert的记录键的文件。 默认情况下, Hudi会启用范围剪枝。

    2. 检查剩余文件的布隆过滤器以进一步剪枝文件。

      • 如果布隆过滤器索引存储在元数据表上(hoodie.metadata.index.bloom.filter.enable=true)且hoodie.bloom.index.use.metadata=true, Hudi会将要upsert的记录键与存储在元数据表中的_hoodie_record_key bloomfilter值进行比较。
      • 如果布隆过滤器索引未存储在元数据表上(hoodie.metadata.index.bloom.filter.enable=false)或hoodie.bloom.index.use.metadata=false, Hudi会将要upsert的记录键与Parquet文件页脚中的org.apache.hudi.bloomfilter值进行比较。
    3. 检查候选文件并标记记录。

      在范围剪枝和布隆过滤器操作之后, 我们为每个传入的记录都有一个可能包含该记录的文件列表(需要更新)。然后将记录按这些文件ID分组, 并对这些文件进行完整检查, 以完成标记(即标记记录为更新或插入)。需要进行这个最终的完整检查, 因为布隆过滤器可能会产生假阳性, 即它们指示文件可能包含记录键。

    4. 为每个受影响的文件创建一个虚拟桶, 并将针对同一文件的更新记录分配到同一个桶中。 ::: ::::

  4. 识别每个分区路径下的小文件

    这些是大小低于配置的hoodie.parquet.small.file.limit(默认为100MB)的文件。

  5. 基于之前提交写入的文件计算平均记录大小

    如果没有之前的提交其totalBytesWritten大于[hoodie.record.size.estimation.threshold * hoodie.parquet.small.file.limit](1.0 * 100MB), 则使用hoodie.copyonwrite.record.size.estimate(默认为1024)。

  6. 将插入分配到小文件,直到达到hoodie.parquet.max.file.size(默认为120MB)。

    首先将插入添加到现有的小文件(现有的桶)。一旦这些文件的大小超过hoodie.parquet.max.file.size, 就会生成新的文件(新的桶)来写入插入数据记录。

  7. 将upsert数据写入Hudi表

    每个虚拟桶都被分配给一个Spark任务, 该任务执行写操作, 确保生成的文件大小小于hoodie.parquet.max.file.size。如果分配的数据会生成一个超过此阈值的文件, 它会被拆分成多个文件以满足最大文件大小要求。

下图直观地表示了这个过程。

调优upsert性能

在实验笔记本中, 我们使用了几种调优选项来优化我们在Copy On Write(COW)表上的每日批量upsert的性能。以下各小节提供了有关使用的每种技术的更多信息。

优化1: 范围剪枝

单调递增的记录键允许进行高效的范围剪枝, 从而提高索引查找和整体upsert性能。请参见批量插入 以获得详细解释。


实验2 - 每日upserts和查询 中, 我们将记录键选为YYYYMMDDHHmm_droneID, 以获得单调递增的记录键。这意味着具有相似时间戳(记录键前缀)的记录将被共同定位在同一个文件上。

这确保了在执行每日upsert操作时进行高效的范围剪枝, 因为只需要处理最新添加的文件。请注意, 在我们特定的实验用例中, 范围剪枝的效率会随着时间的推移而提高。每日upsert包括新一天的数据(插入)和前一天数据的较少更新。因此, 随着我们在一个月内的进展, 我们可以看到范围剪枝的影响越来越大。

下面是一个示例, 演示了我们实验用例中的文件如何受到影响, 以及范围剪枝如何随时间而提高过滤效率。

请注意, 我们初始数据集的批量插入也是按照这个单调递增的记录键进行排序的。这确保了初始批量加载中的适当共定位。

优化2: upsert并行度

hoodie.upsert.shuffle.parallelism控制用于重复数据删除标记upsert阶段的并行度。如Upsert如何工作 所述, 由于Spark shuffle, 这些阶段可能很昂贵, 因此正确配置这些操作的并行度非常重要。

  • Hudi < 0.13.0: Upsert并行度为hoodie.upsert.shuffle.parallelism, 其默认值为200。
  • Hudi >= 0.13.0: Upsert并行度为MIN(dataframe.numPartitions(), hoodie.upsert.shuffle.parallelism), 其中hoodie.upsert.shuffle.parallelism默认设置为0, 这会保持要upsert的数据框分区数作为并行度。在Hudi 0.13.0中引入了使用值0的可能性。

实验2 - 每日upserts和查询 中, 我们要upsert的输入数据框有20个分区, 我们将默认的hoodie.upsert.shuffle.parallelism保留为0, 这将导致使用20的upsert并行度。由于我们的Spark应用程序有16个核心, 我们将输入数据框重新分区到可用核心数的倍数(即48), 这样可以充分利用我们的可用资源。这是一个通用的Spark最佳实践。 请注意, 将hoodie.upsert.shuffle.parallelism设置为48将不会允许我们达到48的并行度, 因为在Hudi 0.13+中, upsert并行度是MIN(dataframe.numPartitions(), hoodie.upsert.shuffle.parallelism), 当hoodie.upsert.shuffle.parallelism被指定且>0时。在我们的例子中, 这仍然会导致只有20的并行度。

优化3: 文件大小

对于COW表, 受更新操作影响的文件会被完全重写。因此, 如果我们有一个90MB的文件受到更新影响, 该文件将被完全重写。我们在Upsert如何工作 的第6步中看到, Hudi upserts会检测小文件(< hoodie.parquet.small.file.limit)并尝试将插入记录打包到这些文件中, 直到达到配置的hoodie.parquet.max.file.size。这可能会使文件的实际写入, 即Upsert如何工作 的第7步, 成为一个重量级操作, 对整体upsert性能有很大影响。

例如: 如果一个upsert必须(重新)写入10个文件作为其输出, 将有10个Spark任务并行运行, 每个任务写入一个文件。即使我们增加了Spark应用程序中可用的核心数, 也只有10个核心用于最终的写入阶段。写入任务的持续时间取决于它必须写入的数据量。

实验2 - 每日upserts和查询 中, 我们发现upsert操作的大部分时间实际上花在了这个最终的写入阶段。提高upsert性能的一种可能性是减小Hudi数据文件的大小。我们可以通过减小hoodie.parquet.small.file.limithoodie.parquet.max.file.size值来实现这一点。结果是, 我们将在写入阶段看到更高的并行度, 因为我们将写入更多、更小的文件。不仅我们将有更多并行运行的任务, 每个写入任务的持续时间也会更短, 因为要写入的文件更小。这会导致upsert的整体性能提高。

请注意, 这需要在读取和写入性能之间进行权衡。减小文件大小将提高写入性能, 但会降低读取性能。为了在之后获得更大的文件, 一种可能性是在非活动分区上运行聚簇操作。

查询

快照查询如何访问数据

我们将使用一个示例来解释查询行为。让我们考虑一个在特定列X上有过滤器的查询(例如, 其中X == Y或X < Y)。

数据访问行为取决于是否启用了元数据列统计(hoodie.metadata.index.column.stats.enable)和元数据表布隆过滤器(hoodie.metadata.index.bloom.filter.enable)。下面我们分别介绍两种情况下的查询示例。


hoodie.metadata.index.column.stats.enablehoodie.metadata.index.bloom.filter.enable都禁用时:

  1. 从元数据表中检索最新的文件切片(我们数据的最新版本)。
  2. 将对列X的过滤器下推(spark.sql.parquet.filterPushdown默认设置为true)。这意味着只读取Parquet文件页脚, 以评估X列的列统计, 并最终决定是否跳过该文件。
  3. 未被跳过的文件被加载到Spark任务的内存中, 并与过滤器进行比较。
  4. 返回最终结果。

当`hoodie.metadata.index