前两天又接了一个Spark任务,倒不复杂,依然是检索HDFS上的日志数据这样的事情。不过瞅着组内跑着十几二十个任务内存一共只有160来G的yarn集群,有些欲哭无泪。
事情还是要做的,反正执行时间要求不太严格,只能想办法尽量压缩内存的占用了。
先说下背景:现在使用的yarn集群由8个容器组成,每个容器的内存大概20G;工作内容是检索数据,源数据大概1T左右,取出来的目标结果数据在2~8G这样子。
最开始的时候查询任务是直接使用sparkSql来完成。随着数据量的上升很快就遇到了最经典的两个问题:StackOverflowError和OutOfMemoryError。
对于栈溢出,之前设计了几个解决方案,在历史文章里面有记录《Spark StackOverflowError》。其中我使用了任务内多批次执行的方案。现在想来,这其实并不是最好的解决方案:问题在于分批越多,每批任务中的action算子就会导致任务的执行时间越长,远不如直接增加栈空间来得简单直接。不过也算是错有错着,这反倒为后来的优化打下了基础。
至于堆内存溢出,主要发生在将每个partition的数据合并压缩的阶段:.repartition(1).saveAsTextFile(pathSave, classOf[GzipCodec])
。因为这个操作可能会发生在每个Executor上,所以只好通过简单的增加Executor的内存来解决问题。因为内存总量有限,单个Executor的内存调大了,就只能将task的并发度调小。这样在更严重的问题暴露之前,一直尝试解决的问题就是如何在并发度和内存占用之间取得平衡。
更严重的问题出现在这次的需求上:很简单,要导出的结果数据集变得非常大了,一般都会大于8G,此时堆内存溢出频繁出现。应对方案如下:取消压缩操作、增大Executor执行内存,将Executor的数量调整为2,每个Executor的task数目调整为1。这样Spark任务可以正常执行了,但是因为并行度太小的缘故,执行时间巨长——动辄跑上十来个小时。优化执行速度又提到了时间表上。
是一次执行错误给了优化的方向。现在任务的执行步骤为:
1 2 3 4 5 |
1. 搜集目标数据路径 -> 2. 将目标数据路径分片(720个路径一组) -> 3. 分别为每组路径执行查询并生成中间结果集文件 -> 4. 合并中间文件,生成最终结果 -> 5. 清理中间文件 -> 结束 |
某次任务执行到第4步的时候报错了,考虑到耗时的问题,就重新写了一段代码来完成4和5两步的操作。此时想到这个任务在不同的阶段对资源的需求是不一样的:
- 在执行1~3这几个步骤的时候对内存的需求没那么强,但是如果稍稍增加些并行度就能极大地提升任务的执行效率;
- 第4步则是典型的吃内存的操作,此时并行度为1,但是内存需要足够大才能保证任务顺利完成。
此时方案已经很清晰了:将一个任务拆成两个,一个负责搜集数据,一个负责合并生成的中间数据,在执行的时候按不同的策略分配资源。
至此,当前的任务优化已完成。
再扯些没用的。
- 最后的优化方案实际上非常简单,以至于我很奇怪为什么一开始没想到。并且这种方案是在Hadoop的计算实践中是最常用的操作。唉,也许是灯下黑吧。
- 也许直接使用Hadoop会是一个更好地选择。因为瓶颈主要出现在内存上,Hadoop对内存资源的占用会少很多。
- 如果能不走yarn,直接使用java操作,那么尽量不走yarn好了,虽然复杂度会提升很多,但是执行效率是有保证的。
另外,同事曾经问过我一个问题:如果减少了executor的数目,那么每个executor要处理的数据不就变多了,这样也会造成内存压力。听到这个问题时,我的第一印象是“对啊,之前怎么没有考虑到这一点啊”。后来仔细思索了一段时间,想明白了关键:这个问题的前半句是对的,数据总量固定,并行度降低,单个executor要处理的数据量必然会增加;但是后半句是错的,内存中的数据量取决于partition的数量,在配置中则是和task数量相关。
记录一组spark任务提交参数,留着以后参考:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
spark-submit --class com.zhyea.MyJobDriver \ --master yarn-cluster \ --name my-job \ --num-executors 8 \ --driver-memory 4g \ --executor-memory 6g \ --executor-cores 3 \ --conf spark.driver.extraJavaOptions="-verbose:gc -Xss256M -XX:+UseG1GC -XX:-UseGCOverheadLimit -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution -Dfile.encoding=UTF-8" \ --conf spark.executor.extraJavaOptions=-verbose:gc -Xss80M -XX:+UseG1GC -XX:-UseGCOverheadLimit -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -XX:NewRatio=1 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution -Dfile.encoding=UTF-8" --conf spark.driver.userClassPathFirst=true \ --conf spark.executor.userClassPathFirst=true \ --conf spark.network.timeout=600 \ --conf spark.executor.heartbeatInterval=540 \ --jars ${jars} job.jar "$1" "$2" |
就这些了。
发表评论