查看原文
其他

Blaze:SparkSQL Native算子优化在快手的深度优化及大规模应用实践

王磊 博士 DataFunSummit
2024-09-10

导读 Blaze 是快手自研的基于向量化技术开发的一套 native 执行引擎,执行过程充分利用 native 代码和 SIMD 指令向量化计算的优势。目前已在快手内部部分业务上线,并实现了 30% 的算力提升。

本次分享将围绕下面三点展开:

1. Blaze 原理及架构设计

2. 面向生产的深度优化

3. 当前进展及未来规划

分享嘉宾|王磊博士 快手 大数据离线生产引擎及数据湖负责人 

编辑整理|李笑宇

内容校对|李瑶

出品社区|DataFun


01

Blaze 原理及架构设计

要回答 Blaze 是什么,先要从 Spark 的发展历程说起。Spark 作为业界当前离线生产链路的主力引擎,如何持续提升执行效率始终是个核心命题, 而 Spark 社区也一直在通过代码生成和自适应查询执行等手段在不断迭代优化。

1. Spark 1.0

Spark 1.0 阶段,采用的是解释执行模型,每个算子自成一个函数。解释执行模型最大的问题是效率比较低,存在大量的分支判断检查、无法充分利用 CPU 的指令流水、编译器难以优化、CPU 缓存不友好等问题,导致其执行效率很低。

2. Spark 2.0

到了 Spark 2.0 阶段,Spark 社区推出了多算子编译的概念,其基本思想是将几个简单算子的计算逻辑组织到最近的一个复杂算子中,通过减少运行时函数调用数量的方式来避免大量的函数调用开销。多算子编译功能在 Spark 2.0 之后已经作为 SparkSQL 的默认执行模式,称为 WholeStageCodegen 技术。相比于 1.0 阶段,在运行时执行效率上得到了大幅度优化。

3. Spark 3.0

到了 Spark 3.0,社区引入了一个很重要的功能 Adaptive Query Execution(AQE 自适应执行引擎),其核心改变是从之前在编译阶段静态地生成执行计划,变为可以在运行时对执行计划动态地优化。

4. Spark 未来

向量化执行是其中一个非常重要的方向,这也算是业界当前的共识。向量化执行的思想是将算子的执行粒度从每次处理一行变成每次处理一个行组,以此来避免大量的函数调用,提升 Cache 友好性,数据在内存上是紧密连续排列的,可以通过 SIMD 指令一次处理多个数据,充分发挥现代 CPU 的计算优势。另外当前大数据主流的存储结构(如 parquet)都是使用列式存储,向量化计算可以减少读写过程的行列转换开销。

基于上述背景,我们完成了 Blaze 的设计和实现。

Blaze 是快手自研的基于 rust 和 datafusion 框架开发的 Spark 向量化执行引擎,基于 Blaze,既能够充分发挥 Spark 分布式计算框架的优势,同时也能够利用 DataFusion Native 向量化算子执行的性能优势。

Spark+Blaze 的架构设计原理如下图:

对比 Spark 原生的执行流程,通过引入 Blaze Session Extension 组件进行翻译,将物理执行计划转化为等效的、native 向量化引擎可以识别的计划,提交到 Executor 端由 Native 引擎执行计算,而 Native 引擎基于 Rust+Datafusion+Arrow 实现,这样翻译后的 Native 算子执行效率远高于 Spark 原生 JVM。

Blaze 架构中的核心模块有四个:

  • Native Engine:基于 Datafusion 框架实现的与 Spark 功能一致的 Native 算子,以及相关内存管理、FFI 交互等功能。

  • ProtoBuf:定义用于 JVM 和 Native 之间的算子描述协议,对 Datafusion 执行计划进行序列化和反序列化。

  • JNI Bridge:实现 Spark Extension 和 Native Engine 之间的互相调用。

  • Spark Extension:Spark 插件,实现 Spark 算子到 Native 算子之间的翻译。

具体的执行过程如下:

  • 将 Spark 生成的物理执行计划转换为对应的 Native Plan;

  • Native Plan 的生成和提交;

  • Native 真正的执行。

其中最核心的模块自然是 Native 向量化引擎,由 Rust 开发实现,包含了 Spark 大多数算子功能等价的对应算子,例行 Project、Filter、Sort 等等。这些算子在执行时可以通过向量化方式取得更高的计算效率。

Native 引擎可以通过 Shuffle 算子直接将计算结果写出到磁盘文件完成计算、或者通过 arrow FFI 将结果回传到 Spark、也可以通过 JNI 方式与 Spark 进行数据交互,如直接读写 HDFS 文件、读写 Remote Shuffle Service 等。

我们使用了 datafusion 作为我们的 Native Engine 基座,但是主要的算子,我们都面向 Spark 场景做了自己的重新实现,以便得到更优的执行性能。

02

面向生产的深度优化

在跑通 tpch 和 tpcds 测试集并取得预期性能提升后,我们面向线上生产环境进一步做了系列深度优化,包括性能和稳定性等方面工作:

1. 细粒度的 FailBack 机制

在理想情况下,我们希望 Spark 在执行的全过程中都能够实现向量化能力。现实情况下是实现全方位的算子/表达式向量化覆盖需要时间和过程。另一方面是业务编写的 Java UDF 函数没办法直接转化为 Native 执行。因此,必须考虑 Spark 原生执行和 Native 的执行融合。

针对上述问题,我们实现了演进式向量化执行,支持算子、表达式多种层级的细粒度算子回退机制。

在 Blaze 翻译算子过程中,如果遇到暂无 native 实现的算子、表达式、或者 UDF,会触发回退机制。先计算出算子/表达式依赖的参数列,通过 Arrow FFI 将参数列传入 spark 中解析成行,调用 spark 计算,再将结果值以列的形式通过 Arrow FFI 传回到 Blaze。因此,我们在 jvm 和 native 执行之间搭建了一个桥梁,支持部分向量化执行。这样我们不需要等所有的算子翻译完毕,可以一边推进,一边应用上线。

2. 基于 CBO 的转换策略

演进式执行模式也存在一个问题 Native 算子本身是基于 Arrow 格式的列存,Spark 原生算子是 InternalRow 行式存储,所以转换过程中会引入各种行列转换的开销。大量的行列转换,可能导致查询整体性能严重退化。

我们在 Blaze 的 SessionExtention 组件中实现了一套基于规则和代价的转换策略,如果预期某一部分操作转换为 Native 算子后引入的行列转换开销会降低最终执行的效率,那就不会对它进行转换,以此来保证在部分算子未完全覆盖的场景下执行效率也至少不会低于原生的 Spark 引擎。例如对于 Generate->R2C->NativeFilter 可以变换为 Generate->Filter->R2C,变换后 R2C 只需要处理 filter 之后的数据传递给下一个算子,可以显著减少行列转换的开销。对于 ETL 任务,我们还可以进一步基于历史执行情况等信息,判断该任务是否适用于 Blaze。

3. 更高效的向量化数据传输格式

Spark 中,Shuffle 一直是性能杀手,数据在分布式节点之间通过 shuffle 进行流转,对于 shuffle 数据需要进行编码、压缩、网络传输、解压、解码几个步聚,编码压缩大小直接影响网络 io 数据量和性能。在原生的 spark 中,shuffle 的数据通过 row-based 序列化+压缩传输,业界的向量化数据通常使用 arrow 格式进行传输。

但是实践过程我们发现 arrow 格式与主流的轻量压缩算法(snappy/lz4/zstd 等)适配度不好,主流的轻量压缩算法只识别 4 字节以上的重复数据,在 arrow 中使用 int32 存放偏移量,一般只有前 3 字节重复,无法被压缩,会导致压缩率偏低。

另外 arrow 在写每个 batch 的时候都会带上字段名、类型这些信息,导致大量冗余数据。在一开始的 tpc-ds 测试中,我们发现部分 case 性能退化资源开销变大,经过排查发现是 shuffle 数据大小增加导致的。

所以我们在 Blaze 中定制了传输格式,除了尽可能去除冗余信息(如列名、数据类型)之外,还使用了我们自定义的 byte-transpose 列式数据序列化格式提升数据压缩率,使得数据量大幅下降。

Byte-transpose 是将整型/浮点型向量数据的字节顺序进行重组,先输出所有元素的高字节,再输出低字节,由于在计算过程中高字节通常是相同的(如对于整数,大部分高字节都是 0),重组之后高字节可以被有效压缩。采用我们自定义的更高效的向量化数据传输格式后,通过线上实际测试,shuffle 数据量显著下降,同时 tpc-ds 测试遇到的 bad case 也都得到了优化。

4. 多级内存管理策略

Spark 自身有一套相对完善的内存管理机制,这套机制主要针对堆内内存,比如内存不足的情况下,数据会自动溢写到磁盘。而 Native 使用的都是堆外内存,无法发挥 Spark 的内存管理优势。因此,如何统一管理堆内堆外内存,避免线上任务内存不足或者频繁 Spill,是上线稳定性的一大挑战。我们首先根据任务的算子向量化覆盖情况实现了堆内堆外内存的自适应调节;其次,实现了堆外内存->堆内内存->磁盘文件的多级内存管理机制。

基于上述手段,对于向量化引擎上线后任务稳定运行提供了有力保障,不需要用户手动调整内存参数来适配新的执行模式,尽可能减少用户成本。

5. 复杂度更优的聚合算法实现

为了更好适应 spark 的需求,aggregate、sort、shuffle 等复杂算子都没有直接复用 datafusion 的实现,而是在 blaze 代码中单独开发实现。例如,Spark 在 HashAggregate 算子进行大数量 group-by 聚合计算时,内存不足触发 spill 时,会从 hash-based 聚合退化成使用多路归并排序算法的 sort-based 聚合,对 spill 数据按 group-key 进行全排序,最后逐条数据归并聚合输出结果,排序和合并过程都是 O(nlgn)的复杂度。

在 Blaze 中,我们实现了基于分桶的归并方式,触发 spill 时使用基数排序对数据按 hash 进行分桶、溢写;合并阶段对多个 spill 块相同 id 的桶使用 hash 表进行合并,两个阶段的复杂度都是 O(n),实践验证,能够大幅提升聚合算子的执行效率。

6. 向量化计算场景的表达式重复计算优化

SQL 执行过程中多个算子出现相同的表达式时,往往会引入重复计算,如图的示例 SQL 中,Project 和 Filter 出现了重复的表达式。

Spark 可以通过 Whole-stage codegen 技术解决一部分此类问题,而业界的向量化引擎 (如 datafusion、velox)并没有相应的解决方案。

在 Blaze 中,我们参照 Whole-stage codegen 技术解决了常见的重复计算问题。Blaze 会尝试将包含重复表达式的算子,如图中的 Project 和 Filter 合并为一个大算子,并在其中对表达式计算结果进行缓存、复用,达到了减少重复计算、提高执行效率的目的。在发生重复的表达式计算逻辑较为复杂(如 json 解析、UDF 调用等)的 case,这个优化往往能将执行效率提高一倍以上。

在内部一些典型业务场景会调用一个业务逻辑很重的 UDF,重复计算会带来很大开销,经过上述优化,可以减少大约 40% 的计算开销。

03

当前进展及未来规划

1. Blaze 当前已经支持以下方面

  • Parquet 的向量化读写

  • 线上常用算子的全面支持

  • 线上常用表达式的全面支持

  • 支持内部自研 Remote Shuffle Service

在 TPC-H 的测试中,Blaze 通过了所有 22 个测试场景,性能平均提升了 2.8 倍。

在真实的生产环境中,Adhoc 场景 Spark 引擎全量上线向量化执行,算力平均提升 30%+,ETL 场景覆盖 40% 任务。

2. Blaze 的未来规划

  • 持续迭代优化,内部线上推全

  • 支持更多引擎或场景,例如数据湖等

  • 开源社区运营建设

项目地址:https://github.com/kwai/blaze

当前关注度:934 star ,日均 20-30 人访问,每月 PR 约 30 个。

非常欢迎大家和我们一起共建社区。
以上就是本次分享的内容,谢谢大家。


分享嘉宾

INTRODUCTION


王磊博士

快手

大数据离线生产引擎及数据湖负责人

毕业于中国科学院大学,博士学历。

曾任职于360系统部,担任离线计算组负责人。

现任职于快手数据平台部数据引擎技术中心,负责离线生产引擎及数据湖研发。

往期推荐


Spark 内核的设计原理

LLM+Data:大模型在大数据领域应用新范式

沐瞳指标管理与智能分析

信贷场景广告投放优化实践

数据仓库模型管理与标签资产价值评估实践

DataFunCon 2024·北京站首日圆满收官

数据指标在金融行业的应用

全球化视野下,多云数据架构如何应对出海挑战?

京东零售数据湖应用与实践

辛选集团数据建设历程以及数据在直播电商的应用

点个在看你最好看

SPRING HAS ARRIVED

继续滑动看下一个
DataFunSummit
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存