大数据平台框架常用参数优化

本文记录大数据平台框架的一些常用参数,这些参数基本是我见过的或者实际使用过的,我会列出参数的含义以及使用效果,具有一定的参考意义。当然,根据实际的场景不同,参数值并不能随便设置为一样,必须要考虑到实际的情况,否则可能没有效果,或者具有反作用。

会保持更新。

Hadoop

选择 HBaseHadoop 时注意版本适配的问题,Hadoop 选择 v2.7.1 还是很好的,能适配 HBase v1.2.x 以及以上的版本【Hbase 兼容的 Hadoop 版本参见:hbase-configuration 】,也能适配 Hive v0.10.0 以及以上的版本【Hive 兼容的 Hadoop 版本参见:hive-downloads 】。

HDFS

  • fs.hdfs.impl.disable.cache,如果设置为 true,表示关闭文件系统的缓存,这样多线程手动处理 HDFS 文件时,不会 IOException: Filesystem closed

MapReduce

配置文件:mapred-site.xml

Yarn 资源模型:

1
2
3
4
Node Manager -> yarn.nodemanager.resource.memory-mb
YARN container -> yarn.scheduler.minimum-allocation-mb、yarn.scheduler.maximum-allocation-mb
Mapper/Reducer -> mapreduce.map.memory.mb、mapreduce.reduce.memory.mb
JVM -> mapred.map.child.java.opts、mapred.reduce.child.java.opts

对于内存参数的配置,注意它们之间的受限关系,取值不能乱设置,总体来说越具体的参数取值越小,例如常见的一般把 mapreduce.map.java.opts 的值配置成 mapreduce.map.memory.mb * 0.9

  • map 并发大小:mapreduce.job.running.map.limit,可以设置大点,50、100 随便
  • map 内存大小:mapreduce.map.memory.mb,单位为 MB,一般 4GB 够用
  • reduce 启动延迟:mapred.reduce.slowstart.completed.maps,表示 reducemap 执行到什么程度可以启动,例如设置为 1.0 表示等待 map 全部完成后才能执行 reduce
  • reduce 内存大小:mapreduce.reduce.memory.mb,单位为 MB,要根据实际情况设置,一般 4GB 够用
  • reduce 虚拟内存:yarn.nodemanager.vmem-pmem-ratio,一般 2-5 即可
  • reduce 并发大小:mapreduce.job.running.reduce.limit,一般 5-10 个够用【根据业务场景、机器资源而定】
  • mapred.map.child.java.optsMapJVM 参数,例如:-Xmx200m
  • mapred.reduce.child.java.optsReduceJVM 参数,例如:-Xmx200m
  • mapreduce.admin.map.child.java.opts,作用同 mapred.map.child.java.opts,优先级最高,会覆盖掉用户设置的
  • mapreduce.admin.reduce.child.java.opts,作用同 mapred.reduce.child.java.opts,优先级最高,会覆盖掉用户设置的

es-hadoop

使用 es-hadoop 框架处理 Elasticsearch 数据,可以专注于数据 ETL 处理逻辑,其它与集群交互的读写操作交给 es-hadoop 框架处理,这里面有一些常用的参数。

参考官方文档:configuration

  • 读取,只读取指定的字段:es.read.field.include,默认为空,读取全部字段,注意,在 query 中设置 _source 是无效的
  • 读取,排除指定的字段:es.read.field.exclude,默认为空,则不排除任何字段
  • 读取,关闭日期的处理:es.mapping.date.rich,默认为 true,关闭后,读取 Elasticsearchdate 类型的字段,会自动转换为 long 类型,不再是 date 类型
  • 读取,解析指定字段为数组类型:es.read.field.as.array.include,默认为空,则不解析任何字段【字段类型保持原样】
  • 读取,排除解析指定字段为数组字段:es.read.field.as.array.exclude,默认为空,则不排除任何字段【字段该是数组的还是数组,不是数组的仍旧保持原样】

yarn

配置文件:mapred-site.xml

  • yarn.nodemanager.local-dirs,临时目录

HBase

majorcompactionmijorcompaction 的说明。

  • hbase.hregion.majorcompactionHBase 自动做 major_compact 的周期,会严重影响写入性能,建议定期手动做
  • hbase.client.retries.number,客户端连接重试次数,建议设置大一点,例如 24
  • hbase.rootdirHBaseHDFS 中的根目录
  • zookeeper.znode.parentHBaseZookeeper 的根目录,例如使用 Phoenix 登录时需要

Elasticsearch

总述,在设置 Elasticsearch 堆大小时需要通过 $ES_HEAP_SIZE 环境变量,遵循两个规则:

  • 不要超过可用 RAM 的 50%,Lucene 能很好利用文件系统的缓存,它是通过系统内核管理的,如果没有足够的文件系统缓存空间,性能会受到影响。 此外,专门用于堆的内存越多意味着其它可用的内存越少,例如 fielddata
  • 不要超过 32GB,如果堆大小小于 32GBJVM 可以利用指针压缩,这可以大大降低内存的使用,每个指针是 4 字节而不是 8 字节

分片的分配:shards-allocation
脚本的使用:modules-scripting-using
熔断器相关:circuit-breaker
节点选举、故障检测:modules-discovery-zen

fielddata,对字段进行 agg 时,会把数据加载到内存中【索引数据时不会】,记录的是占用内存空间情况,超过指定的值,开始回收内存,防止 OOM

allocate 表示分片的复制分配;relocate 表示分片再次进行 allocateRecovery 表示将一个索引的未分配 shard allocate 到一个结点的过程,在快照恢复、更改索引副本数量、结点故障、结点启动时发生。

如果设置索引副本数为 1,同一个索引的主分片、副本分片不会被分配在同一个节点上面,这才能保证数据高可用,挂了一个节点也没关系【如果一台物理节点开启了两个 Elasticsearch 节点,需要注意使用 cluster.routing.allocation.same_shard.host 参数】。

  • 磁盘空间使用占比上限:cluster.routing.allocation.disk.watermark.high,默认为 90%,表示如果当前节点的磁盘使用占比超过这个值,则分片【针对所有类型的分片:主分片、副本分片】会被自动 relocate 到其它节点,并且任何分片都不会 allocate 到当前节点【此外,对于新创建的 primary 分片也是如此,尽管不是 allocate 动作,除非整个 Elasticsearch 集群只有一个节点】
  • 磁盘空间使用占比下限:cluster.routing.allocation.disk.watermark.low,默认为 85%,表示如果当前节点的磁盘使用占比超过这个值,则分片【新创建的 primary 分片、从来没有进行过 allocate 的分片除外】不会被 allocate 到当前节点
  • 索引的分片副本数:number_of_replicas,一般设置为 1,表示总共有 2 份数据
  • 每个节点分配的分片个数:total_shards_per_node,一般设置为 2,一个节点只分配 2 个分片,分别为主分片、副本分片
  • 索引的分片个数:number_of_shards,当索引数据很大时,一般设置为节点个数【例如 索引数据大小 / 50GB 大于节点个数,例如 10 个节点,索引大小 800GB,此时按照官方建议应该设置 16 个分片,但是分片过多也不好,就可以设置 10 个分片,每个分片大小 80GB】,再配合 分片副本数为 1 每个节点分配的分片个数为 2,就可以确保分片分配在所有的节点上面,并且每个节点上有 2 个分片,分别为主分片、副本分片
  • 数据刷新时间:refresh_interval,表示数据写入后等待多久可以被搜索到,默认值 1s,每次索引的 refresh 会产生一个新的 lucene 段,这会导致频繁的合并行为,如果业务需求对实时性要求没那么高,可以将此参数调大,例如调整为 60s,会大大降低 cpu 的使用率
  • 索引的分片大小,官方建议是每个分片大小在 30GB50GB 不要超过 50GB,所以当索引的数据很大时,就要考虑增加分片的数量
  • 设置 terms 最大个数:index.max_terms_count,默认最大个数为 65535,可以根据集群情况降低,例如设置为 10000,为了集群稳定,一般不需要设置那么大
  • 设置 Boolean Query 的子语句数量:indices.query.bool.max_clause_count,默认为 1024,不建议增大这个值,也可以根据集群情况适当减小
  • 查看热点线程:http://your_ip:your_port/_nodes/your_node_name/hot_threads,可以判断热点线程是 searchbulk,还是 merge,从而进一步分析是查询还是写入导致负载过高
  • 数据目录:path.data: /path/to/data,多个目录使用逗号分隔,里面存放数据文件
  • 日志目录:path.logs: /path/to/logs,里面存放的是节点的日志、慢查询日志、慢索引日志
  • 家目录:path.home: /path/to/homeelasticsearch 的家目录,里面有插件、lib、配置文件等
  • 插件目录:path.plugins: /path/to/plugins,插件目录,里面存放的是插件,例如:分词器
  • 设置慢获取时间边界:index.search.slowlog.threshold.fetch.warn: 30s,超过这个时间的信息会被记录在日志文件中,path.logs 参数指定的目录中 cluster-name_index_fetch_slowlog.log 文件
  • 设置慢查询时间边界:index.search.slowlog.threshold.query.warn: 60s,超过这个时间的信息会被记录在日志文件中,path.logs 参数指定的目录中 cluster-name_index_search_slowlog.log 文件
  • 设置慢索引时间边界:index.search.slowlog.threshold.index.warn: 60s,超过这个时间的信息会被记录在日志文件中,path.logs 参数指定的目录中 cluster-name_index_indexing_slowlog.log 文件
  • 禁止集群重分配:cluster.routing.allocation.enable=none,手动操作分片前需要关闭,否则会引起分片的移动,造成不必要的 IO
  • 开启集群重分配:cluster.routing.allocation.enable=all,集群的分片管理权限交由集群,保持数据均衡
  • 设置集群均衡分片时可以同时 rebalance 分片的个数,cluster.routing.allocation.cluster_concurrent_rebalance:2,不宜设置过大,一般 2-4 个为好,当然如果集群资源足够或者需要快速均衡分片,可以设置大一点
  • 允许分片分配,cluster.routing.allocation.enable=all,开启后分片的分配交由集群管理,如果偶尔需要手动管理分片或者集群停机重启,可以临时关闭,取值设置为 none 即可
  • 推迟索引的分片分配时间,在分片节点出故障或者重启时,可以避免分片数据的移动,前提是及时把节点恢复:index.unassigned.node_left.delayed_timeout=5m,通俗点说,就是趁分片不注意,节点已经恢复了,此时数据分片保持不变,避免了不必要的 IO
  • index.max_slices_per_scroll,除了传统的 scroll 读取数据的方式,v5.x 之后 Elasticsearch 又增加了对每个分片读取数据的功能,称之为切片处理【sliced scroll】,这种读取方式可以对多个分片并行读取数据,大大提高了取数效率,elasticsearch-hadoop 就是采用这种方式读取数据的。但是,这里面有一个限制,Elasticsearch 默认一个 scroll 最大的切片数量为 1024【一般小于等于分片数,也可以通过指定切片字段来创建大于分片数的切片】,可以通过 index.max_slices_per_scroll 参数来变更【不建议更改】
  • cluster.routing.allocation.same_shard.host,在单台物理节点配置多个 Elasticsearch 实例时,这个参数才生效,用来检查同一个分片的多个实例【主分片、副本分片】是否能分配在同一台主机上面,默认值为 false。如果设置为 true,表示开启检查机制,一台物理机上面启动 2 个 Elasticsearch 节点,则分配相同编号的分片时,不会都在这台机器上面,尽管可以满足主分片、副本分片不在同一个 Elasticsearch 节点上
  • script.groovy.sandbox.enabled: false,禁用 Grovvy 脚本,默认是关闭的
  • script.inline: false,允许使用内置 painless 脚本
  • script.stored: false,允许使用保存在 config/scripts 中的脚本,调用时使用 id 即可,类似方法名
  • script.file: false,允许使用外部脚本文件
  • http.port: 9200,集群的 HTTP 端口号
  • transport.tcp.port: 9300,集群的 TCP 端口号
  • thread_pool.bulk.queue_size: 1500bulk 队列的大小
  • indices.breaker.total.use_real_memory: true,熔断器回收内存,防止 OOM,决定父熔断器是考虑实际内存使用情况,还是仅考虑子熔断器内存使用情况
  • indices.breaker.total.limit: 70%,熔断器回收内存,防止 OOM,当 use_real_memorytrue 时,默认为 95%,否则默认为 70%
  • indices.breaker.fielddata.limit: 40%,熔断器回收内存,防止 OOM,默认 40%
  • indices.breaker.request.limit: 60%,熔断器回收内存,防止 OOM,默认 60%
  • discovery.zen.fd.ping_timeout: 60s,集群故障检测
  • discovery.zen.fd.ping_interval: 10s,集群故障检测
  • discovery.zen.fd.ping_retries: 10,集群故障检测
    • discovery.zen.master_election.ignore_non_master_pings: true,选举主节点,设置为 true 时非 node.master 节点不能参与选举,投票也无效
  • discovery.zen.minimum_master_nodes: 2,选举主节点,最少有多少个备选主节点参加选举,防止脑裂现象
  • discovery.zen.ping_timeout: 10s,选举主节点
  • discovery.zen.ping.unicast.hosts: ["ip1:port","ip2:port"],选举主节点,主机列表
  • node.data: true,数据节点
  • node.master: true,有资格被选举为主节点

Spark

  • 序列化方式:spark.serializer,可以选择:org.apache.spark.serializer.KryoSerializer
  • executor 附加参数:spark.executor.extraJavaOptions,例如可以添加:-Dxx=yy【如果仅仅在 driver 端设置,executor 是不会有的】
  • driver 附加参数:spark.driver.extraJavaOptions,例如可以添加:-Dxx=yy
  • 日志配置文件设置,Spark 使用的是 log4j,默认在 Spark 安装目录的 conf 下面,如果想要增加 log4j 相关配置,更改 driver 机器上面的 log4j.properties 配置文件是无效的,必须把所有的 executor 节点上的配置文件全部更新。如果没有权限,也可以自己上传配置文件,然后需要在 executor 附加参数中指定:-Dlog4j.configuration=file:/path/to/file,启动 Spark 任务时还需要使用 --files 指定配置文件名称,多个用逗号分隔,用来上传配置文件到 Spark 节点
  • 开启允许多 SparkContext 存在:spark.driver.allowMultipleContexts,设置为 true 即可,在使用多个 SparkContext 时,需要先停用当前活跃的,使用 stop 方法【在 Spark v2.0 以及以上版本,已经取消了这个限制】
  • spark.executor.cores,每个执行器上面的占用核数,会消耗 CPU,一般设置为 2-3
  • spark.executor.memory,执行器上面的堆内存大小,一般设置为 2048M4096M
  • spark.port.maxRetries,提交任务的 Spark UI 重试次数
  • spark.default.parallelism,默认并行度
  • spark.cores.max,最大核心数
  • spark.executor.logs.rolling.strategy
  • spark.executor.logs.rolling.maxRetainedFiles
  • spark.executor.logs.rolling.size.maxBytes
  • spark.ui.showConsoleProgress

Kafka 输入数据源

Spark Streaming 配置:

  • spark.streaming.backpressure.enabled,开启反压机制
  • spark.streaming.backpressure.pid.minRate
  • spark.streaming.kafka.maxRatePerPartition,每个 partition 的最大读取速度,单位秒,一般设置 500-100 即可
  • spark.streaming.receiver.maxRatereceiver 最大处理数据量,单位秒,与 maxRatePerPartitionDurations 有关,实际运行时由于反压机制,数据处理速度会低于这个值
  • spark.streaming.receiver.writeAheadLog.enable
  • spark.streaming.stopGracefullyOnShutdown,优雅地退出
  • spark.streaming.gracefulStopTimeout
  • xx

Kakfa 配置:

  • metadata.broker.list
  • offsets.storage,设置为 kafka
  • zookeeper.connect
  • zookeeper.connection.timeout.ms
  • group.id
  • fetch.message.max.bytes
  • auto.offset.reset,消费的起始位置,这个参数高低版本之间的名称、值都会不同,需要注意
  • consumer.timeout.ms
  • rebalance.max.retries
  • rebalance.backoff.ms

集群参数

yarn 集群:

  • 待定

standalone 集群

  • 临时目录:SPARK_LOCAL_DIRS,用来存放 Spark 任务运行过程中的临时数据,例如内存不足时把数据缓存到磁盘,就会有数据写入这个目录,当然,在启动 Spark 任务时也可以单独指定,但是最好还是设置在集群上面,可以在 spark-env.sh 脚本中设置,键值对的形式,例如:SPARK_LOCAL_DIRS=/your_path/spark/local。需要注意的是,启动 Excutor 的用户必须有这个目录的写权限,并且保证这个目录的磁盘空间足够使用,否则在 Spark 任务中会出现异常:java.io.IOException: Failed to create local dir in xx,进而导致 Task 失败
  • Work 目录:SPARK_WORKER_DIR,用来存放 Work 的信息,设置方式同上面的 SPARK_LOCAL_DIRS,如果 Spark 任务里面有 System.out (),输出的内容在此目录下
  • SPARK_LOG_DIRSpark 集群自身的日志文件,例如 Work 接收 Spark 任务后通信的内容

Storm

  • StormUI nimbus 内容传输大小限制:nimbus.thrift.max_buffer_size: 1048576,取值的单位是字节,默认为 1048576,如果 nimbus 汇报的内容过多,超过这个值,则在 StormUI 上面无法查看 Topology Summary 信息,会报错:Internal Server Error org.apache.thrift7.transport.TTransportException: Frame size (3052134) larger than max length (1048576)
  • 执行实例 worker 对应的端口号:supervisor.slots.ports:,可以设置多个,和 CPU 的核数一致,或者稍小,提高机器资源的使用率
  • workerJVM 参数:WORKER_GC_OPTS,取值参考:-Xms1G -Xmx5G -XX:+UseG1GC,根据集群机器的资源多少而定,G1 是一种垃圾回收器
  • supervisorJVM 参数:SUPERVISOR_GC_OPTS,取值参考:-Xms1G -Xmx5G -XX:+UseG1GC,根据集群机器的资源多少而定,G1 是一种垃圾回收器
  • Storm UI 的服务端口:ui.port,可以使用浏览器打开网页查看 Topology 详细信息
  • ZooKeeper 服务器列表:storm.zookeeper.servers
  • ZooKeeper 连接端口:storm.zookeeper.port
  • ZooKeeperStorm 的根目录位置:storm.zookeeper.root,用来存放 Storm 集群元信息
  • 客户端连接 ZooKeeper 超时时间:storm.zookeeper.session.timeout
  • Storm 使用的本地文件系统目录:storm.local.dir,注意此目录必须存在并且 Storm 进程有权限可读写
  • Storm 集群运行模式:storm.cluster.mode,取值可选:distributedlocal

Kafka

注意,Kafka 的不同版本参数名、参数值会有变化,特别是 v0.9.x 之后,与之前的低版本差异很大,例如数据游标的参数可以参考我的另外一篇博文:记录一个 Kafka 错误:OffsetOutOfRangeExceptionKafka 官网参见:Kafka-v0.9.0.x-configuration

配置优化都是修改 server.properties 文件中参数值。

  • JVM 参数:KAFKA_HEAP_OPTS,取值参考:-Xmx2G
  • 文件存放位置:log.dirs,多个使用逗号分隔,注意所有的 log 级别需要设置为 INFO
  • 单个 Topic 的文件保留策略:log.retention.hours=72【数据保留 72 小时,超过时旧数据被删除】,log.retention.bytes=1073741824【数据保留 1GB,超过时旧数据被删除】
  • 数据文件刷盘策略:log.flush.interval.messages=10000【每当 producer 写入 10000 条消息时,刷数据到磁盘】,log.flush.interval.ms=1000【每间隔 1 秒钟时间,刷数据到磁盘】
  • Topic 的分区数量:num.partitions=8
  • 启动 Fetch 线程给副本同步数据传输大小限制:replica.fetch.max.bytes=10485760,要比 message.max.bytes
  • message.max.bytes=10485700,这个参数决定了 broker 能够接收到的最大消息的大小,要比 max.request.size
  • max.request.size=10480000,这个参数决定了 producer 生产消息的大小
  • fetch.max.bytes=10485760,这个参数决定了 consumer 消费消息的大小,要比 message.max.bytes
  • broker 处理消息的最大线程数:num.network.threads=17,一般 num.network.threads 主要处理网络 IO,读写缓冲区数据,基本没有 IO 等待,配置线程数量为 CPU 核数加 1
  • broker 处理磁盘 IO 的线程数:num.io.threads=32num.io.threads 主要进行磁盘 IO 操作,高峰期可能有些 IO 等待,因此配置需要大些,配置线程数量为 CPU 核数 2 倍,最大不超过 3 倍
  • 强制新建一个 segment 的时间:log.roll.hour=72
  • 是否允许自动创建 Topicauto.create.topics.enable=true,如果设置为 false,则代码无法创建,需要通过 kafka 的命令创建 Topic
  • auto.offset.reset,关于数据游标的配置【earliestlatestsmallestlargest】,由于不同版本之间的差异,可以参考:记录一个 Kafka 错误:OffsetOutOfRangeException
  • advertised.host.nameadvertised.port,关于外网集群可以访问的配置,跨网络生产、消费数据,v082 以及之前的版本【之后的版本有保留这两个参数,但是不建议使用】
  • advertised.listenerslisteners,关于外网集群可以访问的配置,跨网络生产、消费数据,v090 以及之后的版本

留意参数取值大小的限制:fetch.max.bytes 大于 message.max.bytes 大于 max.request.sizereplica.fetch.max.bytes 大于 message.max.bytes 大于 max.request.size

Zookeeper

配置 zoo.cfg 文件:

  • dataDir,表示快照日志目录
  • dataLogDir,表示事务日志目录,不配置的时候事务日志目录同 dataDir
  • clientPort=2181,服务的监听端口
  • tickTime=2000Zookeeper 的时间单元,Zookeeper 中所有时间都是以这个时间单元的整数倍去配置的,例如,session 的最小超时时间是 2*tickTime【单位:毫秒】
  • syncLimit=5,表示 FollowerObserverLeader 交互时的最大等待时间,只不过是在与 leader 同步完毕之后,进入正常请求转发或 ping 等消息交互时的超时时间
  • initLimit=10ObserverFollower 启动时,从 Leader 同步最新数据时,Leader 允许 initLimit * tickTime 的时间内完成,如果同步的数据量很大,可以相应地把这个值设置大一些
  • maxClientCnxns=384,最大并发客户端数,用于防止 Ddos 的,默认值是 10,设置为 0 是不加限制
  • maxSessionTimeout=120000Session 超时时间限制,如果客户端设置的超时时间不在这个范围,那么会被强制设置一个最大时间,默认的 Session 超时时间是在 2 * tickTime ~ 20 * tickTime 这个范围
  • minSessionTimeout=4000,同 maxSessionTimeout
  • server.x=hostname:2888:3888x 是一个数字,与每个服务器的 myid 文件中的 id 是一样的,hostname 是服务器的 hostname,右边配置两个端口,第一个端口用于 FollowerLeader 之间的数据同步和其它通信,第二个端口用于 Leader 选举过程中投票通信
  • autopurge.purgeInterval=24,在 v3.4.0 及之后的版本,Zookeeper 提供了自动清理事务日志文件和快照日志文件的功能,这个参数指定了清理频率,单位是小时,需要配置一个 1 或更大的整数。默认是 0,表示不开启自动清理功能
  • autopurge.snapRetainCount=30,参数指定了需要保留的事务日志文件和快照日志文件的数目,默认是保留 3 个,和 autopurge.purgeInterval 搭配使用
虾丸派 wechat
扫一扫添加博主,进技术交流群,共同学习进步
永不止步
0%