本文记录大数据平台框架的一些常用参数,这些参数基本是我见过的或者实际使用过的,我会列出参数的含义以及使用效果,具有一定的参考意义。当然,根据实际的场景不同,参数值并不能随便设置为一样,必须要考虑到实际的情况,否则可能没有效果,或者具有反作用。
会保持更新。
Hadoop
选择 HBase、Hadoop 时注意版本适配的问题,Hadoop 选择 v2.7.1 还是很好的,能适配 HBase v1.2.x 以及以上的版本【Hbase 兼容的 Hadoop 版本参见:hbase-configuration 】,也能适配 Hive v0.10.0 以及以上的版本【Hive 兼容的 Hadoop 版本参见:hive-downloads 】。
HDFS
fs.hdfs.impl.disable.cache,如果设置为true,表示关闭文件系统的缓存,这样多线程手动处理HDFS文件时,不会IOException: Filesystem closed
MapReduce
配置文件:mapred-site.xml。
Yarn 资源模型:
1 | Node Manager -> yarn.nodemanager.resource.memory-mb |
对于内存参数的配置,注意它们之间的受限关系,取值不能乱设置,总体来说越具体的参数取值越小,例如常见的一般把 mapreduce.map.java.opts 的值配置成 mapreduce.map.memory.mb * 0.9 。
map并发大小:mapreduce.job.running.map.limit,可以设置大点,50、100 随便map内存大小:mapreduce.map.memory.mb,单位为MB,一般4GB够用reduce启动延迟:mapred.reduce.slowstart.completed.maps,表示reduce在map执行到什么程度可以启动,例如设置为1.0表示等待map全部完成后才能执行reducereduce内存大小:mapreduce.reduce.memory.mb,单位为MB,要根据实际情况设置,一般4GB够用reduce虚拟内存:yarn.nodemanager.vmem-pmem-ratio,一般 2-5 即可reduce并发大小:mapreduce.job.running.reduce.limit,一般 5-10 个够用【根据业务场景、机器资源而定】mapred.map.child.java.opts,Map的JVM参数,例如:-Xmx200mmapred.reduce.child.java.opts,Reduce的JVM参数,例如:-Xmx200mmapreduce.admin.map.child.java.opts,作用同mapred.map.child.java.opts,优先级最高,会覆盖掉用户设置的mapreduce.admin.reduce.child.java.opts,作用同mapred.reduce.child.java.opts,优先级最高,会覆盖掉用户设置的
es-hadoop
使用 es-hadoop 框架处理 Elasticsearch 数据,可以专注于数据 ETL 处理逻辑,其它与集群交互的读写操作交给 es-hadoop 框架处理,这里面有一些常用的参数。
参考官方文档:configuration 。
- 读取,只读取指定的字段:
es.read.field.include,默认为空,读取全部字段,注意,在query中设置_source是无效的 - 读取,排除指定的字段:
es.read.field.exclude,默认为空,则不排除任何字段 - 读取,关闭日期的处理:
es.mapping.date.rich,默认为true,关闭后,读取Elasticsearch的date类型的字段,会自动转换为long类型,不再是date类型 - 读取,解析指定字段为数组类型:
es.read.field.as.array.include,默认为空,则不解析任何字段【字段类型保持原样】 - 读取,排除解析指定字段为数组字段:
es.read.field.as.array.exclude,默认为空,则不排除任何字段【字段该是数组的还是数组,不是数组的仍旧保持原样】
yarn
配置文件:mapred-site.xml 。
yarn.nodemanager.local-dirs,临时目录
HBase
架构图
如下

部分知识点
1、以下内容是关于 major compaction【大合并】、minor compaction【小合并】 的说明。
minor compaction 操作只用来做部分文件【触发时相关的几个 StoreFile 文件】的合并操作,不做任何清除数据、多版本数据清理工作。
major compaction 操作是对一个 Region 下的 HStore 下的所有 StoreFile 执行合并操作,最终的结果是整理合并出一个文件。此过程会真正删除标记为需要清理的数据,而且会消耗大量的磁盘 IO、网络 IO,甚至导致部分节点无法响应,严重影响读写性能,读请求会变慢,写请求会被阻塞。
major compaction 的操作目的:
- 合并文件
- 真正清除标记为删除、过期、多余版本的数据【
minor compaction并不会真正清除数据】 - 提高读写数据的效率,当然,由于磁盘
IO、网络IO的消耗,此操作过程会严重影响读写性能,读请求会变慢,写请求会被阻塞【有时候甚至会降低 10 倍】
一般情况下,HBase 集群的 major compact 都是关闭的,如果开启默认是 7 天执行一次,因此离线的 major compact 是必要的,可以定期手动触发,可以使用 major_compact 表名称 对某个表进行操作。如果手动触发,操作命令很快就返回结果,但是后台操作其实一直在运行,可以通过 grafana 监控查看压缩队列的长度,当压缩队列长度超过 100 的时候,应该延迟操作。由于 major compact 是很重的后台操作,因此操作之前需要有仔细的观察和分析,例如通过 grafana -> HBase 监控可以获得,关于触发时期的选择建议:
- 业务低峰时段运行,即读写请求不大的时候,可以避免影响正常的业务
- 分表执行【或者分
Region执行】,不要整个集群集体执行,并且优先考虑含有TTL的表 StoreFile短期内增加比较多的时候- 表中
StoreFile平均大小比较小的时候
2、以下内容是关于租约时间的说明。
参考官网的配置示例、异常信息:default_configurations 。
一些默认配置:hbase-default.xml 。
关于客户端和 Regionserver 之间的租约时间【LeaseException】,所谓租约,是指 Hbase client 端每次和 Regionserver 交互的时候,都会在服务器端生成一个租约【Lease】,租约的有效期由参数 hbase.client.scanner.timeout.period 指定,默认的租约有效时间是 60 秒。
在 scan 操作过程中,客户端去 Regionserver 取数据的时候,Hbase 中存的数据量很大并且有很多 Region 的时候的,客户端请求的 Region 不在内存中,或是没有被 cache 住,需要从磁盘中加载。如果这时候加载过程需要的时间超过 hbase.client.scanner.timeout.period 所配置的时间,并且客户端没有向 Regionserver 报告其还活着,那么 Regionserver 就会认为本次租约已经过期,并从 LeaseQueue 中从删除掉本次租约。此后,当 Regionserver 加载完成后,拿已经被删除的租约再去取数据的时候,就会出现如下的错误现象。
异常示例:
1 | org.apache.hadoop.hbase.regionserver.LeaseException: lease '-8841369309248784313' does not exist |
一般的做法,就是在配置文件中增大 hbase.client.scanner.timeout.period 的时间,但也不能忽略 RPC 连接的超时问题,所以在增大 hbase.client.scanner.timeout.period 的时候应该同时增大 hbase.rpc.timeout,同时注意 hbase.rpc.timeout 的值应该等于或大于 hbase.client.scanner.timeout.period 的值。
很多人都会误认为一次
Scan操作就是一次RPC请求,其实是不对的。实际上,一次请求大量数据的Scan操作可能会导致多个很严重的后果:服务器端可能因为大量IO操作导致IO利用率很高,影响其它正常的业务请求;大量数据传输会导致网络带宽等系统资源被大量占用;客户端也可能因为内存无法缓存这些数据导致OOM。基于此,HBase会将一次大的Scan操作根据设置条件拆分为多个RPC请求,每次只返回规定数量的结果。代码示例ResultScanner rs = table.getScanner (scan); foreach (Result r :rs){...}语句实际上等价于ResultScanner rs = table.getScanner (scan); Result r = rs.next (),每执行一次next ()操作就会调用客户端发送一次RPC请求,参数hbase.client.scanner.timeout.period就用来表示这么一次RPC请求的超时时间,默认为60000ms,一旦请求超时,就会抛出SocketTimeoutException异常。
相关配置项
HBase 相关配置说明:
hbase.hregion.majorcompaction,HBase自动做major compaction的周期,会严重影响写入性能,建议定期手动做hbase.hregion.majorcompaction=0,关闭定期的major compaction操作,必要时只能手动执行hbase.client.retries.number,客户端连接重试次数,建议设置大一点,例如 24hbase.rootdir,HBase在HDFS中的根目录zookeeper.znode.parent,HBase在Zookeeper的根目录,例如使用Phoenix登录时需要hbase.hregion.max.filesize,设置HBase分区大小,超过此值时自动分裂,避免一个分区过大,默认值 10GB【10737418240B】,在创建表时合理预估数据大小,预设置合理的分区规则【利用rowkey】,可以避免频繁分裂,也使数据分布更加均匀hbase.rpc.timeout,RPC连接失效时间,默认 60 秒hbase.client.scanner.timeout.period,客户端和Regionserver之间租约过期的时间,默认是 60 秒,注意:参数hbase.regionserver.lease.period已经不建议使用hbase.client.scanner.caching,设置HBase Scanner一次从服务端读取的数据条数,默认情况下 1 次 1 条,通过将其设置成一个合理的值【例如 100-500 之间的数字】,可以减少Scan过程中next ()的时间开销,代价是Scanner需要通过客户端的内存来维持这些被cache的行记录hbase.rootdir,这个目录是region server的共享目录,用来持久化HBasehbase.master.port,HBase的Master的端口,默认: 60000hbase.cluster.distributed,HBase的运行模式,false表示单机模式,true表示分布式模式hbase.tmp.dir,本地文件系统的临时文件夹,可以设置为一个更为持久的目录上【/tmp在重启时会被清楚】,默认:${java.io.tmpdir}/hbase-${user.name}hbase.local.dir,作为本地存储,位于本地文件系统的路径,默认:${hbase.tmp.dir}/local/
Elasticsearch
总述,在设置 Elasticsearch 堆大小时需要通过 $ES_HEAP_SIZE 环境变量,遵循两个规则:
- 不要超过可用
RAM的 50%,Lucene能很好利用文件系统的缓存,它是通过系统内核管理的,如果没有足够的文件系统缓存空间,性能会受到影响。 此外,专门用于堆的内存越多意味着其它可用的内存越少,例如fielddata - 不要超过
32GB,如果堆大小小于32GB,JVM可以利用指针压缩,这可以大大降低内存的使用,每个指针是 4 字节而不是 8 字节
分片的分配:shards-allocation 。
脚本的使用:modules-scripting-using 。
熔断器相关:circuit-breaker 。
节点选举、故障检测:modules-discovery-zen 。
fielddata,对字段进行 agg 时,会把数据加载到内存中【索引数据时不会】,记录的是占用内存空间情况,超过指定的值,开始回收内存,防止 OOM。
allocate 表示分片的分配【第一次分配、负载均衡过程中的再次分配】;relocate 【负载均衡过程中的再次分配】表示分片再次进行 allocate。Recovery 表示将一个索引的未分配 shard allocate 到一个结点的过程,在快照恢复、更改索引副本数量、结点故障、结点启动时发生。
Elasticsearch 慢查询日志、慢索引日志等一些配置信息只在 master 节点配置即可,不需要每个节点都配置。
如果设置索引副本数为 1,同一个索引的主分片、副本分片不会被分配在同一个节点上面,这才能保证数据高可用,挂了一个节点也没关系【如果一台物理节点开启了两个 Elasticsearch 节点,需要注意使用 cluster.routing.allocation.same_shard.host 参数】。
- 磁盘空间使用占比上限:
cluster.routing.allocation.disk.watermark.high,默认为 90%,表示如果当前节点的磁盘使用占比超过这个值,则分片【针对所有类型的分片:主分片、副本分片】会被自动relocate到其它节点,并且任何分片都不会allocate到当前节点【此外,对于新创建的primary分片也是如此,除非整个Elasticsearch集群只有一个节点了】 - 磁盘空间使用占比下限:
cluster.routing.allocation.disk.watermark.low,默认为 85%,表示如果当前节点的磁盘使用占比超过这个值,则分片【新创建的primary分片、从来没有进行过allocate的分片除外】不会被allocate到当前节点 - 索引的分片副本数:
number_of_replicas,一般设置为 1,表示总共有 2 份数据 index.auto_expand_replicas:副本数自动扩展,会根据可用Elasticsearch节点数来设置副本数,默认为false,可以设置为0-all、0-5等等- 每个节点分配的分片个数:
total_shards_per_node,一般设置为 2,一个节点只分配 2 个分片,分别为主分片、副本分片 - 索引的分片个数:
number_of_shards,当索引数据很大时,一般设置为节点个数【例如索引数据大小 / 50GB大于节点个数,例如 10 个节点,索引大小800GB,此时按照官方建议应该设置 16 个分片,但是分片过多也不好,就可以设置 10 个分片,每个分片大小80GB】,再配合 分片副本数为 1、 每个节点分配的分片个数为 2,就可以确保分片分配在所有的节点上面,并且每个节点上有 2 个分片,分别为主分片、副本分片 - 数据刷新时间:
refresh_interval,表示数据写入后等待多久可以被搜索到,默认值1s,每次索引的refresh会产生一个新的lucene段,这会导致频繁的合并行为,如果业务需求对实时性要求没那么高,可以将此参数调大,例如调整为60s,会大大降低cpu的使用率 - 索引的分片大小,官方建议是每个分片大小在
30GB到50GB不要超过50GB,所以当索引的数据很大时,就要考虑增加分片的数量 - 设置
terms最大个数:index.max_terms_count,默认最大个数为 65535,可以根据集群情况降低,例如设置为 10000,为了集群稳定,一般不需要设置那么大,索引级别的参数 - 设置
Boolean Query的子语句数量:indices.query.bool.max_clause_count,默认为 1024,不建议增大这个值,也可以根据集群情况适当减小,集群级别的参数 - 查看热点线程:
http://your_ip:your_port/_nodes/your_node_name/hot_threads,可以判断热点线程是search,bulk,还是merge,从而进一步分析是查询还是写入导致负载过高 - 数据目录:
path.data: /path/to/data,多个目录使用逗号分隔,里面存放数据文件 - 日志目录:
path.logs: /path/to/logs,里面存放的是节点的日志、慢查询日志、慢索引日志 - 家目录:
path.home: /path/to/home,elasticsearch的家目录,里面有插件、lib、配置文件等 - 插件目录:
path.plugins: /path/to/plugins,插件目录,里面存放的是插件,例如:分词器 - 设置慢获取时间边界:
index.search.slowlog.threshold.fetch.warn: 30s,超过这个时间的信息会被记录在日志文件中,path.logs参数指定的目录中cluster-name_index_fetch_slowlog.log文件 - 设置慢查询时间边界:
index.search.slowlog.threshold.query.warn: 60s,超过这个时间的信息会被记录在日志文件中,path.logs参数指定的目录中cluster-name_index_search_slowlog.log文件 - 设置慢索引时间边界:
index.search.slowlog.threshold.index.warn: 60s,超过这个时间的信息会被记录在日志文件中,path.logs参数指定的目录中cluster-name_index_indexing_slowlog.log文件 - 禁止集群重分配:
cluster.routing.allocation.enable=none,手动操作分片前需要关闭,否则会引起分片的移动,造成不必要的IO - 开启集群重分配:
cluster.routing.allocation.enable=all,集群的分片管理权限交由集群,保持数据均衡 - 设置集群均衡分片时可以同时
rebalance分片的个数,cluster.routing.allocation.cluster_concurrent_rebalance:2,不宜设置过大,一般 2-4 个为好,当然如果集群资源足够或者需要快速均衡分片,可以设置大一点 - 允许分片分配,
cluster.routing.allocation.enable=all,开启后分片的分配交由集群管理,如果偶尔需要手动管理分片或者集群停机重启,可以临时关闭,取值设置为none即可 - 推迟索引的分片分配时间,在分片节点出故障或者重启时,可以避免分片数据的移动,前提是及时把节点恢复:
index.unassigned.node_left.delayed_timeout=5m,通俗点说,就是趁分片不注意,节点已经恢复了,此时数据分片保持不变,避免了不必要的IO index.max_slices_per_scroll,除了传统的scroll读取数据的方式,v5.x之后Elasticsearch又增加了对每个分片读取数据的功能,称之为切片处理【sliced scroll】,这种读取方式可以对多个分片并行读取数据,大大提高了取数效率,elasticsearch-hadoop就是采用这种方式读取数据的。但是,这里面有一个限制,Elasticsearch默认一个scroll最大的切片数量为 1024【一般小于等于分片数,也可以通过指定切片字段来创建大于分片数的切片】,可以通过index.max_slices_per_scroll参数来变更【不建议更改】cluster.routing.allocation.same_shard.host,在单台物理节点配置多个Elasticsearch实例时,这个参数才生效,用来检查同一个分片的多个实例【主分片、副本分片】是否能分配在同一台主机上面,默认值为false。如果设置为true,表示开启检查机制,一台物理机上面启动 2 个Elasticsearch节点,则分配相同编号的分片时,不会都在这台机器上面,尽管可以满足主分片、副本分片不在同一个Elasticsearch节点上script.groovy.sandbox.enabled: false,禁用Grovvy脚本,默认是关闭的script.inline: false,允许使用内置painless脚本script.stored: false,允许使用保存在config/scripts中的脚本,调用时使用id即可,类似方法名script.file: false,允许使用外部脚本文件http.port: 9200,集群的HTTP端口号transport.tcp.port: 9300,集群的TCP端口号thread_pool.bulk.queue_size: 1500,bulk队列的大小indices.breaker.total.use_real_memory: true,熔断器回收内存,防止OOM,决定父熔断器是考虑实际内存使用情况,还是仅考虑子熔断器内存使用情况indices.breaker.total.limit: 70%,熔断器回收内存,防止OOM,当use_real_memory为true时,设置为 70%,否则默认为 70%indices.breaker.fielddata.limit: 40%,熔断器回收内存,防止OOM,默认 40%indices.breaker.request.limit: 60%,熔断器回收内存,防止OOM,默认 60%indices.fielddata.cache.size,可以设置20%,要低于fielddata.limit,默认无界限discovery.zen.fd.ping_timeout: 60s,集群故障检测discovery.zen.fd.ping_interval: 10s,集群故障检测discovery.zen.fd.ping_retries: 10,集群故障检测discovery.zen.master_election.ignore_non_master_pings: true,选举主节点,设置为true时非node.master节点不能参与选举,投票也无效
discovery.zen.minimum_master_nodes: 2,选举主节点,最少有多少个备选主节点参加选举,防止脑裂现象discovery.zen.ping_timeout: 10s,选举主节点discovery.zen.ping.unicast.hosts: ["ip1:port","ip2:port"],选举主节点,主机列表node.data: true,数据节点node.master: true,有资格被选举为主节点action.destructive_requires_name=true,设置严格校验,对于删除数据、删除索引的破坏性行为进行严格校验,不支持通配符,防止类似于rm -rf /*的悲剧network.host,绑定主机名或者ip地址,用于向集群广播自己cluster.routing.allocation.exclude._ip,临时下线节点,类似于黑名单,分片不会往指定的主机移动,同时会把分片从指定的节点全部移除,最终可以下线该节点,可通过put transient设置临时生效cluster.routing.allocation.disk.watermark.flood_stage,磁盘使用上限,超过则对应的索引被自动设置为只读模式,索引属性:index.blocks.read_only_allow_delete
当然,除了关注 Elasticsearch 本身的配置之外,如果在实际应用中使用了 Nginx 作为中间代理,此时需要注意 Nginx 的配置,它也会直接影响着请求。例如;client_max_body_size,影响着请求的请求体大小,如果参数设置过小则直接无法通过 Nginx 转发,也就无法到达 Elasticsearch 了,最终请求会失败【如果是写入大量的数据场景,请求体会很大,可能有几兆,则写入失败,一般的查询请求倒是很小】。
Spark
- 序列化方式:
spark.serializer,可以选择:org.apache.spark.serializer.KryoSerializer executor附加参数:spark.executor.extraJavaOptions,例如可以添加:-Dxx=yy【如果仅仅在driver端设置,executor是不会有的】driver附加参数:spark.driver.extraJavaOptions,例如可以添加:-Dxx=yy- 日志配置文件设置,
Spark使用的是log4j,默认在Spark安装目录的conf下面,如果想要增加log4j相关配置,更改driver机器上面的log4j.properties配置文件是无效的,必须把所有的executor节点上的配置文件全部更新。如果没有权限,也可以自己上传配置文件,然后需要在executor附加参数中指定:-Dlog4j.configuration=file:/path/to/file,启动Spark任务时还需要使用--files指定配置文件名称,多个用逗号分隔,用来上传配置文件到Spark节点 - 开启允许多
SparkContext存在:spark.driver.allowMultipleContexts,设置为true即可,在使用多个SparkContext时,需要先停用当前活跃的,使用stop方法【在Spark v2.0以及以上版本,已经取消了这个限制】 spark.executor.cores,每个执行器上面的占用核数,会消耗CPU,一般设置为 2-3spark.executor.memory,执行器上面的堆内存大小,一般设置为2048M、4096Mspark.port.maxRetries,提交任务的Spark UI重试次数spark.default.parallelism,默认并行度spark.cores.max,最大核心数spark.executor.logs.rolling.strategyspark.executor.logs.rolling.maxRetainedFilesspark.executor.logs.rolling.size.maxBytesspark.ui.showConsoleProgressspark.yarn.max.executor.failures,task失败重试次数,默认为spark.executor.cores的 2 倍,最小值为 3,如果重试最大次数后task仍旧失败,则整个Application执行失败【容错性】spark.yarn.maxAppAttempts,提交申请的最大尝试次数,小于等于yarn配置中的全局最大尝试次数,尝试最大次数后仍旧无法提交,则Application提交失败【yarn配置为yarn.resourcemanager.am.max-attempts,默认为 2,即有 2 次提交机会】
Kafka 输入数据源
Spark Streaming 配置:
spark.streaming.backpressure.enabled,开启反压机制spark.streaming.backpressure.pid.minRate,spark.streaming.kafka.maxRatePerPartition,每个partition的最大读取速度,单位秒,一般设置 500-100 即可spark.streaming.receiver.maxRate,receiver最大处理数据量,单位秒,与maxRatePerPartition、Durations有关,实际运行时由于反压机制,数据处理速度会低于这个值spark.streaming.receiver.writeAheadLog.enable,spark.streaming.stopGracefullyOnShutdown,优雅地退出spark.streaming.gracefulStopTimeout,xx,
Kakfa 配置:
metadata.broker.list,offsets.storage,设置为kafkazookeeper.connect,zookeeper.connection.timeout.msgroup.idfetch.message.max.bytesauto.offset.reset,消费的起始位置,这个参数高低版本之间的名称、值都会不同,需要注意consumer.timeout.msrebalance.max.retriesrebalance.backoff.ms
集群参数
yarn 集群:
yarn.resourcemanager.am.max-attempts,最大应用尝试次数,它是所有AM的全局设置,每个应用都可以通过API的参数指定其各自的最大应用尝试次数【参数spark.yarn.maxAppAttempts】,但是单个数字不能超过这个全局上限,如果超过了,资源管理器将覆盖它。默认数量设置为 2,以允许至少 1 次重试,即有 2 次提交的机会
standalone 集群
- 临时目录:
SPARK_LOCAL_DIRS,用来存放Spark任务运行过程中的临时数据,例如内存不足时把数据缓存到磁盘,就会有数据写入这个目录,当然,在启动Spark任务时也可以单独指定,但是最好还是设置在集群上面,可以在spark-env.sh脚本中设置,键值对的形式,例如:SPARK_LOCAL_DIRS=/your_path/spark/local。需要注意的是,启动Excutor的用户必须有这个目录的写权限,并且保证这个目录的磁盘空间足够使用,否则在Spark任务中会出现异常:java.io.IOException: Failed to create local dir in xx,进而导致Task失败 Work目录:SPARK_WORKER_DIR,用来存放Work的信息,设置方式同上面的SPARK_LOCAL_DIRS,如果Spark任务里面有System.out (),输出的内容在此目录下SPARK_LOG_DIR,Spark集群自身的日志文件,例如Work接收Spark任务后通信的内容
Storm
StormUI nimbus内容传输大小限制:nimbus.thrift.max_buffer_size: 1048576,取值的单位是字节,默认为1048576,如果nimbus汇报的内容过多,超过这个值,则在StormUI上面无法查看Topology Summary信息,会报错:Internal Server Error org.apache.thrift7.transport.TTransportException: Frame size (3052134) larger than max length (1048576)- 执行实例
worker对应的端口号:supervisor.slots.ports:,可以设置多个,和CPU的核数一致,或者稍小,提高机器资源的使用率 worker的JVM参数:WORKER_GC_OPTS,取值参考:-Xms1G -Xmx5G -XX:+UseG1GC,根据集群机器的资源多少而定,G1是一种垃圾回收器supervisor的JVM参数:SUPERVISOR_GC_OPTS,取值参考:-Xms1G -Xmx5G -XX:+UseG1GC,根据集群机器的资源多少而定,G1是一种垃圾回收器Storm UI的服务端口:ui.port,可以使用浏览器打开网页查看Topology详细信息ZooKeeper服务器列表:storm.zookeeper.serversZooKeeper连接端口:storm.zookeeper.portZooKeeper中Storm的根目录位置:storm.zookeeper.root,用来存放Storm集群元信息- 客户端连接
ZooKeeper超时时间:storm.zookeeper.session.timeout Storm使用的本地文件系统目录:storm.local.dir,注意此目录必须存在并且Storm进程有权限可读写Storm集群运行模式:storm.cluster.mode,取值可选:distributed、local
Kafka
注意,Kafka 的不同版本参数名、参数值会有变化,特别是 v0.9.x 之后,与之前的低版本差异很大,例如数据游标的参数可以参考我的另外一篇博文:记录一个 Kafka 错误:OffsetOutOfRangeException ,Kafka 官网参见:Kafka-v0.9.0.x-configuration 。
配置优化都是修改 server.properties 文件中参数值。
JVM参数:KAFKA_HEAP_OPTS,取值参考:-Xmx2G- 文件存放位置:
log.dirs,多个使用逗号分隔,注意所有的log级别需要设置为INFO - 单个
Topic的文件保留策略:log.retention.hours=72【数据保留 72 小时,超过时旧数据被删除】,log.retention.bytes=1073741824【数据保留 1GB,超过时旧数据被删除】 - 数据文件刷盘策略:
log.flush.interval.messages=10000【每当producer写入 10000 条消息时,刷数据到磁盘】,log.flush.interval.ms=1000【每间隔 1 秒钟时间,刷数据到磁盘】 Topic的分区数量:num.partitions=8- 启动
Fetch线程给副本同步数据传输大小限制:replica.fetch.max.bytes=10485760,要比message.max.bytes大 message.max.bytes=10485700,这个参数决定了broker能够接收到的最大消息的大小,要比max.request.size大max.request.size=10480000,这个参数决定了producer生产消息的大小fetch.max.bytes=10485760,这个参数决定了consumer消费消息的大小,要比message.max.bytes大broker处理消息的最大线程数:num.network.threads=17,一般num.network.threads主要处理网络IO,读写缓冲区数据,基本没有IO等待,配置线程数量为CPU核数加 1broker处理磁盘IO的线程数:num.io.threads=32,num.io.threads主要进行磁盘IO操作,高峰期可能有些IO等待,因此配置需要大些,配置线程数量为CPU核数 2 倍,最大不超过 3 倍- 强制新建一个
segment的时间:log.roll.hour=72 - 是否允许自动创建
Topic:auto.create.topics.enable=true,如果设置为false,则代码无法创建,需要通过kafka的命令创建Topic auto.offset.reset,关于数据游标的配置【earliest与latest、smallest、largest】,由于不同版本之间的差异,可以参考:记录一个 Kafka 错误:OffsetOutOfRangeExceptionadvertised.host.name、advertised.port,关于外网集群可以访问的配置,跨网络生产、消费数据,v082以及之前的版本【之后的版本有保留这两个参数,但是不建议使用】advertised.listeners、listeners,关于外网集群可以访问的配置,跨网络生产、消费数据,v090以及之后的版本kafka-topics.sh --create -zookeeper xx:2181 --replication-factor 2 --partitions 2 --topic topic_xx,kafka创建topic,里面的副本数量参数--replication-factor,和Elasticsearch的含义不一样,不仅仅是指备份的数据量,而是总体的副本数,所以设置为 1 等价于没有副本。
留意参数取值大小的限制:fetch.max.bytes 大于 message.max.bytes 大于 max.request.size,replica.fetch.max.bytes 大于 message.max.bytes 大于 max.request.size。
Zookeeper
配置 zoo.cfg 文件:
dataDir,表示快照日志目录dataLogDir,表示事务日志目录,不配置的时候事务日志目录同dataDirclientPort=2181,服务的监听端口tickTime=2000,Zookeeper的时间单元,Zookeeper中所有时间都是以这个时间单元的整数倍去配置的,例如,session的最小超时时间是2*tickTime【单位:毫秒】syncLimit=5,表示Follower和Observer与Leader交互时的最大等待时间,只不过是在与leader同步完毕之后,进入正常请求转发或ping等消息交互时的超时时间initLimit=10,Observer和Follower启动时,从Leader同步最新数据时,Leader允许initLimit * tickTime的时间内完成,如果同步的数据量很大,可以相应地把这个值设置大一些maxClientCnxns=384,最大并发客户端数,用于防止Ddos的,默认值是 10,设置为 0 是不加限制maxSessionTimeout=120000,Session超时时间限制,如果客户端设置的超时时间不在这个范围,那么会被强制设置一个最大时间,默认的Session超时时间是在2 * tickTime ~ 20 * tickTime这个范围minSessionTimeout=4000,同maxSessionTimeoutserver.x=hostname:2888:3888,x是一个数字,与每个服务器的myid文件中的id是一样的,hostname是服务器的hostname,右边配置两个端口,第一个端口用于Follower和Leader之间的数据同步和其它通信,第二个端口用于Leader选举过程中投票通信autopurge.purgeInterval=24,在v3.4.0及之后的版本,Zookeeper提供了自动清理事务日志文件和快照日志文件的功能,这个参数指定了清理频率,单位是小时,需要配置一个 1 或更大的整数。默认是 0,表示不开启自动清理功能autopurge.snapRetainCount=30,参数指定了需要保留的事务日志文件和快照日志文件的数目,默认是保留 3 个,和autopurge.purgeInterval搭配使用

