天问

Spark调优经验总结

本文以Spark实践经验和Spark原理为依据,总结了Spark性能调优的一些方法。这些总结基于Spark-1.0.0版本。对于最近推出的Spark-1.1.0版本,本文介绍了几个版本增强。

 

Spark性能调优

 

Executor和分区

 

Executor是一个独立的JVM进程,每个任务会有独立的线程来执行,Executor最大可并发任务数量与其拥有的核心数量相同,执行过程中的数据缓存放在Executor的全局空间中。根据以上我们可以得出:

 

  1. 同一个Executor中执行的任务,可以共享同一个数据缓存。这也是Spark称之为Process local级别的数据本地性。
  2. Executor可并发执行的任务数量,与其所拥有的核心数相同。
  3. 并发任务之间可能会产生相互干扰,如有些任务占用内存较大会导致其他并发任务失败。
  4. Executor都需要注册到Driver上并与其通信,过多的Executor数量会增加Driver负担。

 

在阶段划分为任务时,会得到与分区数相同的任务数量。减少分区的数量将减少任务数,同时每个任务所处理的计算量会增大。考虑到任务本身的序列化,发送,运行环境准备,结果收集都需要占用Driver资源和Executor资源,减少任务数能够减少此类开销。

 

在实践中,每个Executor可以配置多个核心,从而降低Executor数量,还可以得到更好的数据本地性。根据所配置的核心数量与分区数据量,可以估计出Executor所需最小内存 = 并发任务数 * 单分区大小 + 内存缓存分区数 * 单分区大小。分区数的配置与具体业务逻辑相关,为了将计算资源充分利用,可以参考:分区数 * 并发Job数 >= Executor数 * Executor核心数。其中并发Job数是RDD在调用动作(action)类型的操作时产生的Job,Job之间的阶段是没有依赖关系的因此可并发执行。

 

 

缓存和数据本地性

 

RDD的persist函数用于在计算中避免重复计算。如果有一个RDD会被不同的Job调用,那一定要用persist进行缓存,避免这个RDD的重复计算。如果是缓存到内存中的话,计算完成后一定要记得调用unpersist释放缓存,以免造成内存无法回收。

 

调用了RDD的persist后,将会在RDD第一次被计算时将其计算结果写入缓存,缓存级别由Storage_Level决定,缓存的写入是通过BlockManager进行的,缓存信息也会同步到BlockManagerMaster中。在RDD被再次计算时,将首先到BlockManagerMaster中检查是否有缓存,如果缓存在其他BlockManager中则先传输到本地再使用。

 

任务的数据本地性与缓存位置关系紧密,在任务被创建时会确定其PreferredLocs,在TaskSetManager中再根据PreferredLocs确定任务的数据本地性。在计算PreferredLocs时,首先会将Task所在分区的缓存位置作为优先位置。若无缓存则将RDD指定的PreferredLocs作为优先位置,这类RDD一般为数据源类型的RDD。若以上都没有,则将RDD的窄依赖中的第一个依赖的第一个分区所在位置作为优先位置。

 

SPARK的数据本地性分四个级别:PROCESS_LOCAL:同一Executor, NODE_LOCAL:同一机器, RACK_LOCAL:同一机架, ANY:其他。在实践中,一些分区的体积较大,如果产生了Node或Rack级别的任务,则缓存中的分区数据要在Executor之间传输,这种传输过程不仅占用网络带宽,而且有可能把新Executor的内存占满,导致OOM。最后的结果就是传输的时间较长,还有可能导致Executor崩溃。因此在分区数据量较大时并不建议降低数据本地性。如果Driver的日志中出现了较多的NODE_LOCAL或RACK_LOCAL,同时伴随计算性能下降,那么可以尝试可以将数据本地性的降级等待时间增大,甚至增大到只用PROCESS_LOCAL。

 

Spark配置项

 

RDD的PreferredLocations

 

RDD中有个可选实现的方法是getPreferredLocations(split: Partition): Seq[String],这个方法用来返回指定分区的优先加载位置,一般指的是离分区的数据源最近的位置。例如以HDFS为数据源的话,则可设置为HDFS文件分区所在的节点。我们在实现自己的RDD时,若有可能最好实现此方法,以保证任务的数据本地性。

 

TaskSet调度模式

 

在TaskSet级别Spark提供了FIFO和FAIR两种调度模式,FIFO模式根据Job的先后顺序和Stage的先后顺序选择TaskSet提交。FAIR模式可以配置多个Pool,每个Pool有自己的weight和minShare,其中weight是Pool的优先级,minShare保证Pool最少有几个核心可用。实践中FIFO模式在多个Job并发的情况下会导致有些Job等待时间过长,而FAIR模式表现良好。

 

Spark配置项

 

Master模式选择

 

在Spark提供的spark-submit作业提交脚本中,如果使用yarn作为资源管理器,则可以使用yarn-client和yarn-cluster两种Master模式。其中yarn-client以应用本身为Driver,yarn-cluster会将Driver提交到yarn集群的其中一个节点上运行。由于Driver与Executor之间有频繁的通信,因此Driver的位置最好在集群内。也就是说如果使用yarn-client的话,要能保证应用在集群内,而yarn-cluster模式本身就保证了Driver在集群内。

 

有些应用的使用场景需要在一个SparkContext上持续的提交计算任务,这个SparkContext的生命周期可能会非常长。在这种情况下使用yarn-cluster模式不能实现持续提交计算任务,而且应用与Driver的交互较困难。这种情况便适用于采用 yarn-client模式。

 

Spark配置项

 

SparkContext的重用

 

有些场景需要一个SparkContext持续接收计算任务,这种场景往往对计算任务的时效性要求较高(秒级别),并且可能会有并发的计算任务(如多用户提交任务)。这种场景适合采用yarn-client模式,让Driver位于应用内部,应用可以不断向Driver提交计算任务,并处理返回结果。这种模式的潜在风险在于Driver和Executor都会长时间持续运行,可能会有内存泄露的问题。

 

在实践中,在RDD被persist缓存到内存后,调用unpersist并不能立即释放内存,而是会等待垃圾回收器对其进行回收。在垃圾回收器的选择上,建议使用CMS类型的垃圾回收器,用于避免垃圾回收过程中的顿卡现象。

 

在Driver和Executor的垃圾回收不出问题的情况下,还是可以得到稳定的计算任务性能的。但如果某些情况下计算性能还是随时间推移而下降,则可以重启SparkContext以解决问题。因为重启SparkContext后Driver和Executor都会全新创建,因此能回到最初的性能。重启的方法是在当前所有任务都完成后,在应用中调用SparkContext.stop()方法,并移除SparkContext引用,然后创建新的SparkContext。

 

Driver在启动时需要将Spark的Jar包上传到集群,用于启动每个Executor。这个jar包的大小约130M。Executor在接收任务时,会将任务所依赖的文件、Jar包传输到本地,这里的jar包是应用包,一般包含了应用的各类依赖一般也得100M,Jar包分发的耗时在10秒左右。在对计算任务时效性要求较高的场景,Jar包分发的10秒将是无法接受的。在这里可以采用预先分发的方式解决此问题。我们首先将Spark Jar和应用Jar上传到各个节点的某个相同位置,例如/root/sparkjar。

 

避免Driver启动时分发Jar包:

 

  • 将Driver机上的SPARK_JAR环境变量设置为空,避免Jar包上传动作。
  • 在yarn-site.xml配置文件中,设置yarn.application.classpath为spark jar的位置与此项默认值。

 

避免Task启动时分发依赖和Jar包:

 

  • 将spark.files和spark.jars中的路径配置为local:/root/sparkjar的模式,从而让Executor从本地复制。

 

序列化方式

 

Spark中的任务传输,任务结果收集,Shuffle过程,磁盘级缓存,广播变量的传输等过程均会用到序列化方法。这里有个规律是凡是涉及到网络传输和磁盘缓存的操作,均是先序列化写入,再读出后反序列化的。Spark提供了两种序列化方法即JavaSerializer(默认)和KryoSerializer。根据Spark的官方说法,KryoSerializer性能相对好(10倍于JavaSerializer),但对部分Serializable类型的类不支持,对于不支持的类需要自己写序列化的实现。

 

Spark配置项

 

Spark-1.1.0的优化项

 

Sort-based Shuffle

 

Spark1.0.0提供的Shuffle过程中,如果有C个核心,M个Mapper,有R个Reducer,则每个Mapper将产生R个文件,总计M * R个文件。如果使用Shuffle Files Consolidate的话会好一些,会产生总计min(C, M) * R个文件。现在假设R的数量比较大(如1万个),则每个Mapper将要把输入写入1万个文件中,过程中将会导致较高的磁盘IO,在写入过程中的数据压缩、序列化过程也会占用大量内存。

 

Sort-based shuffle使每个Mapper的输出只写入到一个文件中,这个文件中的记录是排序过的,因此Reducer可以根据记录的起止范围进行读取。为此Sort-based shuffle在写入文件时将会创建一个索引文件,用于记录每个分区的起止范围。

 

Spark配置项

 

TorrentBroadcast

 

Spark-1.1.0之前的广播变量工厂使用的是HttpBroadcastFactory,即所有Executor都从Driver获取广播变量的值。Spark-1.1.0加入了TorrentBroadcastFactory并将其设置为默认,这种广播变量的模式类似于BT下载。Driver会把变量分为数据块存放,Executor取值时也会按数据块提取并合并,但取值的位置会随机的从拥有数据块的Executor上提取,从而形成了BT下载的模式,可大大减轻Driver的负载。

 

Spark配置项

 

Task序列化的优化

 

Spark-1.1.0的任务提交处有个细微的优化,即把任务分为公共部分(RDD,依赖关系,分区处理函数)和非公共部分(StageId,分区ID,优先位置)。任务的公共部分序列化后作为广播变量传输到Executor,非公共部分序列化后分别传输。从而避免了公共部分在同一Executor上的多次传输。参见DAGScheduler. submitMissingTasks()。

 

致谢

 

感谢淘宝技术部-数据挖掘与计算团队提供的优化建议。感谢ODPS团队提供的优化建议。

 

-END-

 

 

 

 

博客地址:http://blog.yoqi.me/?p=766
扫我捐助哦
喜欢 0

这篇文章还没有评论

发表评论