大数据平台框架常用参数优化

本文记录大数据平台框架的一些常用参数,这些参数基本是我见过的或者实际使用过的,我会列出参数的含义以及使用效果,具有一定的参考意义。当然,根据实际的场景不同,参数值并不能随便设置为一样,必须要考虑到实际的情况,否则可能没有效果,或者具有反作用。

会保持更新。

Hadoop

选择 HBaseHadoop 时注意版本适配的问题,Hadoop 选择 v2.7.1 还是很好的,能适配 HBase v1.2.x 以及以上的版本【Hbase 兼容的 Hadoop 版本参见:hbase-configuration 】,也能适配 Hive v0.10.0 以及以上的版本【Hive 兼容的 Hadoop 版本参见:hive-downloads 】。

HDFS

  • fs.hdfs.impl.disable.cache,如果设置为 true,表示关闭文件系统的缓存,这样多线程手动处理 HDFS 文件时,不会 IOException: Filesystem closed

MapReduce

配置文件:mapred-site.xml

Yarn 资源模型:

1
2
3
4
Node Manager -> yarn.nodemanager.resource.memory-mb
YARN container -> yarn.scheduler.minimum-allocation-mb、yarn.scheduler.maximum-allocation-mb
Mapper/Reducer -> mapreduce.map.memory.mb、mapreduce.reduce.memory.mb
JVM -> mapred.map.child.java.opts、mapred.reduce.child.java.opts

对于内存参数的配置,注意它们之间的受限关系,取值不能乱设置,总体来说越具体的参数取值越小,例如常见的一般把 mapreduce.map.java.opts 的值配置成 mapreduce.map.memory.mb * 0.9

  • map 并发大小:mapreduce.job.running.map.limit,可以设置大点,50、100 随便
  • map 内存大小:mapreduce.map.memory.mb,单位为 MB,一般 4GB 够用
  • reduce 启动延迟:mapred.reduce.slowstart.completed.maps,表示 reducemap 执行到什么程度可以启动,例如设置为 1.0 表示等待 map 全部完成后才能执行 reduce
  • reduce 内存大小:mapreduce.reduce.memory.mb,单位为 MB,要根据实际情况设置,一般 4GB 够用
  • reduce 虚拟内存:yarn.nodemanager.vmem-pmem-ratio,一般 2-5 即可
  • reduce 并发大小:mapreduce.job.running.reduce.limit,一般 5-10 个够用【根据业务场景、机器资源而定】
  • mapred.map.child.java.optsMapJVM 参数,例如:-Xmx200m
  • mapred.reduce.child.java.optsReduceJVM 参数,例如:-Xmx200m
  • mapreduce.admin.map.child.java.opts,作用同 mapred.map.child.java.opts,优先级最高,会覆盖掉用户设置的
  • mapreduce.admin.reduce.child.java.opts,作用同 mapred.reduce.child.java.opts,优先级最高,会覆盖掉用户设置的

es-hadoop

使用 es-hadoop 框架处理 Elasticsearch 数据,可以专注于数据 ETL 处理逻辑,其它与集群交互的读写操作交给 es-hadoop 框架处理,这里面有一些常用的参数。

参考官方文档:configuration

  • 读取,只读取指定的字段:es.read.field.include,默认为空,读取全部字段,注意,在 query 中设置 _source 是无效的
  • 读取,排除指定的字段:es.read.field.exclude,默认为空,则不排除任何字段
  • 读取,关闭日期的处理:es.mapping.date.rich,默认为 true,关闭后,读取 Elasticsearchdate 类型的字段,会自动转换为 long 类型,不再是 date 类型
  • 读取,解析指定字段为数组类型:es.read.field.as.array.include,默认为空,则不解析任何字段【字段类型保持原样】
  • 读取,排除解析指定字段为数组字段:es.read.field.as.array.exclude,默认为空,则不排除任何字段【字段该是数组的还是数组,不是数组的仍旧保持原样】

yarn

配置文件:mapred-site.xml

  • yarn.nodemanager.local-dirs,临时目录

HBase

架构图

如下

架构图

部分知识点

1、以下内容是关于 major compaction【大合并】、minor compaction【小合并】 的说明。

minor compaction 操作只用来做部分文件【触发时相关的几个 StoreFile 文件】的合并操作,不做任何清除数据、多版本数据清理工作。

major compaction 操作是对一个 Region 下的 HStore 下的所有 StoreFile 执行合并操作,最终的结果是整理合并出一个文件。此过程会真正删除标记为需要清理的数据,而且会消耗大量的磁盘 IO、网络 IO,甚至导致部分节点无法响应,严重影响读写性能,读请求会变慢,写请求会被阻塞。

major compaction 的操作目的:

  • 合并文件
  • 真正清除标记为删除、过期、多余版本的数据【minor compaction 并不会真正清除数据】
  • 提高读写数据的效率,当然,由于磁盘 IO、网络 IO 的消耗,此操作过程会严重影响读写性能,读请求会变慢,写请求会被阻塞【有时候甚至会降低 10 倍】

一般情况下,HBase 集群的 major compact 都是关闭的,如果开启默认是 7 天执行一次,因此离线的 major compact 是必要的,可以定期手动触发,可以使用 major_compact 表名称 对某个表进行操作。如果手动触发,操作命令很快就返回结果,但是后台操作其实一直在运行,可以通过 grafana 监控查看压缩队列的长度,当压缩队列长度超过 100 的时候,应该延迟操作。由于 major compact 是很重的后台操作,因此操作之前需要有仔细的观察和分析,例如通过 grafana -> HBase 监控可以获得,关于触发时期的选择建议:

  • 业务低峰时段运行,即读写请求不大的时候,可以避免影响正常的业务
  • 分表执行【或者分 Region 执行】,不要整个集群集体执行,并且优先考虑含有 TTL 的表
  • StoreFile 短期内增加比较多的时候
  • 表中 StoreFile 平均大小比较小的时候

2、以下内容是关于租约时间的说明。

参考官网的配置示例、异常信息:default_configurations

一些默认配置:hbase-default.xml

关于客户端和 Regionserver 之间的租约时间【LeaseException】,所谓租约,是指 Hbase client 端每次和 Regionserver 交互的时候,都会在服务器端生成一个租约【Lease】,租约的有效期由参数 hbase.client.scanner.timeout.period 指定,默认的租约有效时间是 60 秒。

scan 操作过程中,客户端去 Regionserver 取数据的时候,Hbase 中存的数据量很大并且有很多 Region 的时候的,客户端请求的 Region 不在内存中,或是没有被 cache 住,需要从磁盘中加载。如果这时候加载过程需要的时间超过 hbase.client.scanner.timeout.period 所配置的时间,并且客户端没有向 Regionserver 报告其还活着,那么 Regionserver 就会认为本次租约已经过期,并从 LeaseQueue 中从删除掉本次租约。此后,当 Regionserver 加载完成后,拿已经被删除的租约再去取数据的时候,就会出现如下的错误现象。

异常示例:

1
org.apache.hadoop.hbase.regionserver.LeaseException: lease '-8841369309248784313' does not exist

一般的做法,就是在配置文件中增大 hbase.client.scanner.timeout.period 的时间,但也不能忽略 RPC 连接的超时问题,所以在增大 hbase.client.scanner.timeout.period 的时候应该同时增大 hbase.rpc.timeout,同时注意 hbase.rpc.timeout 的值应该等于或大于 hbase.client.scanner.timeout.period 的值。

很多人都会误认为一次 Scan 操作就是一次 RPC 请求,其实是不对的。实际上,一次请求大量数据的 Scan 操作可能会导致多个很严重的后果:服务器端可能因为大量 IO 操作导致 IO 利用率很高,影响其它正常的业务请求;大量数据传输会导致网络带宽等系统资源被大量占用;客户端也可能因为内存无法缓存这些数据导致 OOM。基于此,HBase 会将一次大的 Scan 操作根据设置条件拆分为多个 RPC 请求,每次只返回规定数量的结果。代码示例 ResultScanner rs = table.getScanner (scan); foreach (Result r :rs){...} 语句实际上等价于 ResultScanner rs = table.getScanner (scan); Result r = rs.next (),每执行一次 next () 操作就会调用客户端发送一次 RPC 请求,参数 hbase.client.scanner.timeout.period 就用来表示这么一次 RPC 请求的超时时间,默认为 60000ms,一旦请求超时,就会抛出 SocketTimeoutException 异常。

相关配置项

HBase 相关配置说明:

  • hbase.hregion.majorcompactionHBase 自动做 major compaction 的周期,会严重影响写入性能,建议定期手动做
  • hbase.hregion.majorcompaction=0,关闭定期的 major compaction 操作,必要时只能手动执行
  • hbase.client.retries.number,客户端连接重试次数,建议设置大一点,例如 24
  • hbase.rootdirHBaseHDFS 中的根目录
  • zookeeper.znode.parentHBaseZookeeper 的根目录,例如使用 Phoenix 登录时需要
  • hbase.hregion.max.filesize,设置 HBase 分区大小,超过此值时自动分裂,避免一个分区过大,默认值 10GB【10737418240B】,在创建表时合理预估数据大小,预设置合理的分区规则【利用 rowkey】,可以避免频繁分裂,也使数据分布更加均匀
  • hbase.rpc.timeoutRPC 连接失效时间,默认 60 秒
  • hbase.client.scanner.timeout.period,客户端和 Regionserver 之间租约过期的时间,默认是 60 秒,注意:参数 hbase.regionserver.lease.period 已经不建议使用
  • hbase.client.scanner.caching,设置 HBase Scanner 一次从服务端读取的数据条数,默认情况下 1 次 1 条,通过将其设置成一个合理的值【例如 100-500 之间的数字】,可以减少 Scan 过程中 next () 的时间开销,代价是 Scanner 需要通过客户端的内存来维持这些被 cache 的行记录
  • hbase.rootdir,这个目录是 region server 的共享目录,用来持久化 HBase
  • hbase.master.portHBaseMaster 的端口,默认: 60000
  • hbase.cluster.distributedHBase 的运行模式,false 表示单机模式,true 表示分布式模式
  • hbase.tmp.dir,本地文件系统的临时文件夹,可以设置为一个更为持久的目录上【/tmp 在重启时会被清楚】,默认:${java.io.tmpdir}/hbase-${user.name}
  • hbase.local.dir,作为本地存储,位于本地文件系统的路径,默认:${hbase.tmp.dir}/local/

Elasticsearch

总述,在设置 Elasticsearch 堆大小时需要通过 $ES_HEAP_SIZE 环境变量,遵循两个规则:

  • 不要超过可用 RAM 的 50%,Lucene 能很好利用文件系统的缓存,它是通过系统内核管理的,如果没有足够的文件系统缓存空间,性能会受到影响。 此外,专门用于堆的内存越多意味着其它可用的内存越少,例如 fielddata
  • 不要超过 32GB,如果堆大小小于 32GBJVM 可以利用指针压缩,这可以大大降低内存的使用,每个指针是 4 字节而不是 8 字节

分片的分配:shards-allocation
脚本的使用:modules-scripting-using
熔断器相关:circuit-breaker
节点选举、故障检测:modules-discovery-zen

fielddata,对字段进行 agg 时,会把数据加载到内存中【索引数据时不会】,记录的是占用内存空间情况,超过指定的值,开始回收内存,防止 OOM

allocate 表示分片的分配【第一次分配、负载均衡过程中的再次分配】;relocate 【负载均衡过程中的再次分配】表示分片再次进行 allocateRecovery 表示将一个索引的未分配 shard allocate 到一个结点的过程,在快照恢复、更改索引副本数量、结点故障、结点启动时发生。

Elasticsearch 慢查询日志、慢索引日志等一些配置信息只在 master 节点配置即可,不需要每个节点都配置。

如果设置索引副本数为 1,同一个索引的主分片、副本分片不会被分配在同一个节点上面,这才能保证数据高可用,挂了一个节点也没关系【如果一台物理节点开启了两个 Elasticsearch 节点,需要注意使用 cluster.routing.allocation.same_shard.host 参数】。

  • 磁盘空间使用占比上限:cluster.routing.allocation.disk.watermark.high,默认为 90%,表示如果当前节点的磁盘使用占比超过这个值,则分片【针对所有类型的分片:主分片、副本分片】会被自动 relocate 到其它节点,并且任何分片都不会 allocate 到当前节点【此外,对于新创建的 primary 分片也是如此,除非整个 Elasticsearch 集群只有一个节点了】
  • 磁盘空间使用占比下限:cluster.routing.allocation.disk.watermark.low,默认为 85%,表示如果当前节点的磁盘使用占比超过这个值,则分片【新创建的 primary 分片、从来没有进行过 allocate 的分片除外】不会被 allocate 到当前节点
  • 索引的分片副本数:number_of_replicas,一般设置为 1,表示总共有 2 份数据
  • index.auto_expand_replicas:副本数自动扩展,会根据可用 Elasticsearch 节点数来设置副本数,默认为 false,可以设置为 0-all0-5 等等
  • 每个节点分配的分片个数:total_shards_per_node,一般设置为 2,一个节点只分配 2 个分片,分别为主分片、副本分片
  • 索引的分片个数:number_of_shards,当索引数据很大时,一般设置为节点个数【例如 索引数据大小 / 50GB 大于节点个数,例如 10 个节点,索引大小 800GB,此时按照官方建议应该设置 16 个分片,但是分片过多也不好,就可以设置 10 个分片,每个分片大小 80GB】,再配合 分片副本数为 1 每个节点分配的分片个数为 2,就可以确保分片分配在所有的节点上面,并且每个节点上有 2 个分片,分别为主分片、副本分片
  • 数据刷新时间:refresh_interval,表示数据写入后等待多久可以被搜索到,默认值 1s,每次索引的 refresh 会产生一个新的 lucene 段,这会导致频繁的合并行为,如果业务需求对实时性要求没那么高,可以将此参数调大,例如调整为 60s,会大大降低 cpu 的使用率
  • 索引的分片大小,官方建议是每个分片大小在 30GB50GB 不要超过 50GB,所以当索引的数据很大时,就要考虑增加分片的数量
  • 设置 terms 最大个数:index.max_terms_count,默认最大个数为 65535,可以根据集群情况降低,例如设置为 10000,为了集群稳定,一般不需要设置那么大,索引级别的参数
  • 设置 Boolean Query 的子语句数量:indices.query.bool.max_clause_count,默认为 1024,不建议增大这个值,也可以根据集群情况适当减小,集群级别的参数
  • 查看热点线程:http://your_ip:your_port/_nodes/your_node_name/hot_threads,可以判断热点线程是 searchbulk,还是 merge,从而进一步分析是查询还是写入导致负载过高
  • 数据目录:path.data: /path/to/data,多个目录使用逗号分隔,里面存放数据文件
  • 日志目录:path.logs: /path/to/logs,里面存放的是节点的日志、慢查询日志、慢索引日志
  • 家目录:path.home: /path/to/homeelasticsearch 的家目录,里面有插件、lib、配置文件等
  • 插件目录:path.plugins: /path/to/plugins,插件目录,里面存放的是插件,例如:分词器
  • 设置慢获取时间边界:index.search.slowlog.threshold.fetch.warn: 30s,超过这个时间的信息会被记录在日志文件中,path.logs 参数指定的目录中 cluster-name_index_fetch_slowlog.log 文件
  • 设置慢查询时间边界:index.search.slowlog.threshold.query.warn: 60s,超过这个时间的信息会被记录在日志文件中,path.logs 参数指定的目录中 cluster-name_index_search_slowlog.log 文件
  • 设置慢索引时间边界:index.search.slowlog.threshold.index.warn: 60s,超过这个时间的信息会被记录在日志文件中,path.logs 参数指定的目录中 cluster-name_index_indexing_slowlog.log 文件
  • 禁止集群重分配:cluster.routing.allocation.enable=none,手动操作分片前需要关闭,否则会引起分片的移动,造成不必要的 IO
  • 开启集群重分配:cluster.routing.allocation.enable=all,集群的分片管理权限交由集群,保持数据均衡
  • 设置集群均衡分片时可以同时 rebalance 分片的个数,cluster.routing.allocation.cluster_concurrent_rebalance:2,不宜设置过大,一般 2-4 个为好,当然如果集群资源足够或者需要快速均衡分片,可以设置大一点
  • 允许分片分配,cluster.routing.allocation.enable=all,开启后分片的分配交由集群管理,如果偶尔需要手动管理分片或者集群停机重启,可以临时关闭,取值设置为 none 即可
  • 推迟索引的分片分配时间,在分片节点出故障或者重启时,可以避免分片数据的移动,前提是及时把节点恢复:index.unassigned.node_left.delayed_timeout=5m,通俗点说,就是趁分片不注意,节点已经恢复了,此时数据分片保持不变,避免了不必要的 IO
  • index.max_slices_per_scroll,除了传统的 scroll 读取数据的方式,v5.x 之后 Elasticsearch 又增加了对每个分片读取数据的功能,称之为切片处理【sliced scroll】,这种读取方式可以对多个分片并行读取数据,大大提高了取数效率,elasticsearch-hadoop 就是采用这种方式读取数据的。但是,这里面有一个限制,Elasticsearch 默认一个 scroll 最大的切片数量为 1024【一般小于等于分片数,也可以通过指定切片字段来创建大于分片数的切片】,可以通过 index.max_slices_per_scroll 参数来变更【不建议更改】
  • cluster.routing.allocation.same_shard.host,在单台物理节点配置多个 Elasticsearch 实例时,这个参数才生效,用来检查同一个分片的多个实例【主分片、副本分片】是否能分配在同一台主机上面,默认值为 false。如果设置为 true,表示开启检查机制,一台物理机上面启动 2 个 Elasticsearch 节点,则分配相同编号的分片时,不会都在这台机器上面,尽管可以满足主分片、副本分片不在同一个 Elasticsearch 节点上
  • script.groovy.sandbox.enabled: false,禁用 Grovvy 脚本,默认是关闭的
  • script.inline: false,允许使用内置 painless 脚本
  • script.stored: false,允许使用保存在 config/scripts 中的脚本,调用时使用 id 即可,类似方法名
  • script.file: false,允许使用外部脚本文件
  • http.port: 9200,集群的 HTTP 端口号
  • transport.tcp.port: 9300,集群的 TCP 端口号
  • thread_pool.bulk.queue_size: 1500bulk 队列的大小
  • indices.breaker.total.use_real_memory: true,熔断器回收内存,防止 OOM,决定父熔断器是考虑实际内存使用情况,还是仅考虑子熔断器内存使用情况
  • indices.breaker.total.limit: 70%,熔断器回收内存,防止 OOM,当 use_real_memorytrue 时,设置为 70%,否则默认为 70%
  • indices.breaker.fielddata.limit: 40%,熔断器回收内存,防止 OOM,默认 40%
  • indices.breaker.request.limit: 60%,熔断器回收内存,防止 OOM,默认 60%
  • indices.fielddata.cache.size,可以设置 20%,要低于 fielddata.limit,默认无界限
  • discovery.zen.fd.ping_timeout: 60s,集群故障检测
  • discovery.zen.fd.ping_interval: 10s,集群故障检测
  • discovery.zen.fd.ping_retries: 10,集群故障检测
    • discovery.zen.master_election.ignore_non_master_pings: true,选举主节点,设置为 true 时非 node.master 节点不能参与选举,投票也无效
  • discovery.zen.minimum_master_nodes: 2,选举主节点,最少有多少个备选主节点参加选举,防止脑裂现象
  • discovery.zen.ping_timeout: 10s,选举主节点
  • discovery.zen.ping.unicast.hosts: ["ip1:port","ip2:port"],选举主节点,主机列表
  • node.data: true,数据节点
  • node.master: true,有资格被选举为主节点
  • action.destructive_requires_name=true,设置严格校验,对于删除数据、删除索引的破坏性行为进行严格校验,不支持通配符,防止类似于 rm -rf /* 的悲剧
  • network.host,绑定主机名或者 ip 地址,用于向集群广播自己
  • cluster.routing.allocation.exclude._ip,临时下线节点,类似于黑名单,分片不会往指定的主机移动,同时会把分片从指定的节点全部移除,最终可以下线该节点,可通过 put transient 设置临时生效
  • cluster.routing.allocation.disk.watermark.flood_stage,磁盘使用上限,超过则对应的索引被自动设置为只读模式,索引属性:index.blocks.read_only_allow_delete

当然,除了关注 Elasticsearch 本身的配置之外,如果在实际应用中使用了 Nginx 作为中间代理,此时需要注意 Nginx 的配置,它也会直接影响着请求。例如;client_max_body_size,影响着请求的请求体大小,如果参数设置过小则直接无法通过 Nginx 转发,也就无法到达 Elasticsearch 了,最终请求会失败【如果是写入大量的数据场景,请求体会很大,可能有几兆,则写入失败,一般的查询请求倒是很小】。

Spark

v1.6.2 官方说明文档

  • 序列化方式:spark.serializer,可以选择:org.apache.spark.serializer.KryoSerializer
  • executor 附加参数:spark.executor.extraJavaOptions,例如可以添加:-Dxx=yy【如果仅仅在 driver 端设置,executor 是不会有的】
  • driver 附加参数:spark.driver.extraJavaOptions,例如可以添加:-Dxx=yy
  • 日志配置文件设置,Spark 使用的是 log4j,默认在 Spark 安装目录的 conf 下面,如果想要增加 log4j 相关配置,更改 driver 机器上面的 log4j.properties 配置文件是无效的,必须把所有的 executor 节点上的配置文件全部更新。如果没有权限,也可以自己上传配置文件,然后需要在 executor 附加参数中指定:-Dlog4j.configuration=file:/path/to/file,启动 Spark 任务时还需要使用 --files 指定配置文件名称,多个用逗号分隔,用来上传配置文件到 Spark 节点
  • 开启允许多 SparkContext 存在:spark.driver.allowMultipleContexts,设置为 true 即可,在使用多个 SparkContext 时,需要先停用当前活跃的,使用 stop 方法【在 Spark v2.0 以及以上版本,已经取消了这个限制】
  • spark.executor.cores,每个执行器上面的占用核数,会消耗 CPU,一般设置为 2-3
  • spark.executor.memory,执行器上面的堆内存大小,一般设置为 2048M4096M
  • spark.port.maxRetries,提交任务的 Spark UI 重试次数
  • spark.default.parallelism,默认并行度
  • spark.cores.max,最大核心数
  • spark.executor.logs.rolling.strategy
  • spark.executor.logs.rolling.maxRetainedFiles
  • spark.executor.logs.rolling.size.maxBytes
  • spark.ui.showConsoleProgress
  • spark.yarn.max.executor.failurestask 失败重试次数,默认为 spark.executor.cores 的 2 倍,最小值为 3,如果重试最大次数后 task 仍旧失败,则整个 Application 执行失败【容错性】
  • spark.yarn.maxAppAttempts,提交申请的最大尝试次数,小于等于 yarn 配置中的全局最大尝试次数,尝试最大次数后仍旧无法提交,则 Application 提交失败【yarn 配置为 yarn.resourcemanager.am.max-attempts,默认为 2,即有 2 次提交机会】

Kafka 输入数据源

Spark Streaming 配置:

  • spark.streaming.backpressure.enabled,开启反压机制
  • spark.streaming.backpressure.pid.minRate
  • spark.streaming.kafka.maxRatePerPartition,每个 partition 的最大读取速度,单位秒,一般设置 500-100 即可
  • spark.streaming.receiver.maxRatereceiver 最大处理数据量,单位秒,与 maxRatePerPartitionDurations 有关,实际运行时由于反压机制,数据处理速度会低于这个值
  • spark.streaming.receiver.writeAheadLog.enable
  • spark.streaming.stopGracefullyOnShutdown,优雅地退出
  • spark.streaming.gracefulStopTimeout
  • xx

Kakfa 配置:

  • metadata.broker.list
  • offsets.storage,设置为 kafka
  • zookeeper.connect
  • zookeeper.connection.timeout.ms
  • group.id
  • fetch.message.max.bytes
  • auto.offset.reset,消费的起始位置,这个参数高低版本之间的名称、值都会不同,需要注意
  • consumer.timeout.ms
  • rebalance.max.retries
  • rebalance.backoff.ms

集群参数

yarn 集群:

  • yarn.resourcemanager.am.max-attempts,最大应用尝试次数,它是所有 AM 的全局设置,每个应用都可以通过 API 的参数指定其各自的最大应用尝试次数【参数 spark.yarn.maxAppAttempts】,但是单个数字不能超过这个全局上限,如果超过了,资源管理器将覆盖它。默认数量设置为 2,以允许至少 1 次重试,即有 2 次提交的机会

standalone 集群

  • 临时目录:SPARK_LOCAL_DIRS,用来存放 Spark 任务运行过程中的临时数据,例如内存不足时把数据缓存到磁盘,就会有数据写入这个目录,当然,在启动 Spark 任务时也可以单独指定,但是最好还是设置在集群上面,可以在 spark-env.sh 脚本中设置,键值对的形式,例如:SPARK_LOCAL_DIRS=/your_path/spark/local。需要注意的是,启动 Excutor 的用户必须有这个目录的写权限,并且保证这个目录的磁盘空间足够使用,否则在 Spark 任务中会出现异常:java.io.IOException: Failed to create local dir in xx,进而导致 Task 失败
  • Work 目录:SPARK_WORKER_DIR,用来存放 Work 的信息,设置方式同上面的 SPARK_LOCAL_DIRS,如果 Spark 任务里面有 System.out (),输出的内容在此目录下
  • SPARK_LOG_DIRSpark 集群自身的日志文件,例如 Work 接收 Spark 任务后通信的内容

Storm

  • StormUI nimbus 内容传输大小限制:nimbus.thrift.max_buffer_size: 1048576,取值的单位是字节,默认为 1048576,如果 nimbus 汇报的内容过多,超过这个值,则在 StormUI 上面无法查看 Topology Summary 信息,会报错:Internal Server Error org.apache.thrift7.transport.TTransportException: Frame size (3052134) larger than max length (1048576)
  • 执行实例 worker 对应的端口号:supervisor.slots.ports:,可以设置多个,和 CPU 的核数一致,或者稍小,提高机器资源的使用率
  • workerJVM 参数:WORKER_GC_OPTS,取值参考:-Xms1G -Xmx5G -XX:+UseG1GC,根据集群机器的资源多少而定,G1 是一种垃圾回收器
  • supervisorJVM 参数:SUPERVISOR_GC_OPTS,取值参考:-Xms1G -Xmx5G -XX:+UseG1GC,根据集群机器的资源多少而定,G1 是一种垃圾回收器
  • Storm UI 的服务端口:ui.port,可以使用浏览器打开网页查看 Topology 详细信息
  • ZooKeeper 服务器列表:storm.zookeeper.servers
  • ZooKeeper 连接端口:storm.zookeeper.port
  • ZooKeeperStorm 的根目录位置:storm.zookeeper.root,用来存放 Storm 集群元信息
  • 客户端连接 ZooKeeper 超时时间:storm.zookeeper.session.timeout
  • Storm 使用的本地文件系统目录:storm.local.dir,注意此目录必须存在并且 Storm 进程有权限可读写
  • Storm 集群运行模式:storm.cluster.mode,取值可选:distributedlocal

Kafka

注意,Kafka 的不同版本参数名、参数值会有变化,特别是 v0.9.x 之后,与之前的低版本差异很大,例如数据游标的参数可以参考我的另外一篇博文:记录一个 Kafka 错误:OffsetOutOfRangeExceptionKafka 官网参见:Kafka-v0.9.0.x-configuration

配置优化都是修改 server.properties 文件中参数值。

  • JVM 参数:KAFKA_HEAP_OPTS,取值参考:-Xmx2G
  • 文件存放位置:log.dirs,多个使用逗号分隔,注意所有的 log 级别需要设置为 INFO
  • 单个 Topic 的文件保留策略:log.retention.hours=72【数据保留 72 小时,超过时旧数据被删除】,log.retention.bytes=1073741824【数据保留 1GB,超过时旧数据被删除】
  • 数据文件刷盘策略:log.flush.interval.messages=10000【每当 producer 写入 10000 条消息时,刷数据到磁盘】,log.flush.interval.ms=1000【每间隔 1 秒钟时间,刷数据到磁盘】
  • Topic 的分区数量:num.partitions=8
  • 启动 Fetch 线程给副本同步数据传输大小限制:replica.fetch.max.bytes=10485760,要比 message.max.bytes
  • message.max.bytes=10485700,这个参数决定了 broker 能够接收到的最大消息的大小,要比 max.request.size
  • max.request.size=10480000,这个参数决定了 producer 生产消息的大小
  • fetch.max.bytes=10485760,这个参数决定了 consumer 消费消息的大小,要比 message.max.bytes
  • broker 处理消息的最大线程数:num.network.threads=17,一般 num.network.threads 主要处理网络 IO,读写缓冲区数据,基本没有 IO 等待,配置线程数量为 CPU 核数加 1
  • broker 处理磁盘 IO 的线程数:num.io.threads=32num.io.threads 主要进行磁盘 IO 操作,高峰期可能有些 IO 等待,因此配置需要大些,配置线程数量为 CPU 核数 2 倍,最大不超过 3 倍
  • 强制新建一个 segment 的时间:log.roll.hour=72
  • 是否允许自动创建 Topicauto.create.topics.enable=true,如果设置为 false,则代码无法创建,需要通过 kafka 的命令创建 Topic
  • auto.offset.reset,关于数据游标的配置【earliestlatestsmallestlargest】,由于不同版本之间的差异,可以参考:记录一个 Kafka 错误:OffsetOutOfRangeException
  • advertised.host.nameadvertised.port,关于外网集群可以访问的配置,跨网络生产、消费数据,v082 以及之前的版本【之后的版本有保留这两个参数,但是不建议使用】
  • advertised.listenerslisteners,关于外网集群可以访问的配置,跨网络生产、消费数据,v090 以及之后的版本
  • kafka-topics.sh --create -zookeeper xx:2181 --replication-factor 2 --partitions 2 --topic topic_xxkafka 创建 topic,里面的副本数量参数 --replication-factor,和 Elasticsearch 的含义不一样,不仅仅是指备份的数据量,而是总体的副本数,所以设置为 1 等价于没有副本。

留意参数取值大小的限制:fetch.max.bytes 大于 message.max.bytes 大于 max.request.sizereplica.fetch.max.bytes 大于 message.max.bytes 大于 max.request.size

Zookeeper

配置 zoo.cfg 文件:

  • dataDir,表示快照日志目录
  • dataLogDir,表示事务日志目录,不配置的时候事务日志目录同 dataDir
  • clientPort=2181,服务的监听端口
  • tickTime=2000Zookeeper 的时间单元,Zookeeper 中所有时间都是以这个时间单元的整数倍去配置的,例如,session 的最小超时时间是 2*tickTime【单位:毫秒】
  • syncLimit=5,表示 FollowerObserverLeader 交互时的最大等待时间,只不过是在与 leader 同步完毕之后,进入正常请求转发或 ping 等消息交互时的超时时间
  • initLimit=10ObserverFollower 启动时,从 Leader 同步最新数据时,Leader 允许 initLimit * tickTime 的时间内完成,如果同步的数据量很大,可以相应地把这个值设置大一些
  • maxClientCnxns=384,最大并发客户端数,用于防止 Ddos 的,默认值是 10,设置为 0 是不加限制
  • maxSessionTimeout=120000Session 超时时间限制,如果客户端设置的超时时间不在这个范围,那么会被强制设置一个最大时间,默认的 Session 超时时间是在 2 * tickTime ~ 20 * tickTime 这个范围
  • minSessionTimeout=4000,同 maxSessionTimeout
  • server.x=hostname:2888:3888x 是一个数字,与每个服务器的 myid 文件中的 id 是一样的,hostname 是服务器的 hostname,右边配置两个端口,第一个端口用于 FollowerLeader 之间的数据同步和其它通信,第二个端口用于 Leader 选举过程中投票通信
  • autopurge.purgeInterval=24,在 v3.4.0 及之后的版本,Zookeeper 提供了自动清理事务日志文件和快照日志文件的功能,这个参数指定了清理频率,单位是小时,需要配置一个 1 或更大的整数。默认是 0,表示不开启自动清理功能
  • autopurge.snapRetainCount=30,参数指定了需要保留的事务日志文件和快照日志文件的数目,默认是保留 3 个,和 autopurge.purgeInterval 搭配使用
虾丸派 wechat
扫一扫添加博主,进技术交流群,共同学习进步
永不止步
0%