实战经验:Apache Spark性能调优的五大秘诀

Apache Spark 于 2013 年开源,至今仍是最受欢迎且功能强大的计算引擎之一。然而,使用Spark也面临着挑战,诸如缩容、数据倾斜和内存溢出等问题。本文基于大规模Spark任务积累的经验,介绍一些技巧以规避可能遇到的常见陷阱,推

实战经验:Apache Spark性能调优的五大秘诀

Apache Spark 于 2013 年开源,至今仍是最受欢迎且功能强大的计算引擎之一。然而,使用Spark也面临着挑战,诸如缩容、数据倾斜和内存溢出等问题。本文基于大规模Spark任务积累的经验,介绍一些技巧以规避可能遇到的常见陷阱,推荐最佳实践。

秘诀 #1:为集群设置内存和存储

设置 Spark 集群的基础步骤是确定要为驱动程序和执行器分配多少计算资源、内存和存储。

CPU

CPU(或核心)决定了 Spark 应用程序的并行度。每个任务占用一个核心,因此应确保核心数量与任务数量和工作负载的并行性成正比。对于一般工作负载,通常每个执行器配备 2 - 5 个核心;如果频繁进行shuffle,可考虑使用核心数较少的执行器。通常,内存会随着每台机器的核心数量而增加。

堆内内存

默认情况下,Spark 仅使用堆内内存。堆内内存用于处理数据、缓存弹性分布式数据集(RDD)以及执行转换等任务。提供足够的内存来保存正在处理的数据非常重要;否则,可能会遇到内存溢出(OOM)错误,或者需要进行额外的shuffle操作,这可能会降低效率。另一方面,使用内存过大的执行器在进行垃圾回收时可能会导致长时间停顿(后面会详细介绍垃圾回收)。一般来说,最好避免使用大的堆大小(> 32GB),而是将数据分散到多个较小的执行器中。

堆外内存

使用堆外内存可以绕过 Java 垃圾回收,这有助于减少开销并提高内存密集型工作负载的性能。当你想缓存会被重复使用的数据(如大型静态数据集)时,这非常有用。

使用 spark.memory.offHeap.enabled 启用堆外内存。要注意,内存不是无限的 —— 堆外内存会减少可用于数据处理的堆内内存。一个通用的最佳实践是使用 spark.executor.memoryOverhead 将堆外内存设置为执行器内存的 10%。

存储

Spark 在处理大数据方面的优势之一就是当内存不足时,它能够将中间数据溢出到磁盘。默认情况下,Spark 使用其工作节点的本地磁盘来存储溢出的数据,但你可以将其配置为使用高性能存储以获得更好的性能。

然而,在为 Spark 集群配置内存和存储时要平衡性能和成本效益。将数据写入磁盘和从磁盘读取数据的速度明显慢于内存。将数据溢出到磁盘还会产生序列化和反序列化数据的开销,给 CPU 和磁盘 I/O 带来额外的负载。

秘诀 #2:序列化与数据结构

作为一个分布式执行引擎,Spark 经常在多台机器之间移动数据。由于大数据量在网络中传输需要更长的时间,这会导致更高的计算成本。数据存储在内存中以便快速计算,但每台机器上的内存既昂贵又有限。考虑到通过网络移动数据和将数据存储在内存中的成本,尽可能减少数据占用空间非常重要。

选择合适的序列化方式

缩小数据量的一种方法是序列化。Spark 中的序列化将数据转换为字节流,以便在网络中传输并存储在内存缓冲区中,然后在计算时再进行反序列化。默认情况下,Spark 使用 Java 序列化在节点之间移动数据。

一个常见的优化方法是切换到 Kryo 序列化,它比 Java 序列化快达 10 倍,并且占用空间更小。一个关键的权衡是,Kryo 需要你注册类(与 Java 序列化不同),这会增加配置开销,并且可能会导致向后兼容性问题。要在 Spark 中启用 Kryo 序列化,请设置以下配置:

代码语言:javascript代码运行次数:0运行复制
spark.serializer=org.apache.spark.serializer.KryoSerializer

你也可以探索更新的序列化框架,如 Apache Fury(孵化中),它旨在提供进一步的性能提升。

优化数据结构

最小化数据占用空间的一个重要方法是选择内存开销较小且对象较少的数据结构。遵循以下提示来优化数据结构:

  • 使用基本数据类型代替标准的 Java 和 Scala 类(如 HashMap),这些类会因元数据和对象引用而增加内存开销。
  • 扁平化嵌套结构以避免深度嵌套,深度嵌套可能包含许多小对象和指针。
  • 对于固定值集,使用枚举代替字符串;对于标识符,使用数值代替。

除了减少内存占用外,优化数据结构还可以显著降低垃圾回收的成本(我们将在下一个秘诀中介绍)。

秘诀 #3:垃圾回收

Spark 集群中的每个节点都运行一个 Java 虚拟机,因此使用垃圾回收来清理内存中未使用的对象。GC 对于释放内存和防止内存泄漏至关重要,但如果没有进行适当的调优,可能会导致 Spark 作业延迟或挂起。

选择合适的 GC 算法

常见算法的比较:

GC 算法

工作原理

适用场景

缺点

Garbage First(G1GC)

将堆划分为多个区域,并优先回收可回收内存最多的区域

大多数工作负载;平衡吞吐量和可预测的停顿时间

对于非常大的堆(> 32GB)需要进行调优

Parallel GC

高吞吐量、多线程的 GC;专注于最小化相对于应用程序执行时间的 GC 开销

批量处理较重的工作负载

大堆的停顿时间较长

Concurrent Mark - Sweep(CMS)

并发收集垃圾以减少停顿时间

低延迟工作负载

已弃用;存在内存碎片问题

ParNew GC

多线程的新生代收集器,通常与 CMS 配合使用以实现低停顿

需要较短新生代 GC 时间的低延迟工作负载

可能导致频繁的全量收集;依赖于已弃用的 CMS

如果你需要超低的 GC 停顿时间(如高频交易场景),值得探索其他专门的 GC 算法,如 Shenandoah 和 ZGC。

案例研究:从 G1GC 切换到 ParNew GC

一个大量短生命周期对象的工作负载使用 G1GC,在堆上平衡了吞吐量和停顿时间。然而导致了较高的 GC 延迟和增加的任务执行时间。切换到 ParNew GC 能够以更低的停顿时间和更低的延迟优化新生代收集。虽然 ParNew GC 可能会增加全量收集的频率,但在这种情况下,新生代 GC 性能的提升超过了额外的开销,从而带来了更好的整体性能。

GC 调优是一个迭代的过程,很大程度上取决于工作负载的特性。重要的是使用 Spark UI 和 JVM 指标来监控性能,以便随着时间的推移优化配置。

秘诀 #4:并行性和分区

Spark 的强大功能之一是能够并行处理数据。为了实现这一点,Spark 将数据划分为更小的块,使每个块能够适应较小的机器,并使这些块能够并发处理。高效分区的关键是防止数据倾斜。当 Spark 分区倾斜时,可能遇到 OOM 错误和小文件问题,这会影响性能。

.coalesce().repartition() 方法是优化和平衡分区的有效工具,但工作方式略有不同:

  • 使用 coalesce() 减少分区数量而不进行shuffle。这种方法成本较低,在数据已经在分区之间均匀分布时很有用。这样作业可以更高效地运行,而无需承担读取大量分区的开销。
  • 使用 repartition() 通过shuffle将数据均匀分布到各个分区。这种方法成本较高,但可以大大提高不平衡分区的性能。它可用于增加或减少分区数量。

另一个有效的工具是动态分区剪枝,它根据运行时过滤器修剪不必要的分区,以减少磁盘读取并提高查询性能。在查询执行期间,Spark 会识别相关的分区并避免读取不必要的分区,从而减少磁盘 I/O 并提高性能。

使用以下 Spark 配置启用动态分区剪枝:

代码语言:javascript代码运行次数:0运行复制
spark.sql.dynamicPartitionPruning.enabled=true
秘诀 #5:优化连接操作

在 Spark 中,表连接是最常见但计算量最大的操作之一。优化连接操作可以对整体性能和计算成本产生巨大影响。

利用自适应查询执行(AQE)

Spark 为连接操作提供的最大优化是自适应查询执行(AQE)。AQE 会自动收集统计信息,并在运行时使用这些信息来调整查询执行计划,以实现最佳性能。使用 spark.sql.adaptive.enabled=true 启用 AQE。AQE 为连接操作提供以下优化:

  • 广播连接:广播连接将 DataFrame 分发到每个执行器,有助于避免数据shuffle并加快执行速度。当连接一个大的 DataFrame 和一个小的 DataFrame 时,这种方法特别有效。如果 AQE 检测到连接的一侧足够小,可以放入内存,它会自动将连接转换为广播连接。使用以下配置启用:
代码语言:javascript代码运行次数:0运行复制
spark.sql.adaptive.localShuffleReader.enabled=true
# 根据执行器内存设置转换阈值
spark.sql.adaptive.maxBroadcastJoinThreshold=<size>
  • 倾斜连接处理:当少数键在分区中占主导地位时,数据倾斜会导致性能瓶颈。AQE 通过更均匀地重新分配倾斜的键或避免对这些键进行shuffle来解决倾斜问题。使用 spark.sql.adaptive.skewJoin.enabled=true 启用此功能。

你还可以利用 AQE 进行连接操作之外的优化,如自适应分区和去除无用的排序。更多信息请参阅官方 AQE 文档。除了 AQE,遵循以下最佳实践来优化连接操作:

  • 将最大的 DataFrame 放在连接操作的左侧。这有助于减少shuffle操作并提高效率。
  • 在连接 DataFrame 时,使用相同的分区器(即使用相同的键进行重新分区),以避免重新shuffle数据。
代码语言:javascript代码运行次数:0运行复制
df1 = df1.repartition("join_key")
df2 = df2.repartition("join_key")
joined_df = df1.join(df2, "join_key")
  • 将频繁使用的数据集缓存到内存或磁盘中,以避免在连接操作期间重新计算:
    • 对 DataFrame 使用 .cache() 方法缓存适合内存的数据。
    • 对于需要持久化到磁盘的较大数据集,使用 .persist(StorageLevel.DISK_ONLY)

应用这些优化后,使用 Spark UI 验证其影响非常重要。

【额外福利】秘诀 #6:动态分配

Spark 中的动态分配功能可以根据运行时需求动态扩展资源,有效处理不可预测的工作负载。虽然它可以优化资源利用率,但对于静态和可预测的工作负载来说,可能不值得引入这种复杂性。

为了处理不断变化的工作负载,动态分配在运行时分配资源,以便根据实际工作负载更好地利用资源。动态分配可以实时添加和移除执行器节点,以有效处理工作负载的高峰和低谷。缩容依赖于多个配置的协同工作,为了实现最佳效果,需要对这些配置进行调优。以下是关键配置:

  • spark.dynamicAllocation.enabled=true:启用动态分配,默认情况下该功能是禁用的。
  • spark.dynamicAllocation.executorIdleTimeout:控制空闲执行器在被移除之前可以保持空闲的时间。如果执行器被移除过于频繁,导致低效的重新计算,可以增加这个值。
  • spark.dynamicAllocation.cachedIdleTimeout:控制带有缓存 RDD 块的执行器在被释放之前可以保持空闲的时间。如果你的工作负载大量使用缓存的 RDD,可以将此值设置得更高,以防止过早释放并避免不必要地重新计算缓存数据。
  • spark.dynamicAllocation.shuffleActiveTimeout:定义没有缓存数据或活动任务的空闲执行器在被移除之前可以保持空闲的时间。如果你的工作负载涉及长时间的shuffle阶段或中间操作,可以增加此值,以确保执行器不会过早被释放。
  • spark.decommission.enabled=true:通过将缓存的 RDD 和shuffle数据迁移到另一个活动执行器,优雅地关闭执行器。这有助于避免重新计算。请注意,这是 Spark 的一个较新的功能(从 3.1 版本开始可用),可能存在一些小问题。

通过正确调整这些动态分配配置,你可以显著提高性能,并为不可预测的工作负载优化资源利用率。

总结

运行 Spark 可能具有挑战性,因为工作负载可能会受到多种因素的限制,包括计算资源、内存、I/O 甚至网络带宽。上述秘诀为调试和优化 Spark 作业提供了坚实的基础。在此基础上,你应该不断进行实验,为每个工作负载找到最佳的配置组合。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。原始发表:2025-04-03,如有侵权请联系 cloudcommunity@tencent 删除数据性能apachespark内存

发布者:admin,转转请注明出处:http://www.yc00.com/web/1747981344a4714528.html

相关推荐

  • 实战经验:Apache Spark性能调优的五大秘诀

    Apache Spark 于 2013 年开源,至今仍是最受欢迎且功能强大的计算引擎之一。然而,使用Spark也面临着挑战,诸如缩容、数据倾斜和内存溢出等问题。本文基于大规模Spark任务积累的经验,介绍一些技巧以规避可能遇到的常见陷阱,推

    5小时前
    10

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信