跳转至

7 调优

参考官方文档

算子的合理选择

groupByKey/reduceByKey

尽量不使用groupByKey,可以用reduceByKey代替。以WordCount为例,下面是分别用groupByKeyreduceByKey求解的代码和图示。使用GroupByKey会增加数据传输。

reduce_by_key_reduce_by_key

sc.textFile("hdfs://centos2:9000/test/hive_exercise.json")
  .flatMap(_.split("[\t:{}\"$,]"))
  .map(word => (word, 1))
  .reduceByKey((x, y) => x + y)
  .collect()
  .foreach(println)

sc.textFile("hdfs://centos2:9000/test/hive_exercise.json")
  .flatMap(_.split("[\t:{}\"$,]"))
  .map(word => (word, 1))
  .groupByKey()
  .mapValues(x => x.sum)
  .collect()
  .foreach(println)

序列化格式

当Spark需要通过网络传输数据,或是将数据溢写到磁盘上时,Spark需要序列化。默认情况下, Spark会使用Java内建的序列化库(ObjectOutputStream)。为了获取更好的性能,建议使用Kryo序列化,它比Java序列化更快,更紧凑(10x)。但是并不支持所有Serializable类型,并且需要该向Kryo注册你想要序列化的类。

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

优化配置参数

Spark core优化配置参数

应用属性 描述
spark.driver.cores 在集群模式下管理资源时,用于driver程序的CPU内核数量。默认为1。在生产环境的硬件上,这个值可能最少要上调到8或16。
spark.driver.maxResultSize 如果应用频繁用此driver程序,建议对这个值的设置高于其默认值“1g”。0表示没有限制。这个值反映了Spark action的全部分区中最大的结果集的大小。
spark.driver.memony driver进程使用的总内存数。和内核数一样,建议根据你的应用及硬件情况,把这个值设置为“16g”或“32g”。默认”1g”。
spark.executor.memory 每个executor进程所使用的内存数。默认值为“1g”。根据集群的硬件情况,应该把这个值设置为“4g”、“8g”、“16g”或者更高。
spark.local.dir 这是Spark原生写本地文件,包括在磁盘上存储RDD的地方。默认是/tmp。强烈推荐把它设置在快速存储(首选SSD)上,分配大量的空间。在序列化对象到磁盘时,就会落入这个位置,如果这个路径下没有空间,将会出现不确定的行为

运行时环境参数

应用属性 描述
spark.executor.logs.rolling.* 有四个属性用于设定及维护spark日志的滚动。当spark应用长周期运行时(超过24小时),应该设置这个属性
spark.python.worker.memory 如果使用python开发spark应用,这个属性将为每个python worker进程分配总内存数。默认值512m
shuffle行为参数
应用参数 描述
spark.shuffle.manager 这是Spark里shuffle数据的实现方法,默认为“sort”。这里提到它是因为,如果应用使用Tungsten,应该把这个属性值设置为“Tungsten-sort”。
spark.shuffle.memoryFraction 这个是shuffle操作溢写到磁盘时java堆占总内存的比例。默认值为0.2。如果该应用经常溢出,或者多个shuffle操作同时发生,应该把这个值设置的更高。通常。可以从0.2升到0.4,然后再到0.6,确保一切保持稳定。这个值建议不要超过0.6,否则它可能会碍事,与其他对象竞争堆空间
spark.shuffle.service.enabled 在Spark内开启外部的shuffle服务。如果需要调度动态分配,就必须设置这个属性。默认为false

压缩及序列化参数

应用属性 描述
spark.kryo.classToRegister 当使用Kryo做对象序列化时,需要注册这些类。对于Kryo将序列化的应用所用的所有自定义对象,必须设置这些属性
spark.kryo.register 当自定义类需要扩展“KryoRegister”类接口时,用它代替上面的属性。
spark.rdd.compress 设置是否应当压缩序列化的RDD,默认false。但是和前面说过的一样,如果硬件充足,应该启用这个功能,因为这时的CPU性能损失可以忽略不计
spark.serializer 如果设置这个值,将使用Kryo序列化,而不是使用java的默认序列化方法。强烈推荐配置成Kryo序列化,因为这样可以获得最佳的性能,并改善程序的稳定性

执行行为参数

应用属性 描述
spark.cleaner.ttl Spark 记录任何对象的元数据的持续时间(按照秒来计算)。默认值设为“infinite”(无限),对长时间运行的job来说,可能会造成内存泄漏。适当地进行调整,最好先设置为3600,然后监控性能。
spark.executor.cores 每个executor的CPU核数。这个默认值基于选择的资源调度器。如果使用YARN或者Standalone集群模式,应该调整这个值

网络参数

应用属性 描述
spark.akka.frameSize Spark集群通信中最大消息的大小。当程序在带有上千个map及reduce任务的大数据集上运行时,应该把这个值从128调为512,或者更高。
spark.akka.threads 用于通信的Akka的线程数。对于运行多核的driver应用,推荐将这个属性值从4提高为16、32或更高调度参数
应用属性 描述
spark.cores.max 设置应用程序从集群所有节点请求的最大CPU内核数。如果程序在资源有限的环境下运行,应该把这个属性设置最大为集群中spark可用的CPU核数

数据倾斜

对于大数据来说,数据量大并不可怕,可怕的是数据倾斜。由于数据分布不均匀,造成数据大量的几种在某个点上,造成数据热点问题。

https://segmentfault.com/a/1190000021814886?utm_source=tag-newest#item-1 http://www.jasongj.com/spark/skew/