7 调优
参考官方文档
算子的合理选择¶
groupByKey/reduceByKey¶
尽量不使用groupByKey
,可以用reduceByKey
代替。以WordCount为例,下面是分别用groupByKey
和reduceByKey
求解的代码和图示。使用GroupByKey会增加数据传输。
sc.textFile("hdfs://centos2:9000/test/hive_exercise.json")
.flatMap(_.split("[\t:{}\"$,]"))
.map(word => (word, 1))
.reduceByKey((x, y) => x + y)
.collect()
.foreach(println)
sc.textFile("hdfs://centos2:9000/test/hive_exercise.json")
.flatMap(_.split("[\t:{}\"$,]"))
.map(word => (word, 1))
.groupByKey()
.mapValues(x => x.sum)
.collect()
.foreach(println)
序列化格式¶
当Spark需要通过网络传输数据,或是将数据溢写到磁盘上时,Spark需要序列化。默认情况下, Spark会使用Java内建的序列化库(ObjectOutputStream
)。为了获取更好的性能,建议使用Kryo序列化,它比Java序列化更快,更紧凑(10x)。但是并不支持所有Serializable
类型,并且需要该向Kryo注册你想要序列化的类。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
优化配置参数¶
Spark core优化配置参数
应用属性 | 描述 |
---|---|
spark.driver.cores | 在集群模式下管理资源时,用于driver程序的CPU内核数量。默认为1。在生产环境的硬件上,这个值可能最少要上调到8或16。 |
spark.driver.maxResultSize | 如果应用频繁用此driver程序,建议对这个值的设置高于其默认值“1g”。0表示没有限制。这个值反映了Spark action的全部分区中最大的结果集的大小。 |
spark.driver.memony | driver进程使用的总内存数。和内核数一样,建议根据你的应用及硬件情况,把这个值设置为“16g”或“32g”。默认”1g”。 |
spark.executor.memory | 每个executor进程所使用的内存数。默认值为“1g”。根据集群的硬件情况,应该把这个值设置为“4g”、“8g”、“16g”或者更高。 |
spark.local.dir | 这是Spark原生写本地文件,包括在磁盘上存储RDD的地方。默认是/tmp。强烈推荐把它设置在快速存储(首选SSD)上,分配大量的空间。在序列化对象到磁盘时,就会落入这个位置,如果这个路径下没有空间,将会出现不确定的行为 |
运行时环境参数
应用属性 | 描述 |
---|---|
spark.executor.logs.rolling.* | 有四个属性用于设定及维护spark日志的滚动。当spark应用长周期运行时(超过24小时),应该设置这个属性 |
spark.python.worker.memory | 如果使用python开发spark应用,这个属性将为每个python worker进程分配总内存数。默认值512m |
shuffle行为参数 |
应用参数 | 描述 |
---|---|
spark.shuffle.manager | 这是Spark里shuffle数据的实现方法,默认为“sort”。这里提到它是因为,如果应用使用Tungsten,应该把这个属性值设置为“Tungsten-sort”。 |
spark.shuffle.memoryFraction | 这个是shuffle操作溢写到磁盘时java堆占总内存的比例。默认值为0.2。如果该应用经常溢出,或者多个shuffle操作同时发生,应该把这个值设置的更高。通常。可以从0.2升到0.4,然后再到0.6,确保一切保持稳定。这个值建议不要超过0.6,否则它可能会碍事,与其他对象竞争堆空间 |
spark.shuffle.service.enabled | 在Spark内开启外部的shuffle服务。如果需要调度动态分配,就必须设置这个属性。默认为false |
压缩及序列化参数
应用属性 | 描述 |
---|---|
spark.kryo.classToRegister | 当使用Kryo做对象序列化时,需要注册这些类。对于Kryo将序列化的应用所用的所有自定义对象,必须设置这些属性 |
spark.kryo.register | 当自定义类需要扩展“KryoRegister”类接口时,用它代替上面的属性。 |
spark.rdd.compress | 设置是否应当压缩序列化的RDD,默认false。但是和前面说过的一样,如果硬件充足,应该启用这个功能,因为这时的CPU性能损失可以忽略不计 |
spark.serializer | 如果设置这个值,将使用Kryo序列化,而不是使用java的默认序列化方法。强烈推荐配置成Kryo序列化,因为这样可以获得最佳的性能,并改善程序的稳定性 |
执行行为参数
应用属性 | 描述 |
---|---|
spark.cleaner.ttl Spark | 记录任何对象的元数据的持续时间(按照秒来计算)。默认值设为“infinite”(无限),对长时间运行的job来说,可能会造成内存泄漏。适当地进行调整,最好先设置为3600,然后监控性能。 |
spark.executor.cores | 每个executor的CPU核数。这个默认值基于选择的资源调度器。如果使用YARN或者Standalone集群模式,应该调整这个值 |
网络参数
应用属性 | 描述 |
---|---|
spark.akka.frameSize | Spark集群通信中最大消息的大小。当程序在带有上千个map及reduce任务的大数据集上运行时,应该把这个值从128调为512,或者更高。 |
spark.akka.threads | 用于通信的Akka的线程数。对于运行多核的driver应用,推荐将这个属性值从4提高为16、32或更高调度参数 |
应用属性 | 描述 |
---|---|
spark.cores.max | 设置应用程序从集群所有节点请求的最大CPU内核数。如果程序在资源有限的环境下运行,应该把这个属性设置最大为集群中spark可用的CPU核数 |
数据倾斜¶
对于大数据来说,数据量大并不可怕,可怕的是数据倾斜。由于数据分布不均匀,造成数据大量的几种在某个点上,造成数据热点问题。
https://segmentfault.com/a/1190000021814886?utm_source=tag-newest#item-1 http://www.jasongj.com/spark/skew/