开发公司Spark SQL参数调优指南

目录

一、常用参数

  1. -- map开发公司开发公司端小文件合并
  2. set hive.merge.mapfiles=true;
  3. -- reduce端小文件合并
  4. set hive.merge.mapredfiles=true;
  5. -- 开发公司开发公司小文件合并的阈值
  6. set spark.sql.mergeSmallFileSize=128000000;
  7. -- 小文件合并的task中,每个task开发公司开发公司读取的数据量
  8. set spark.sql.targetBytesInPartitionWhenMerge=128000000;
  9. -- 普通task读取的数据量,开发公司原来的值是33554432 (33M)
  10. set spark.hadoopRDD.targetBytesInPartition=256000000;
  11. -- 启动参数
  12. set spark.sql.rangePartition.exchangeCoordinator=true;
  13. -- 控制shuffle阶段每个task开发公司读取的数据量为256M
  14. --spark2开发公司只对最后一个stage进行shuffle分区合并,spark3对中间的stage也生效,在spark2开发公司的时候有些同学会依赖spark.sql.adaptive.shuffle.targetPostShuffleInputSize开发公司进行小文件合并,在spark3开发公司上如果设置的这个参数,开发公司影响了中间stage开发公司可能会使作业的运行时间变长,开发公司我们对小文件的问题有专门的feature,开发公司可以设置下面两个参数
  15. set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=256000000;

二、正文

开发公司本文目标读者为对美团开发公司点评数据仓库和有基本开发公司了解的数据开发人员。

目前,开发公司在美团点评的数仓生产开发公司中主要执行Hive表到Hive开发公司表的生产任务,开发公司尤其对有多轮shuffle开发公司的作业有很好的性能提升。开发公司美团点评用于ETL生产的Spark开发公司集群架设在HDFS、Yarn之上,依靠Hive metastore server开发公司进行元数据管理。

Spark开发公司完成一个数据生产任务(执行一条SQL)开发公司的基本过程如下:

(1)对SQL开发公司进行语法分析,开发公司生成逻辑执行计划——(2)从Hive metastore server开发公司获取表信息,开发公司结合逻辑执行计划生成开发公司并优化物理执行计划——(3)开发公司根据物理执行计划向Yarn申请资源(executor),调度task到executor执行。——(4)从HDFS读取数据,任务执行,开发公司任务执行结束后将数据写回HDFS。

开发公司本文从作业运行行为、executor处理能力、driver能力三个方面进行介绍。

作业运行行为主要影响(3)阶段中,task被如何调度,有时Reducer的个数等。

executor处理能力主要影响(3)阶段中task被调度到executor后,executor能否正常完成任务。

driver能力主要影响(2)(3)阶段。在(2)阶段中,如果用户表为ORC表,driver可能读取file footer等信息,会导致driver读取HDFS,如果这部分信息太大,则可能会造成driver存在内存压力。(3)阶段中,driver与Yarn RM交互,申请到executor后进行调度,task执行结束会生成一定执行指标信息和任务执行元数据信息返回给driver,同时每个executor还会和driver有心跳连接,这些都是driver运行的负载。

1 运行行为

1.1 动态生成分区

下列Hive参数对Spark同样起作用。

set hive.exec.dynamic.partition=true; // 是否允许动态生成分区

set hive.exec.dynamic.partition.mode=nonstrict; // 是否容忍指定分区全部动态生成

set hive.exec.max.dynamic.partitions = 100; // 动态生成的最多分区数

1.2 broadcast join

当大表JOIN小表时,如果小表足够小,可以将大表分片,分别用小表和每个大表的分片进行JOIN,最后汇总,能够大大提升作业性能。

spark.sql.autoBroadcastJoinThreshold 平台默认值为26214400(25M),如果小表的大小小于该值,则会将小表广播到所有executor中,使JOIN快速完成。如果该值设置太大,则会导致executor内存压力过大,容易出现OOM。

注:ORC格式的表会对数据进行压缩,通常压缩比为2到3左右,但有些表的压缩比就会很高,有时可以达到10。请妥善配置该参数,并配合spark.executor.memory,使作业能够顺利执行。

使用hint强制做broadcastjoin:

有时候,可能会遇到引擎无法识别表大小的情况,可以使用hint强制执行broadcast join,如下所示。Spark可以识别/*+ MAPJOIN(l) */和/*+ BROADCASTJOIN(u) */两种,Hive只能识别/*+ MAPJOIN(l) */,因此建议使用/*+ MAPJOIN(l) */。

代码块

SQL

select /*+ MAPJOIN(l) */ i.a, i.b, l.b
from tmp1 i join tmp2 l ON i.a = l.a;

1.3 动态资源分配

spark.dynamicAllocation.enabled:是否开启动态资源分配,平台默认开启,同时强烈建议用户不要关闭。理由:开启动态资源分配后,Spark可以根据当前作业的负载动态申请和释放资源。

spark.dynamicAllocation.maxExecutors: 开启动态资源分配后,同一时刻,最多可申请的executor个数。平台默认设置为1000。当在Spark UI中观察到task较多时,可适当调大此参数,保证task能够并发执行完成,缩短作业执行时间。

下图是一个由于并发不足导致作业执行较慢的一个明显的任务:

打开执行时间较长的stage,查看其任务数为2w+。

点击stage的链接,进入查看stage中的任务,将任务按照Launch Time排序,先有小到大再由大到小。

可以看到任务启动时间差了3个多小时。可以确定该任务是由于spark.dynamicAllocation.maxExecutors过小导致的。

该参数可以和spark.executor.cores配合增大作业并发度。s

spark.dynamicAllocation.minExecutors: 和s,d,maxExecutors相反,此参数限定了某一时刻executor的最小个数。平台默认设置为3,即在任何时刻,作业都会保持至少有3个及以上的executor存活,保证任务可以迅速调度。

1.4 Shuflle相关

spark.sql.shuffle.partitions: 在有JOIN或聚合等需要shuffle的操作时,从mapper端写出的partition个数,平台默认设置为2000。

如select a, avg(c) from test_table group by a语句,不考虑优化行为,如果一个map端的task中包含有3000个a,根据spark.sql.shuffle.partitions=2000,会将计算结果分成2000份partition(例如按2000取余),写到磁盘,启动2000个reducer,每个reducer从每个mapper端拉取对应索引的partition。

当作业数据较多时,适当调大该值,当作业数据较少时,适当调小以节省资源。

spark.sql.adaptive.enabled:是否开启调整partition功能,如果开启,spark.sql.shuffle.partitions设置的partition可能会被合并到一个reducer里运行。平台默认开启,同时强烈建议开启。理由:更好利用单个executor的性能,还能缓解小文件问题。

spark.sql.adaptive.shuffle.targetPostShuffleInputSize:和spark.sql.adaptive.enabled配合使用,当开启调整partition功能后,当mapper端两个partition的数据合并后数据量小于targetPostShuffleInputSize时,Spark会将两个partition进行合并到一个reducer端进行处理。平台默认为67108864(64M),用户可根据自身作业的情况酌情调整该值。当调大该值时,一个reduce端task处理的数据量变大,最终产出的数据,存到HDFS上的文件也变大。当调小该值时,相反。

代码块

Plain Text

 
18/01/16 03:18:03 WARN TransportChannelHandler: Exception in connection from rz-data-hdp-dn3938.rz.sankuai.com/10.16.47.49:7337
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 2013265920, max: 2022178816)
  at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:530)
  at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:484)
  at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:711)
  at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:700)
  at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:237)
  at io.netty.buffer.PoolArena.allocate(PoolArena.java:221)
  at io.netty.buffer.PoolArena.allocate(PoolArena.java:141)
  at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:296)
  at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
  at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
  at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
  at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
  at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
  at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
  at java.lang.Thread.run(Thread.java:745)

以及

代码块

Plain Text

18/01/16 18:36:03 WARN TransportChannelHandler: Exception in connection from rz-data-hdp-dn0871.rz.sankuai.com/10.16.57.13:34925
java.lang.IllegalArgumentException: Too large frame: 3608642420
  at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
  at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:133)
  at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:81)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
  at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
  at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
  at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
  at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
  at java.lang.Thread.run(Thread.java:745)

spark.sql.adaptive.minNumPostShufflePartitions: 当spark.sql.adaptive.enabled参数开启后,有时会导致很多分区被合并,为了防止分区过少,可以设置spark.sql.adaptive.minNumPostShufflePartitions参数,防止分区过少而影响性能。

1.5 读ORC表优化

spark.hadoop.hive.exec.orc.split.strategy参数控制在读取ORC表时生成split的策略。BI策略以文件为粒度进行split划分;ETL策略会将文件进行切分,多个stripe组成一个split;HYBRID策略为:当文件的平均大小大于hadoop最大split值(默认256 * 1024 * 1024)时使用ETL策略,否则使用BI策略。

对于一些较大的ORC表,可能其footer较大,ETL策略可能会导致其从hdfs拉取大量的数据来切分split,甚至会导致driver端OOM,因此这类表的读取建议使用BI策略。

对于一些较小的尤其有数据倾斜的表(这里的数据倾斜指大量stripe存储于少数文件中),建议使用ETL策略。

另外,spark.hadoop.mapreduce.input.fileinputformat.split.maxsize参数可以控制在ORC切分时stripe的合并处理。具体逻辑是,当几个stripe的大小大于spark.hadoop.mapreduce.input.fileinputformat.split.maxsize时,会合并到一个task中处理。可以适当调小该值,如set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize=134217728。以此增大读ORC表的并发。

比如我想要以64M大小划分split,可以在XT ETL中设置如下参数:

代码块

SQL

 
set spark.hadoop.hive.exec.orc.split.strategy=ETL;
set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize=64000000;
set spark.hadoop.mapreduce.input.fileinputformat.split.minsize=64000000;
-- 避免将拆分的split再合并
set spark.hadoopRDD.targetBytesInPartition=-1;

2 executor能力

2.1内存

spark.executor.memory:executor用于缓存数据、代码执行的堆内存以及JVM运行时需要的内存。当executor端由于OOM时,多数是由于spark.executor.memory设置较小引起的。该参数一般可以根据表中单个文件的大小进行估计,但是如果是压缩表如ORC,则需要对文件大小乘以2~3倍,这是由于文件解压后所占空间要增长2~3倍。平台默认设置为2G。

spark.yarn.executor.memoryOverhead:Spark运行还需要一些堆外内存,直接向系统申请,如数据传输时的netty等。

Spark根据spark.executor.memory+spark.yarn.executor.memoryOverhead的值向RM申请一个容器,当executor运行时使用的内存超过这个限制时,会被yarn kill掉。在Spark UI中相应失败的task的错误信息为:

代码块

Plain Text

 
Container killed by YARN for exceeding memory limits. XXX of YYY physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

这个时候,适当调大spark.yarn.executor.memoryOverhead。平台默认设置为1024(1G),注意:该参数的单位为MB。但是,如果用户在代码中无限制的使用堆外内存。调大该参数没有意义。需要用户了解自己的代码在executor中的行为,合理使用堆内堆外内存。

spark.sql.windowExec.buffer.spill.threshold:当用户的SQL中包含窗口函数时,并不会把一个窗口中的所有数据全部读进内存,而是维护一个缓存池,当池中的数据条数大于该参数表示的阈值时,spark将数据写到磁盘。该参数如果设置的过小,会导致spark频繁写磁盘,如果设置过大则一个窗口中的数据全都留在内存,有OOM的风险。但是,为了实现快速读入磁盘的数据,spark在每次读磁盘数据时,会保存一个1M的缓存。

举例:当spark.sql.windowExec.buffer.spill.threshold为10时,如果一个窗口有100条数据,则spark会写9((100 - 10)/10)次磁盘,在读的时候,会创建9个磁盘reader,每个reader会有一个1M的空间做缓存,也就是说会额外增加9M的空间。

当某个窗口中数据特别多时,会导致写磁盘特别频繁,就会占用很大的内存空间作缓存。因此如果观察到executor的日志中存在大量如下内容,则可以考虑适当调大该参数,平台默认该参数为40960。

代码块

Plain Text

 
pilling data because number of spilledRecords crossed the threshold


2.2 executor并发度

spark.executor.cores:单个executor上可以同时运行的task数。Spark中的task调度在线程上,该参数决定了一个executor上可以并行执行几个task。这几个task共享同一个executor的内存(spark.executor.memory+spark.yarn.executor.memoryOverhead)。适当提高该参数的值,可以有效增加程序的并发度,是作业执行的更快,但使executor端的日志变得不易阅读,同时增加executor内存压力,容易出现OOM。在作业executor端出现OOM时,如果不能增大spark.executor.memory,可以适当降低该值。平台默认设置为1。

该参数是executor的并发度,和spark.dynamicAllocation.maxExecutors配合,可以提高整个作业的并发度。

2.3 executor读取hive表时单task处理数据量/无shuffle作业小文件合并

spark.hadoopRDD.targetBytesInPartition:该参数是美团点评特有参数,目前还未反馈给社区。Spark在读取hive表时,默认会为每个文件创建一个task,如果一个SQL没有shuffle类型的算子,每个task执行完都会产生一个文件写回HDFS,这样就潜在存在小文件问题。该参数可以将多个文件放到一个task中处理,默认为33554432,即如果一个文件和另一个文件大小之和小于32M,就会被放到一个task钟处理。适当提高该值,可以降低调度压力,避免无shuffle作业产生过多小文件。

2.4 GC优化(使用较少,当尝试其他调优方法均无效时可尝试此方法)

executor的JVM参数传递方式为:set spark.executor.extraJavaOptions="XXXXXXXXXX "。例如,set spark.executor.extraJavaOptions="-XX:NewRatio=3 -XX:+UseG1GC"。

注:所有的JVM参数必须写在一起,不能分开。bad case:set spark.executor.extraJavaOptions="-XX:NewRatio=3 "; set spark.executor.extraJavaOptions="-XX:+UseG1GC " ;

打开GC打印:-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

full GC 频繁:内存不够用,调大spark.executor.memory,调小spark.executor.cores。

minor GC频繁,而full GC比较少:可以适当提高Eden区大小-Xmn

如果OldGen区快要满了,适当提高spark.executor.memory(平台默认2G)或适当降低spark.memory.fraction(平台默认为0.3)或适当提高-XX:NewRatio(老年代是年轻代的多少倍,一般默认是2)。

如果spark.executor.memory调的很大且GC仍是程序运行的瓶颈,可以尝试启用G1垃圾回收器(-XX:+UseG1GC)

修改了GC的参数一定要仔细观察GC的频率和时间。

修改方法:set spark.executor.extraJavaOptions="-XX:NewRatio=3 -XX:+UseG1GC ..."

3 driver指标:

3.1 内存

spark.driver.memory:driver使用内存大小, 平台默认为10G,根据作业的大小可以适当增大或减小此值。

3.2 GC优化

通过set spark.driver.extraJavaOptions="XXXXXXXXXX "设置,具体设置内容可参考2.4节,一般情况driver内存较大,可尝试启用G1垃圾回收器。

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发