本文介绍GBase 8a和OGG +Kafka相关参数,主要用于consume功能,与加载导出等无关。
gbase_8a_gcluster.cnf
目录导航
9.5.2版本kafka参数列表
gbase> show variables like '%kaf%';
+----------------------------------------------------+-----------+
| Variable_name | Value |
+----------------------------------------------------+-----------+
| _gbase_kafka_transaction_mode | OFF |
| _t_gcluster_kafka_allow_offset_jump | 1 |
| _t_gcluster_kafka_consumer_compare_field_only_once | 0 |
| _t_gcluster_kafka_consumer_force_compare_field | 1 |
| _t_gcluster_kafka_ignore_when_update_not_hit | 0 |
| _t_gcluster_kafka_null_transform | 0 |
| _t_gcluster_kafka_seek_offset | 0 |
| _t_gcluster_kafka_trust_kafka_returned_offset | 0 |
| _t_kafka_varchar_auto_truncate | 0 |
| gbase_kafka_broker_version | |
| gbase_kafka_keytab | |
| gbase_kafka_principal | |
| gcluster_kafka_batch_commit_dml_count | 100000 |
| gcluster_kafka_consume_batch | 10 |
| gcluster_kafka_consume_timeout | 2000 |
| gcluster_kafka_consumer_escape_zero | 0 |
| gcluster_kafka_consumer_latency_time_statistics | 0 |
| gcluster_kafka_consumer_output_charset_name | |
| gcluster_kafka_consumer_special_use_charset_name | |
| gcluster_kafka_consumer_support_partial_update | 0 |
| gcluster_kafka_data_buf_size | 0 |
| gcluster_kafka_dataflow | 0 |
| gcluster_kafka_debug_on | 0 |
| gcluster_kafka_delete_execute_directly | 0 |
| gcluster_kafka_ignore_if_table_not_exist | 0 |
| gcluster_kafka_ignore_pos_field | 0 |
| gcluster_kafka_loader_max_start_count | 20 |
| gcluster_kafka_local_queue_size | 201000 |
| gcluster_kafka_max_message_size | 100000000 |
| gcluster_kafka_parallel_commit | 1 |
| gcluster_kafka_primarykey_can_be_null | 0 |
| gcluster_kafka_result_check | 0 |
| gcluster_kafka_user_allowed_max_latency | 10000 |
+----------------------------------------------------+-----------+
33 rows in set (Elapsed: 00:00:00.00)
gcluster_kafka_consumer_enable
打开 kafka consumer 功能,如果不打开,则 consumer 相关命令都不可用(报错)
- 1 打开
- 0 关闭, 默认
该参数在集群show 时看不到,但在默认的配置文件里能看到。
gcluster_assign_kafka_topic_period
自动接管 consumer 的时间周期,单位为秒。
例如 A 节点宕机了,最大需要等待 gcluster_assign_kafka_topic_period秒之后,A 节点负责的同步任务会被其他节点接管。最小值 20s,最大值 120s。
gcluster_kafka_max_message_size
从 kafka topic 获得消息的最大长度,
单位为字节,最大值 1000000000 字节,这个值需要大于等于 kafka server 的配置(message.max.bytes),否则可能造成消费问题,如果 kafka 队列中存在一条消息,其大小超过 gcluster_kafka_max_message_size 就会造成消费卡住。
gcluster_kafka_batch_commit_dml_count
一次提交 dml 操作的数量。
适当调大能明显提高性能,但是如果一个 topic 涉及的表很多(几百个表)则建议该参数调小,表越多越应该调小,调小的目的是使得一次提交命中的表少一些,具体需要结合具体用户场景、同步速度、资源占用情况具体对待。未来启用新事务后,表数量多对性能的影响会降低,会再次更新手册。需要注意的是,此参数是一个意向值,程序未必会严格按照此参数来提交,比如如果一个事务包含大量 DML 操作,那么程序必须确保事务完整性;再比如从 kafka 取消息、解析消息的速度慢于往单机提交数据的速度,那么程序也会选择先提交,而不是一定要等待满足 gcluster_kafka_batch_commit_dml_count 参数。
gcluster_kafka_user_allowed_max_latency
允许消息在 GBase 8a MPP Cluster 集群层缓存多长时间,超时之后必须马上提交,单位是毫秒。
此参数与gcluster_kafka_batch_commit_dml_count 作用类似,都是决定什么时候提交的。多攒一些数据再提交,有利于降低磁盘占用,如果用户对数据延迟不太敏感,而对磁盘占用比较敏感,可以通过这个参数来调节。典型值一般可以设置为 50000~20000,需要注意提交动作本身也需要消耗时间。
gcluster_kafka_local_queue_size
储存 dml 操作的队列的长度,建议至少为 gcluster_kafka_batch_commit_dml_count 的二倍多一些。
gcluster_kafka_consume_batch
consumer 一次读取 kafka 消息的条数。
如果 kafka 队列里的消息 size 较小,可以设大,反之设小,此参数对性能的影响不大,所以一般没必要设太大,建议设为 10~1000。
gcluster_kafka_ignore_pos_field
控制单个 consumer 是否比对 POS(防止
重复消费)。客户多线程往 kafka 中写入数据,写入 kafka 的数据不能确保 POS有序,原 consumer 消费数据时会做 POS 检查导致无序的数据入库时会有遗漏。现在参数 gcluster_kafka_ignore_pos_field,控制 consumer 是否进行 POS 检查。
- POS 检查开启,consumer 消费时会丢弃已消费序号之前的消息;
- POS 检查关闭,consumer 会将 kafka 的每条消息均入库,所以需要生产端确保发送到 kafka的消息无重复。
- 默认值为 0,即检查重复消息;
- 值为 1 时,不检查重复消息。
用于 Consumer 消费 only insert 消息,客户能保证 kafka 消息无重复的特殊场景。配置方法可以手动修改 gclusterdb.kafka_consumers。如:
Update gclusterdb.kafka_consumers set common_options='gcluster_kafka_ignore_pos_field=1' where name='consumer_1';
最后重启consumer_1。
_t_kafka_varchar_auto_truncate
在consumer消费kafka信息时,遇到长度超数据库定义长度的字段(仅限 varchar 类型),开启可以自动进行截位并正常消费入库模式。缺省值为 0;设置值为 1 时,表示让 consumer 对 json消息中的 after 内容进行长度判断,如果长度超过了目标表的列宽,则自动按列宽(字符长度)截断,只对 varchar 列做处理。
gcluster_kafka_message_format_type
设定 consumer 在解析 kafka 消息时,以什么格式来解析。
取值范围:JSON、PUREDATA、AUTO_DETECT
说明:
- puredata 对应 rtsync 生产的 protobuf 消息;
- AUTO_DETECT(默认)是让 consumer 自己侦测消息格式,这时候 consumer 会先尝试用 puredata 格式进行解析,通过就认为是 puredata 格式,否则就认为是json 格式。
注:consumer 启动后,只在解析第一条消息时做这个判断,后面直接用这个判断结果。
gcluster_kafka_ignore_pos_field
控制单个 consumer 是否比对 POS(防止重复消费)。
客户多线程往 kafka 中写入数据,写入 kafka 的数据不能确保 POS 有序,原 consumer 消费数据时会做 POS 检查导致无序的数据入库时会有遗漏。参数 gcluster_kafka_ignore_pos_field,控制 consumer 是否进行 POS 检查。
- POS检查开启,consumer 消费时会丢弃已消费序号之前的消息;
- POS 检查关闭,consumer 会将 kafka 的每条消息均入库,所以需要生产端确保发送到 kafka 的消息无重复。
配置方法:手动修改 gclusterdb.kafka_consumers
Update gclusterdb.kafka_consumers set common_options='gcluster_kafka_ignore_pos_field=1' where name='consumer_1';
重启 consumer_1。
gcluster_kafka_broker_version
设定 kafka server 的版本,例如 0.9.0,0.8.2… 当 kafka server 低于 0.10 版本的时候必须设置此参数,而高于 0.10版本不要设置。
gbase_kafka_principal
配置 kafka kerberos principal
gbase_kafka_keytab
配置 kafka kerberos keytab file path
gcluster_kafka_ignore_if_table_not_exist
consumer 处理一个消息时,如果消息指定的目标表不存在,是否自动忽略此消息。1 代表忽略。
gcluster_kafka_parallel_commit
consumer 向 gnode 发送 sql 是否采用并行方式。默认值是 1,代表不并行。这个参数目前不要使用,会造成主备不一致,或者 delete 不掉数据。