大数据平台框架常用参数优化

本文记录大数据平台框架的一些常用参数,这些参数基本是我见过的或者实际使用过的,我会列出参数的含义以及使用效果,具有一定的参考意义。当然,根据实际的场景不同,参数值并不能随便设置为一样,必须要考虑到实际的情况,否则可能没有效果,或者具有反作用。

会保持更新。

Hadoop

选择 HBaseHadoop 时注意版本适配的问题,Hadoop 选择 v2.7.1 还是很好的,能适配 HBase v1.2.x 以及以上的版本【Hbase 兼容的 Hadoop 版本参见:hbase-configuration 】,也能适配 Hive v0.10.0 以及以上的版本【Hive 兼容的 Hadoop 版本参见:hive-downloads 】。

HDFS

  • fs.hdfs.impl.disable.cache,如果设置为 true,表示关闭文件系统的缓存,这样多线程手动处理 HDFS 文件时,不会 IOException: Filesystem closed

MapReduce

  • map 并发大小:mapreduce.job.running.map.limit,可以设置大点,50、100 随便
  • map 内存大小:mapreduce.map.memory.mb,单位为 MB,一般 4GB 够用
  • reduce 启动延迟:mapred.reduce.slowstart.completed.maps,表示 reducemap 执行到什么程度可以启动,例如设置为 1.0 表示等待 map 全部完成后才能执行 reduce
  • reduce 内存大小:mapreduce.reduce.memory.mb,单位为 MB,要根据实际情况设置,一般 4GB 够用
  • reduce 虚拟内存:yarn.nodemanager.vmem-pmem-ratio,一般 2-5 即可
  • reduce 并发大小:mapreduce.job.running.reduce.limit,一般 5-10 个够用【根据业务场景、机器资源而定】

es-hadoop

使用 es-hadoop 框架处理 Elasticsearch 数据,可以专注于数据 ETL 处理逻辑,其它与集群交互的读写操作交给 es-hadoop 框架处理,这里面有一些常用的参数。

参考官方文档:configuration

  • 读取,只读取指定的字段:es.read.field.include,默认为空,读取全部字段,注意,在 query 中设置 _source 是无效的
  • 读取,排除指定的字段:es.read.field.exclude,默认为空,则不排除任何字段
  • 读取,关闭日期的处理:es.mapping.date.rich,默认为 true,关闭后,读取 Elasticsearchdate 类型的字段,会自动转换为 long 类型,不再是 date 类型
  • 读取,解析指定字段为数组类型:es.read.field.as.array.include,默认为空,则不解析任何字段【字段类型保持原样】
  • 读取,排除解析指定字段为数组字段:es.read.field.as.array.exclude,默认为空,则不排除任何字段【字段该是数组的还是数组,不是数组的仍旧保持原样】

HBase

待整理

Elasticsearch

allocate 表示分片的复制分配;relocate 表示分片再次进行 allocateRecovery 表示将一个索引的未分配 shard allocate 到一个结点的过程,在快照恢复、更改索引副本数量、结点故障、结点启动时发生。

如果设置索引副本数为 1,同一个索引的主分片、副本分片不会被分配在同一个节点上面,这才能保证数据高可用,挂了一个节点也没关系。

  • 磁盘空间使用占比上限:cluster.routing.allocation.disk.watermark.high,默认为 90%,表示如果当前节点的磁盘使用占比超过这个值,则分片【针对所有类型的分片:主分片、副本分片】会被自动 relocate 到其它节点,并且任何分片都不会 allocate 到当前节点【此外,对于新创建的 primary 分片也是如此,尽管不是 allocate 动作,除非整个 Elasticsearch 集群只有一个节点】
  • 磁盘空间使用占比下限:cluster.routing.allocation.disk.watermark.low,默认为 85%,表示如果当前节点的磁盘使用占比超过这个值,则分片【新创建的 primary 分片、从来没有进行过 allocate 的分片除外】不会被 allocate 到当前节点
  • 索引的分片副本数:number_of_replicas,一般设置为 1,表示总共有 2 份数据
  • 每个节点分配的分片个数:total_shards_per_node,一般设置为 2,一个节点只分配 2 个分片,分别为主分片、副本分片
  • 索引的分片个数:number_of_shards,当索引数据很大时,一般设置为节点个数【例如 索引数据大小 / 50GB 大于节点个数,例如 10 个节点,索引大小 800GB,此时按照官方建议应该设置 16 个分片,但是分片过多也不好,就可以设置 10 个分片,每个分片大小 80GB】,再配合 分片副本数为 1 每个节点分配的分片个数为 2,就可以确保分片分配在所有的节点上面,并且每个节点上有 2 个分片,分别为主分片、副本分片
  • 数据刷新时间:refresh_interval,表示数据写入后等待多久可以被搜索到,默认值 1s,每次索引的 refresh 会产生一个新的 lucene 段,这会导致频繁的合并行为,如果业务需求对实时性要求没那么高,可以将此参数调大,例如调整为 60s,会大大降低 cpu 的使用率
  • 索引的分片大小,官方建议是每个分片大小在 30GB50GB 不要超过 50GB,所以当索引的数据很大时,就要考虑增加分片的数量
  • 设置 terms 最大个数:index.max_terms_count,默认最大个数为 65535,可以根据集群情况降低,例如设置为 10000,为了集群稳定,一般不需要设置那么大
  • 设置 Boolean Query 的子语句数量:indices.query.bool.max_clause_count,默认为 1024,不建议增大这个值,也可以根据集群情况适当减小
  • 查看热点线程:http://your_ip:your_port/_nodes/your_node_name/hot_threads,可以判断热点线程是 searchbulk,还是 merge,从而进一步分析是查询还是写入导致负载过高
  • 数据目录:path.data: /path/to/data,多个目录使用逗号分隔,里面存放数据文件
  • 日志目录:path.logs: /path/to/logs,里面存放的是节点的日志、慢查询日志、慢索引日志
  • 家目录:path.home: /path/to/homeelasticsearch 的家目录,里面有插件、lib、配置文件等
  • 插件目录:path.plugins: /path/to/plugins,插件目录,里面存放的是插件,例如:分词器
  • 设置慢获取时间边界:index.search.slowlog.threshold.fetch.warn: 30s,超过这个时间的信息会被记录在日志文件中,path.logs 参数指定的目录中 cluster-name_index_fetch_slowlog.log 文件
  • 设置慢查询时间边界:index.search.slowlog.threshold.query.warn: 60s,超过这个时间的信息会被记录在日志文件中,path.logs 参数指定的目录中 cluster-name_index_search_slowlog.log 文件
  • 设置慢索引时间边界:index.search.slowlog.threshold.index.warn: 60s,超过这个时间的信息会被记录在日志文件中,path.logs 参数指定的目录中 cluster-name_index_indexing_slowlog.log 文件

script 脚本设置
rebalance 数据平衡
discovery 配置
indices.breaker 配置,熔断器,防止 oom

Spark

  • 序列化方式:spark.serializer,可以选择:org.apache.spark.serializer.KryoSerializer
  • executor 附加参数:spark.executor.extraJavaOptions,例如可以添加:-Dxx=yy【如果仅仅在 driver 端设置,executor 是不会有的】
  • driver 附加参数:spark.driver.extraJavaOptions,例如可以添加:-Dxx=yy
  • 日志配置文件设置,Spark 使用的是 log4j,默认在 Spark 安装目录的 conf 下面,如果想要增加 log4j 相关配置,更改 driver 机器上面的 log4j.properties 配置文件是无效的,必须把所有的 executor 节点上的配置文件全部更新。如果没有权限,也可以自己上传配置文件,然后需要在 executor 附加参数中指定:-Dlog4j.configuration=file:/path/to/file,启动 Spark 任务时还需要使用 --files 指定配置文件名称,多个用逗号分隔,用来上传配置文件到 Spark 节点
  • 开启允许多 SparkContext 存在:spark.driver.allowMultipleContexts,设置为 true 即可,在使用多个 SparkContext 时,需要先停用当前活跃的,使用 stop 方法【在 Spark v2.0 以及以上版本,已经取消了这个限制】
  • spark.executor.cores,每个执行器上面的占用核数,会消耗 CPU,一般设置为 2-3
  • spark.executor.memory,执行器上面的堆内存大小,一般设置为 2048M4096M
  • spark.port.maxRetries,提交任务的 Spark UI 重试次数
  • spark.default.parallelism,默认并行度
  • spark.cores.max,最大核心数
  • spark.executor.logs.rolling.strategy
  • spark.executor.logs.rolling.maxRetainedFiles
  • spark.executor.logs.rolling.size.maxBytes
  • spark.ui.showConsoleProgress

Kafka 输入数据源

Spark Streaming 配置:

  • spark.streaming.backpressure.enabled,开启反压机制
  • spark.streaming.backpressure.pid.minRate
  • spark.streaming.kafka.maxRatePerPartition,每个 partition 的最大读取速度,单位秒,一般设置 500-100 即可
  • spark.streaming.receiver.maxRatereceiver 最大处理数据量,单位秒,与 maxRatePerPartitionDurations 有关,实际运行时由于反压机制,数据处理速度会低于这个值
  • spark.streaming.receiver.writeAheadLog.enable
  • spark.streaming.stopGracefullyOnShutdown,优雅地退出
  • spark.streaming.gracefulStopTimeout
  • xx

Kakfa 配置:

  • metadata.broker.list
  • offsets.storage,设置为 kafka
  • zookeeper.connect
  • zookeeper.connection.timeout.ms
  • group.id
  • fetch.message.max.bytes
  • auto.offset.reset,消费的起始位置,这个参数高低版本之间的名称、值都会不同,需要注意
  • consumer.timeout.ms
  • rebalance.max.retries
  • rebalance.backoff.ms

集群参数

yarn 集群:

  • 待定

standalone 集群

  • 临时目录:SPARK_LOCAL_DIRS,用来存放 Spark 任务运行过程中的临时数据,例如内存不足时把数据缓存到磁盘,就会有数据写入这个目录,当然,在启动 Spark 任务时也可以单独指定,但是最好还是设置在集群上面,可以在 spark-env.sh 脚本中设置,键值对的形式,例如:SPARK_LOCAL_DIRS=/your_path/spark/local。需要注意的是,启动 Excutor 的用户必须有这个目录的写权限,并且保证这个目录的磁盘空间足够使用,否则在 Spark 任务中会出现异常:java.io.IOException: Failed to create local dir in xx,进而导致 Task 失败
  • Work 目录:SPARK_WORKER_DIR,用来存放 Work 的信息,设置方式同上面的 SPARK_LOCAL_DIRS

Storm

  • StormUI nimbus 内容传输大小限制:nimbus.thrift.max_buffer_size: 1048576,取值的单位是字节,默认为 1048576,如果 nimbus 汇报的内容过多,超过这个值,则在 StormUI 上面无法查看 Topology Summary 信息,会报错:Internal Server Error org.apache.thrift7.transport.TTransportException: Frame size (3052134) larger than max length (1048576)
  • 执行实例 worker 对应的端口号:supervisor.slots.ports:,可以设置多个,和 CPU 的核数一致,或者稍小,提高机器资源的使用率
  • workerJVM 参数:WORKER_GC_OPTS,取值参考:-Xms1G -Xmx5G -XX:+UseG1GC,根据集群机器的资源多少而定,G1 是一种垃圾回收器
  • supervisorJVM 参数:SUPERVISOR_GC_OPTS,取值参考:-Xms1G -Xmx5G -XX:+UseG1GC,根据集群机器的资源多少而定,G1 是一种垃圾回收器
  • Storm UI 的服务端口:ui.port,可以使用浏览器打开网页查看 Topology 详细信息
  • ZooKeeper 服务器列表:storm.zookeeper.servers
  • ZooKeeper 连接端口:storm.zookeeper.port
  • ZooKeeperStorm 的根目录位置:storm.zookeeper.root,用来存放 Storm 集群元信息
  • 客户端连接 ZooKeeper 超时时间:storm.zookeeper.session.timeout
  • Storm 使用的本地文件系统目录:storm.local.dir,注意此目录必须存在并且 Storm 进程有权限可读写
  • Storm 集群运行模式:storm.cluster.mode,取值可选:distributedlocal

Kafka

注意,Kafka 的不同版本参数名、参数值会有变化,特别是 v0.9.x 之后,与之前的低版本差异很大,例如数据游标的参数可以参考我的另外一篇博文:记录一个 Kafka 错误:OffsetOutOfRangeExceptionKafka 官网参见:Kafka-v0.9.0.x-configuration

配置优化都是修改 server.properties 文件中参数值。

  • JVM 参数:KAFKA_HEAP_OPTS,取值参考:-Xmx2G
  • 文件存放位置:log.dirs,多个使用逗号分隔,注意所有的 log 级别需要设置为 INFO
  • 单个 Topic 的文件保留策略:log.retention.hours=72【数据保留 72 小时,超过时旧数据被删除】,log.retention.bytes=1073741824【数据保留 1GB,超过时旧数据被删除】
  • 数据文件刷盘策略:log.flush.interval.messages=10000【每当 producer 写入 10000 条消息时,刷数据到磁盘】,log.flush.interval.ms=1000【每间隔 1 秒钟时间,刷数据到磁盘】
  • Topic 的分区数量:num.partitions=8
  • 启动 Fetch 线程给副本同步数据传输大小限制:replica.fetch.max.bytes=10485760,要比 message.max.bytes
  • message.max.bytes=10485700,这个参数决定了 broker 能够接收到的最大消息的大小,要比 max.request.size
  • max.request.size=10480000,这个参数决定了 producer 生产消息的大小
  • fetch.max.bytes=10485760,这个参数决定了 consumer 消费消息的大小,要比 message.max.bytes
  • broker 处理消息的最大线程数:num.network.threads=17,一般 num.network.threads 主要处理网络 IO,读写缓冲区数据,基本没有 IO 等待,配置线程数量为 CPU 核数加 1
  • broker 处理磁盘 IO 的线程数:num.io.threads=32num.io.threads 主要进行磁盘 IO 操作,高峰期可能有些 IO 等待,因此配置需要大些,配置线程数量为 CPU 核数 2 倍,最大不超过 3 倍
  • 强制新建一个 segment 的时间:log.roll.hour=72
  • 是否允许自动创建 Topicauto.create.topics.enable=true,如果设置为 false,则代码无法创建,需要通过 kafka 的命令创建 Topic
  • auto.offset.reset,关于数据游标的配置【earliestlatestsmallestlargest】,由于不同版本之间的差异,可以参考:记录一个 Kafka 错误:OffsetOutOfRangeException
  • advertised.host.nameadvertised.port,关于外网集群可以访问的配置,跨网络生产、消费数据,v082 以及之前的版本【之后的版本有保留这两个参数,但是不建议使用】
  • advertised.listenerslisteners,关于外网集群可以访问的配置,跨网络生产、消费数据,v090 以及之后的版本

留意参数取值大小的限制:fetch.max.bytes 大于 message.max.bytes 大于 max.request.sizereplica.fetch.max.bytes 大于 message.max.bytes 大于 max.request.size

Zookeeper

配置 zoo.cfg 文件:

  • dataDir,表示快照日志目录
  • dataLogDir,表示事务日志目录,不配置的时候事务日志目录同 dataDir
  • clientPort=2181,服务的监听端口
  • tickTime=2000Zookeeper 的时间单元,Zookeeper 中所有时间都是以这个时间单元的整数倍去配置的,例如,session 的最小超时时间是 2*tickTime【单位:毫秒】
  • syncLimit=5,表示 FollowerObserverLeader 交互时的最大等待时间,只不过是在与 leader 同步完毕之后,进入正常请求转发或 ping 等消息交互时的超时时间
  • initLimit=10ObserverFollower 启动时,从 Leader 同步最新数据时,Leader 允许 initLimit * tickTime 的时间内完成,如果同步的数据量很大,可以相应地把这个值设置大一些
  • maxClientCnxns=384,最大并发客户端数,用于防止 Ddos 的,默认值是 10,设置为 0 是不加限制
  • maxSessionTimeout=120000Session 超时时间限制,如果客户端设置的超时时间不在这个范围,那么会被强制设置一个最大时间,默认的 Session 超时时间是在 2 * tickTime ~ 20 * tickTime 这个范围
  • minSessionTimeout=4000,同 maxSessionTimeout
  • server.x=hostname:2888:3888x 是一个数字,与每个服务器的 myid 文件中的 id 是一样的,hostname 是服务器的 hostname,右边配置两个端口,第一个端口用于 FollowerLeader 之间的数据同步和其它通信,第二个端口用于 Leader 选举过程中投票通信
  • autopurge.purgeInterval=24,在 v3.4.0 及之后的版本,Zookeeper 提供了自动清理事务日志文件和快照日志文件的功能,这个参数指定了清理频率,单位是小时,需要配置一个 1 或更大的整数。默认是 0,表示不开启自动清理功能
  • autopurge.snapRetainCount=30,参数指定了需要保留的事务日志文件和快照日志文件的数目,默认是保留 3 个,和 autopurge.purgeInterval 搭配使用
虾丸派 wechat
扫一扫添加博主,进技术交流群,共同学习进步
永不止步
0%