从kafka导入导出数据

导入:
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 10.16.252.48:9092,10.16.252.49:9092,10.16.252.50:9092 --topic k8s_log_pl_old < /data/k8s_log_pl.txt

导出:
/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 10.16.252.48:2181,10.16.252.49:2181,10.16.252.50:2181 --topic k8s_log_pl --from-beginning > /data/k8s_log_pl.txt &

kafka增加分区:
./kafka-topics.sh --alter --zookeeper 10.16.252.48:2181,10.16.252.49:2181,10.16.252.50:2181 --topic k8scs1_log_online --partitions 24

kafka 0.9.0.1 安装配置及简单压测

1. 简介

Kafka专为分布式高吞吐量系统而设计。 作为一个更传统的消息代理的替代品。 与其他消息传递系统相比,Kafka具有更好的吞吐量,内置分区,复制和固有的容错能力,这使得它非常适合大规模消息处理应用程序。

在实际使用中,kafka的不同版本协议也不完全一致,不是向下兼容的,使用时需特别注意,我这里使用的是0.9.0.1的版本。

2. 服务器配置

本次安装的服务器为物理机3台:
CPU:双Intel(R) Xeon(R) Silver 4110 CPU @ 2.10GHz
内存:128G配置
10块SAS HDD 1T盘做raid10,挂载到/data1。

主机名 IP 备注
sh-saas-k8s-kafka-online-01 10.16.252.33 zookeeper,kafka
sh-saas-k8s-kafka-online-02 10.16.252.39 zookeeper,kafka
sh-saas-k8s-kafka-online-03 10.16.252.40 zookeeper,kafka

2. kafka安装及配置

kafka需要用到zookeeper,所以我们需要先安装zookeeper。kafka及zookeeper都需要是JAVA语言开发,需要使用JDK,推荐1.8版本。

(更多…)

fluentd采集数据到kafka时的版本问题

在使用fluentd采集数据到kafka时,一直不通,碰到了很多报错。
fluentd版本为:1.2.5
fluent-plugin-kafka版本为:0.7.8
kafka版本为:0.9
最开始碰到了这个报错:

2018-09-05 01:42:06 +0000 [warn]: fluent/log.rb:342:warn: Send exception occurred: unknown topic 
2018-09-05 01:42:06 +0000 [warn]: fluent/log.rb:342:warn: Exception Backtrace : /var/lib/gems/2.3.0/gems/ruby-kafka-0.6.8/lib/kafka/protocol/metadata_response.rb:141:in `partitions_for'
/var/lib/gems/2.3.0/gems/ruby-kafka-0.6.8/lib/kafka/cluster.rb:155:in `partitions_for'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/kafka_producer_ext.rb:190:in `assign_partitions!'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/kafka_producer_ext.rb:153:in `block in deliver_messages_with_retries'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/kafka_producer_ext.rb:148:in `loop'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/kafka_producer_ext.rb:148:in `deliver_messages_with_retries'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/kafka_producer_ext.rb:102:in `deliver_messages'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/out_kafka2.rb:220:in `write'
/var/lib/gems/2.3.0/gems/fluentd-1.2.4/lib/fluent/plugin/output.rb:1110:in `try_flush'
/var/lib/gems/2.3.0/gems/fluentd-1.2.4/lib/fluent/plugin/output.rb:1389:in `flush_thread_run'
/var/lib/gems/2.3.0/gems/fluentd-1.2.4/lib/fluent/plugin/output.rb:444:in `block (2 levels) in start'
/var/lib/gems/2.3.0/gems/fluentd-1.2.4/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2018-09-05 01:42:06 +0000 [info]: fluent/log.rb:322:info: initialized kafka producer: fluentd
2018-09-05 01:42:06 +0000 [debug]: fluent/log.rb:302:debug: taking back chunk for errors. chunk="57515e0ef787da843836cc864f9d1581"
2018-09-05 01:42:06 +0000 [warn]: fluent/log.rb:342:warn: failed to flush the buffer. retry_time=2 next_retry_seconds=2018-09-05 01:42:06 +0000 chunk="57515e0ef787da843836cc864f9d1581" error_class=Kafka::UnknownTopicOrPartition error="unknown topic "
  2018-09-05 01:42:06 +0000 [warn]: plugin/output.rb:1157:rescue in try_flush: suppressed same stacktrace
2018-09-05 01:42:09 +0000 [debug]: fluent/log.rb:302:debug: 61 messages send.
2018-09-05 01:42:09 +0000 [warn]: fluent/log.rb:342:warn: Send exception occurred: unknown topic 
2018-09-05 01:42:09 +0000 [warn]: fluent/log.rb:342:warn: Exception Backtrace : /var/lib/gems/2.3.0/gems/ruby-kafka-0.6.8/lib/kafka/protocol/metadata_response.rb:141:in `partitions_for'
/var/lib/gems/2.3.0/gems/ruby-kafka-0.6.8/lib/kafka/cluster.rb:155:in `partitions_for'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/kafka_producer_ext.rb:190:in `assign_partitions!'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/kafka_producer_ext.rb:153:in `block in deliver_messages_with_retries'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/kafka_producer_ext.rb:148:in `loop'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/kafka_producer_ext.rb:148:in `deliver_messages_with_retries'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/kafka_producer_ext.rb:102:in `deliver_messages'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/out_kafka2.rb:220:in `write'
/var/lib/gems/2.3.0/gems/fluentd-1.2.4/lib/fluent/plugin/output.rb:1110:in `try_flush'
/var/lib/gems/2.3.0/gems/fluentd-1.2.4/lib/fluent/plugin/output.rb:1389:in `flush_thread_run'
/var/lib/gems/2.3.0/gems/fluentd-1.2.4/lib/fluent/plugin/output.rb:444:in `block (2 levels) in start'
/var/lib/gems/2.3.0/gems/fluentd-1.2.4/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2018-09-05 01:42:09 +0000 [info]: fluent/log.rb:322:info: initialized kafka producer: fluentd
2018-09-05 01:42:09 +0000 [debug]: fluent/log.rb:302:debug: taking back chunk for errors. chunk="57515e0ef787da843836cc864f9d1581"
2018-09-05 01:42:09 +0000 [warn]: fluent/log.rb:342:warn: failed to flush the buffer. retry_time=3 next_retry_seconds=2018-09-05 01:42:09 +0000 

这是因为没有配置default_topic,使用下面的配置指定topic就可以了。
(更多…)