这一节将主要介绍资源调优,或者说是如何充分利用集群资源。然后再说一下如何对并发度进行优化,这是job性能参数中最难也是最重要的部分。最后我们将了解一些数据自身的表现形式:Spark读取时数据在磁盘上的保存形式(如Apache Avro和 Apache Parquet),以及缓存时或系统内移动时数据在内存中的形式。
优化资源分配
在Spark的用户反馈中经常提到一类问题如“我有一个500个节点的集群,但是在我运行一个应用时,我看到只有两个Task在执行。求帮助!”。考虑到Spark资源控制参数的数量,这种问题不是不可能存在的。在这一节里,我们将学习如何压榨出集群的最后一点资源。根据资群资源管理器的不同(Yarn、Mesos和Spark Standalone),相应的建议和配置也有所不同。不过我们将主要集中在Yarn上——这也是Cloudera推荐使用的资源管理器。
要了解一些Spark on YARN的背景知识,可以先看一下这篇文章。
目前Spark(和Yarn)主要关注的资源只有两个:CPU和内存。当然了,硬盘和网络IO对于Spark的性能也很关键,但是不管Spark还是Yarn,目前都还不能有效地对其进行管理。
在一个Spark应用中,每个Executor都有固定数目的core以及固定大小的堆内存。core的数量可以通过spark-submit提交任务时的参数“–executor-cores”来进行指定,也可以在spark-default.conf配置文件中或SparkConf对象中对“spark.executor.cores”属性进行配置。类似的,堆内存的大小可以通过“—executor-memory”或“spark.executor.memory”来进行设置。cores属性决定了一个executor可以并发执行的task的数量。如“–executor-cores 5”则意味着一个executor在同一时间内最多可以同时执行5个task。内存大小决定了Spark可以缓存的数据的总量以及在group、aggregate或join时可以shuffle地最大数据总量。
“–num-executors”命令参数或“spark.executor.instances”属性则决定了job可以申请的executor总数。从CDH 5.4/Spark 1.3开始,就可以不必设置这个值了。更好的方法是通过“spark.dynamicAllocation.enabled”开启动态分配。通过动态分配,spark应用可以在大量待执行的task积压时申请更多的executor资源,并在空闲时释放executor资源。
同时了解Spark请求的资源是如何与yarn可用资源相配合的也是很重要的。相关的yarn参数包括:
yarn.nodemanager.resource.memory-mb:控制了集群每个节点上所有的container可以使用的最大内存总量。
yarn.nodemanager.resource.cpu-vcores:控制了集群每个节点上所有的container可以使用的最大core总量
在提交任务时请求5个executor core就等于向yarn集群请求5个虚拟core。但是向yarn请求内存则有点复杂,情况大致如下:
1. “–executor-memory/spark.executor.memory”控制着executor需要使用的堆内存大小,但是spark也会用到一些堆外内存,比如interned String或者direct byte buffer。“spark.yarn.executor.memoryOverhead”控制着这部分内存,它和其它属性一起决定了每个executor向yarn请求的内存总量。这个属性的默认值是“max(384, 0.07 * spark.executor.memory)”
2. Yarn提供的内存可能会比请求的多一点。Yarn的配置参数“yarn.scheduler.minimum-allocation-mb”和“yarn.scheduler.increment-allocation-mb”控制着分配内存的最小值和增加量。
下图展示了spark和yarn的内存相关属性及配置的层级结构:
对于如何分配executor,如觉得以上内容还不够,那么还可以关注下如下的几个点:
1. application master,是一个非executor container,有向yarn请求container的特殊能力,它本身所占用的资源也需要被考虑进来。在“yarn-client”模式下,它默认会占用1024M内存以及一个vcore。在“yarn-cluster”模式下,application master会运行driver,所以通过“–driver-memory”和“–driver-cores”来配置资源通常是有效的。
2. 给executor分配太多的内存会导致过长的GC延迟。64GB是一个稍显粗略的内存上限建议。
3. 我们已经注意到HDFS客户端在并发线程过多时会存在一些问题,大致每个executor分配5个task就可能耗尽写入带宽,所以给executor分配的core的数目最好低于5。
4. 运行“小型”executor(例如只有一个core且内存仅够运行一个task的executor)实际上是放弃了在一个JVM中并发执行多个Task的优势。比如broadcast变量需要在每个executor上复制一份copy,太多小型executor的话就会产生过多的这种变量copy。
为了能够更具体地描述以上的这些内容,这里以一个Spark应用示例,通过调整这个应用的配置,让其可以充分利用集群的资源:假设一个集群有6个节点在运行NodeManager,每个节点有16个core以及64GB内存。那么每个NodeManager的配置“yarn.nodemanager.resource.memory-mb”和“yarn.nodemanager.resource.cpu-vcores”可以分别设置为63 * 1024 = 64512(MB)和15。这里没有将全部的资源都分配给Yarn container,因为需要留一些资源给每个节点的操作系统以及hadoop后台进程。这里留了1G内存和1个core给这些系统进程。Cloudera Manager会自动计算这些内容并完成相关的Yarn配置。
根据上面说的集群配置,我们极有可能在提交任务时这样配置executor:“–num-executors 6 –executor-cores 15 –executor-memory 63G”。但是,这是一个错误的做法,因为:
- 6GB+的executor塞不进有63GB内存空间的NodeManager;
- application master会占用某个节点上的一个core,这意味着那个节点上将不会有足够的core可以分配给一个15-core的executor;
- 每个executor有15个core将有可能导致HDFS IO吞吐异常。
比较好的做法是这样进行配置:“–num-executors 17 –executor-cores 5 –executor-memory 19G”。为什么呢?
- 这样每个节点上将会有3个executor——有application master的节点除外,它上面只有两个executor;
- “–executor-memory”的值是这样算出来的:(63/3) = 21,3是每个节点的executor数量;21 * 0.07 = 1.47,前面提过0.07是堆外内存的比例;21-1.47≈19.
优化并发度
我们都知道Spark是一套并发处理引擎。不过也许我们并不清楚:Spark并不是一个特别“神奇”的并发处理引擎,能够无限制的并行处理业务;它的能力也有极限,我们需要找出最优的并发度。
Spark的每一个stage都有一定数量的task,每个task会串行地处理数据。在优化Spark Job并行度的时候,task数量也许是决定应用性能的唯一的也是最重要的参数。
那么这个数量是如何决定的呢?在之前的文章里有讲述过Spark是如何将RDD划分为不同的stage的。简单提示一下:一些transformation操作如repartition或reduceByKey造成了stage的边界。一个stage中task的个数与stage中最后一个RDD的partition数目相同。一个RDD中partition的数目与其所依赖的RDD的partition数目相同。不过有一些例外情况:coalesce transformation算子创建的RDD的partition总数可以少于其父RDD的partition总数;union transformation算子会创建一个RDD,其partition总数为所有父RDD的全部partitio数目之和;cartesian创建的RDD的partiton总数为所有父RDD的partiton数量的乘积。
如果一个RDD没有父RDD呢?依赖textFile或hadoopFile创建的RDD的partition数量取决于其所使用的MapReduce InputFormat实现。通常情况下一个HDFS block对应着一个partition。对于依赖parallelize方法创建的RDD,其partiton数量取决于用户在调用parallelize方法时传递的参数,如果用户没有传递参数那么会默认采用“spark.default.parallelism”的值。
要想知道一个RDD中partition的个数,可以通过调用rdd.partitions().size()获得。
这里需要关心的是task数量过小的情况。如果task的数量小于可用的slot总数,那么对应的stage将无法充分利用所有可用的CPU。
task数量过少也意味着在执行聚集操作时每个task都会有更大的内存压力。任何“join”、“group”或“*ByKey”操作会将对象保存在hashmap或内存buffer中,以便进行分组或排序。“join”、“group”或“groupByKey”在触发shuffle时会在fetch端使用这些数据结构。“aggregateByKey”、“reduceByKey”在触发shuffle时会在两端使用这些数据结构。
当聚集操作要获取的数据在内存中不容易放下的时候,就会暴露一些问题。第一,在内存中保留太多数据会造成GC压力,容易导致处理中断。第二,如果在内存中放不下这些数据,Spark将会尝试将之溢写到硬盘上,这会产生硬盘IO或排序。过大的shuffle导致的内存开销是我在所有的Cloudera用户中看到的造成Job执行过慢的首要原因。
那么该怎样增加partition的数量呢?如果当前stage是在从hadoop中读取数据,那么可用的做法有:
- 调用repartition方法,这会产生一次shuffle;
- 调整InputFormat实现,创建更多的partition;
- 向HDFS写数据的时候,使用更小的block size。
如果当前stage正在从其他stage接收数据。那么导致stage分界的transformation操作通常会接受一个numPartitions参数,比如:
1 |
val v rdd2 = rdd1.reduceByKey(_ + _, numPartitions = X) |
那么X设置为多少才比较合适呢?最直接的做法就是通过多次实验来决定:看一下父RDD的partition数量,然后每次按这个数量的1.5倍来设置X的值,多次重复,直到性能不再有明显提升。
也有一些计算这个X值的规则,但是不太好验证,因为有些参数不容易计算出来。这里也会介绍下这些规则,但是不建议在工作中使用,这里的介绍只为了能让大家对相关的内容有些深入的了解。使用这些规则的主要目标是在可以充分利用可用内存的前提下,创建足够多的task。
每个task的可用内存等于如下公式:(spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction)/spark.executor.cores。memory fraction和safty fraction的默认值分别为0.2和0.8.
内存中的shuffle数据的量不太容易确定。最可行的方案是找到stage运行时Shuffle Spill(Memory)和Shuffle Spill(Disk)的比例。然后用这个比例乘以shuffle写数据的总量。不过在这个stage执行reduce操作时情况会有点儿复杂:
此时可以把partition的数量稍稍调大一点儿——一般来说,partition的数量通常是多点儿比少点儿好。
实际上,在设置partition的数量如果有些犹豫不决的话,可以尝试将partiton的数量设置地大一些。这和在编写MapReduce程序时有些不同,MR对task的总数通常比较保守。这是因为MapReduce启动task的开销通常比较大,而Spark则比较小。
压缩数据结构
Spark上的数据流由record的形式构成。record通常有两种表现形式:一种表现形式是反序列化后的java对象;一种是序列化后的二进制数组。通常在内存中Spark会使用反序列化后的record,而在硬盘或网络传输时会使用序列化后的record。也有一些工作在尝试将shuffle数据以序列化后的形式保存在内存中。
“spark.serializer”这个属性决定了使用哪种序列化方案使record在这两种表现形式间转换。Kryo serializer,即org.apache.spark.serializer.KryoSerializer,是推荐的选项。但可惜的是它并不是默认的选项。因为在Spark的早期版本中KryoSerializer有些不稳定,而现在Spark又不想打破版本间的兼容性。但是不管如何,Kryo都应该是优先使用的serializer。
record在这两种表现形式间的转换对于Spark的性能有着巨大的影响。因此很有必要检查一下应用运行时在各处使用的数据类型,并尝试挤出一些水分。
数据序列化后的体量过大可能会导致更加频繁的硬盘和网络IO,同时也会减少Spark可以缓存(这里是指在MEMORY_SER storage level下)的record的总量。这里主要的解决方案是将所有的用户自定义类都通过SparkConf#registerKryoClasses API来注册和传递。
数据格式
当你有权利决定数据以何种格式在硬盘上存储的时候,应选择一种可扩展的二进制形式,如Avro, Parquet, Thrift, 或Protobuf。也许应该表述地更清楚一点:这里说的在hadoop上使用Avro, Thrift, 或Protobuf保存数据时,是说将数据以Avro, Thrift, 或Protobuf的结构保存在SequenceFile中。
应当尽量避免使用JSON来存储数据。每当你打算将大量数据保存在JSON,可以想想将要浪费的资源,想想中东发生的战争,想想全球变暖,想象碳排放,这些都只是为了让CPU多转几圈来一遍一遍又一遍地解析你的文件。所以,最好再多学点儿沟通技巧,让你的同事或领导也确信这一点。
其它
本文译自:How-to: Tune Your Apache Spark Jobs (Part 2)
补充
这两篇文最初是在StackOverflow上看到的,感觉有用就尝试翻译了一下。后来又在朋友圈看到其他人的翻译,觉得信达雅各方面远胜我的工作,所以在这里特地附上链接:
########
发表评论