Version problems when shipping data from fluentd to Kafka

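While using fluentd to ship data to Kafka, I could not get the pipeline to work and ran into a series of errors.
fluentd version: 1.2.5
fluent-plugin-kafka version: 0.7.8
Kafka version: 0.9
The first error I hit was this one: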

2018-09-05 01:42:06 +0000 [warn]: fluent/log.rb:342:warn: Send exception occurred: unknown topic 
2018-09-05 01:42:06 +0000 [warn]: fluent/log.rb:342:warn: Exception Backtrace : /var/lib/gems/2.3.0/gems/ruby-kafka-0.6.8/lib/kafka/protocol/metadata_response.rb:141:in `partitions_for'
/var/lib/gems/2.3.0/gems/ruby-kafka-0.6.8/lib/kafka/cluster.rb:155:in `partitions_for'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/kafka_producer_ext.rb:190:in `assign_partitions!'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/kafka_producer_ext.rb:153:in `block in deliver_messages_with_retries'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/kafka_producer_ext.rb:148:in `loop'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/kafka_producer_ext.rb:148:in `deliver_messages_with_retries'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/kafka_producer_ext.rb:102:in `deliver_messages'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/out_kafka2.rb:220:in `write'
/var/lib/gems/2.3.0/gems/fluentd-1.2.4/lib/fluent/plugin/output.rb:1110:in `try_flush'
/var/lib/gems/2.3.0/gems/fluentd-1.2.4/lib/fluent/plugin/output.rb:1389:in `flush_thread_run'
/var/lib/gems/2.3.0/gems/fluentd-1.2.4/lib/fluent/plugin/output.rb:444:in `block (2 levels) in start'
/var/lib/gems/2.3.0/gems/fluentd-1.2.4/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2018-09-05 01:42:06 +0000 [info]: fluent/log.rb:322:info: initialized kafka producer: fluentd
2018-09-05 01:42:06 +0000 [debug]: fluent/log.rb:302:debug: taking back chunk for errors. chunk="57515e0ef787da843836cc864f9d1581"
2018-09-05 01:42:06 +0000 [warn]: fluent/log.rb:342:warn: failed to flush the buffer. retry_time=2 next_retry_seconds=2018-09-05 01:42:06 +0000 chunk="57515e0ef787da843836cc864f9d1581" error_class=Kafka::UnknownTopicOrPartition error="unknown topic "
  2018-09-05 01:42:06 +0000 [warn]: plugin/output.rb:1157:rescue in try_flush: suppressed same stacktrace
2018-09-05 01:42:09 +0000 [debug]: fluent/log.rb:302:debug: 61 messages send.
2018-09-05 01:42:09 +0000 [warn]: fluent/log.rb:342:warn: Send exception occurred: unknown topic 
2018-09-05 01:42:09 +0000 [warn]: fluent/log.rb:342:warn: Exception Backtrace : /var/lib/gems/2.3.0/gems/ruby-kafka-0.6.8/lib/kafka/protocol/metadata_response.rb:141:in `partitions_for'
/var/lib/gems/2.3.0/gems/ruby-kafka-0.6.8/lib/kafka/cluster.rb:155:in `partitions_for'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/kafka_producer_ext.rb:190:in `assign_partitions!'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/kafka_producer_ext.rb:153:in `block in deliver_messages_with_retries'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/kafka_producer_ext.rb:148:in `loop'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/kafka_producer_ext.rb:148:in `deliver_messages_with_retries'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/kafka_producer_ext.rb:102:in `deliver_messages'
/var/lib/gems/2.3.0/gems/fluent-plugin-kafka-0.7.6/lib/fluent/plugin/out_kafka2.rb:220:in `write'
/var/lib/gems/2.3.0/gems/fluentd-1.2.4/lib/fluent/plugin/output.rb:1110:in `try_flush'
/var/lib/gems/2.3.0/gems/fluentd-1.2.4/lib/fluent/plugin/output.rb:1389:in `flush_thread_run'
/var/lib/gems/2.3.0/gems/fluentd-1.2.4/lib/fluent/plugin/output.rb:444:in `block (2 levels) in start'
/var/lib/gems/2.3.0/gems/fluentd-1.2.4/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'
2018-09-05 01:42:09 +0000 [info]: fluent/log.rb:322:info: initialized kafka producer: fluentd
2018-09-05 01:42:09 +0000 [debug]: fluent/log.rb:302:debug: taking back chunk for errors. chunk="57515e0ef787da843836cc864f9d1581"
2018-09-05 01:42:09 +0000 [warn]: fluent/log.rb:342:warn: failed to flush the buffer. retry_time=3 next_retry_seconds=2018-09-05 01:42:09 +0000 

This happened because default_topic was not configured. Specifying the topic with the configuration below fixed it.

  <store>
    @type kafka

    brokers 10.12.128.36:9092
    default_topic k8stest
    default_message_key message 

  </store>
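
As a rough illustration of what a default topic is for, here is a minimal Ruby sketch of a topic fallback. The resolve_topic helper and the "topic" record field are hypothetical names chosen for this example; this is not the plugin's actual code, and the plugin's real resolution order may differ.

```ruby
# Hypothetical helper (not part of fluent-plugin-kafka): pick the topic for
# one record. Prefers a per-record "topic" field, then the configured default.
def resolve_topic(record, default_topic)
  topic = record["topic"] || default_topic
  if topic.nil? || topic.to_s.strip.empty?
    # Fail early instead of sending a request for an empty topic name.
    raise ArgumentError, "no topic in record and no default_topic configured"
  end
  topic
end

record = { "message" => "hello" }
# The record carries no topic of its own, so the default is used.
puts resolve_topic(record, "k8stest")
```

Failing early with a clear message is a design choice for the sketch only; it makes a missing topic visible before any request reaches the broker.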

But after that, a different error appeared:

2018-09-07 10:28:43 +0800 [info]: #0 initialized kafka producer: kafka
2018-09-07 10:28:43 +0800 [warn]: #0 emit transaction failed: error_class=Kafka::ConnectionError error="Could not connect to any of the seed brokers:\n- kafka://10.12.128.36:9092: Connection error EOFError: EOFError" location="/opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.6.5/lib/kafka/cluster.rb:407:in `fetch_cluster_info'" tag="tomcat"
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.6.5/lib/kafka/cluster.rb:407:in `fetch_cluster_info'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.6.5/lib/kafka/cluster.rb:367:in `cluster_info'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.6.5/lib/kafka/cluster.rb:95:in `refresh_metadata!'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.6.5/lib/kafka/cluster.rb:50:in `add_target_topics'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.6.5/lib/kafka/producer.rb:276:in `deliver_messages_with_retries'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.6.5/lib/kafka/producer.rb:238:in `block in deliver_messages'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.6.5/lib/kafka/instrumenter.rb:23:in `instrument'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/ruby-kafka-0.6.5/lib/kafka/producer.rb:231:in `deliver_messages'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluent-plugin-kafka-0.7.8/lib/fluent/plugin/out_kafka.rb:240:in `emit'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.2/lib/fluent/compat/output.rb:164:in `process'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.2/lib/fluent/plugin/output.rb:763:in `emit_sync'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.2/lib/fluent/plugin/out_copy.rb:58:in `block in process'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.2/lib/fluent/plugin/out_copy.rb:56:in `each'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.2/lib/fluent/plugin/out_copy.rb:56:in `each_with_index'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.2/lib/fluent/plugin/out_copy.rb:56:in `process'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.2/lib/fluent/plugin/multi_output.rb:148:in `emit_sync'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.2/lib/fluent/event_router.rb:96:in `emit_stream'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.2/lib/fluent/plugin/in_tail.rb:385:in `receive_lines'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.2/lib/fluent/plugin/in_tail.rb:503:in `wrap_receive_lines'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.2/lib/fluent/plugin/in_tail.rb:714:in `block in handle_notify'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.2/lib/fluent/plugin/in_tail.rb:758:in `with_io'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.2/lib/fluent/plugin/in_tail.rb:692:in `handle_notify'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.2/lib/fluent/plugin/in_tail.rb:688:in `block in on_notify'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.2/lib/fluent/plugin/in_tail.rb:688:in `synchronize'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.2/lib/fluent/plugin/in_tail.rb:688:in `on_notify'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.2/lib/fluent/plugin/in_tail.rb:534:in `on_notify'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.2/lib/fluent/plugin/in_tail.rb:620:in `on_change'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/cool.io-1.5.3/lib/cool.io/loop.rb:88:in `run_once'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/cool.io-1.5.3/lib/cool.io/loop.rb:88:in `run'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.2/lib/fluent/plugin_helper/event_loop.rb:93:in `block in start'
  2018-09-07 10:28:43 +0800 [warn]: #0 /opt/td-agent/embedded/lib/ruby/gems/2.4.0/gems/fluentd-1.2.2/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'

Later I read online that the ruby-kafka library was the cause, so I wrote a small script to test it directly:

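require "kafka"

# Connect with ruby-kafka directly, bypassing fluentd, to isolate the problem.
kafka = Kafka.new(["10.12.0.26:9092"], client_id: "my-application")

# Send a single test message to the topic fluentd is configured to use.
kafka.deliver_message("Hello, World!", topic: "k8stest")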

Running it produced the same error, so I went back to the ruby-kafka documentation:

Compatibility

Kafka       Producer API             Consumer API
Kafka 0.8   Full support in v0.4.x   Unsupported
Kafka 0.9   Full support in v0.4.x   Full support in v0.4.x
Kafka 0.10  Full support in v0.5.x   Full support in v0.5.x
Kafka 0.11  Limited support          Limited support
Kafka 1.0   Limited support          Limited support

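This table is easy to misread. We usually assume that a newer ruby-kafka also supports older Kafka brokers: since ruby-kafka 0.5.x supports Kafka 0.10, surely it supports Kafka 0.9 as well. In practice the mapping is one-to-one, and each Kafka version needs the ruby-kafka series listed for it, mainly because the interfaces differ too much between Kafka versions.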
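
The one-to-one reading of the compatibility table can be written down as a small lookup. This is a minimal sketch using Gem::Requirement from Ruby's bundled RubyGems; the listed_as_supported? helper is a hypothetical name, and the data is only the "Full support" entries from the Producer API column quoted above.

```ruby
require "rubygems" # Gem::Version and Gem::Requirement ship with Ruby

# Hypothetical helper: true only when the given ruby-kafka version falls in
# the series the table lists with full producer support for that Kafka series.
def listed_as_supported?(kafka_series, ruby_kafka_version)
  full_support = {
    "0.8"  => Gem::Requirement.new("~> 0.4.0"), # any 0.4.x
    "0.9"  => Gem::Requirement.new("~> 0.4.0"), # any 0.4.x
    "0.10" => Gem::Requirement.new("~> 0.5.0"), # any 0.5.x
  }
  requirement = full_support[kafka_series]
  # Kafka 0.11 and 1.0 are only "Limited support", so they are not listed here.
  return false if requirement.nil?
  requirement.satisfied_by?(Gem::Version.new(ruby_kafka_version))
end

puts listed_as_supported?("0.9", "0.6.8") # false: 0.6.8 is not in the 0.4.x series
puts listed_as_supported?("0.9", "0.4.3") # true
```
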
Once the cause was clear, the fix was easy: swap the versions of ruby-kafka and fluent-plugin-kafka:

td-agent-gem uninstall ruby-kafka --version 0.6.8
td-agent-gem uninstall fluent-plugin-kafka --version 0.7.3
td-agent-gem install ruby-kafka  --version 0.4.3
td-agent-gem install fluent-plugin-kafka  --version 0.6.6

The ruby-kafka and fluent-plugin-kafka versions that ended up working with Kafka 0.9 are:

[root@bs-ops-test-docker-dev-04 ~]# td-agent-gem list | grep kafka
fluent-plugin-kafka (0.6.6)
ruby-kafka (0.4.3)

Note: fluent-plugin-kafka 0.7.x does not work with ruby-kafka 0.4.x.
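
The working combination above can also be checked mechanically. The following is a minimal sketch that parses text in the format printed by `td-agent-gem list | grep kafka` and compares it with the two versions from this post; the helper names (expected_versions, parse_gem_list, version_problems) are hypothetical and exist only for this example.

```ruby
# Versions taken from the working combination listed above.
def expected_versions
  { "fluent-plugin-kafka" => "0.6.6", "ruby-kafka" => "0.4.3" }
end

# Parse lines such as "ruby-kafka (0.4.3)" into { "ruby-kafka" => ["0.4.3"] }.
# A gem with several installed versions is printed as "name (0.6.8, 0.4.3)".
def parse_gem_list(text)
  text.each_line.each_with_object({}) do |line, gems|
    match = line.strip.match(/\A(\S+) \((.+)\)\z/)
    next unless match
    gems[match[1]] = match[2].split(",").map(&:strip)
  end
end

# Return human-readable problems; an empty array means both pins are met
# and no other version of either gem is still installed.
def version_problems(gem_list_text)
  installed = parse_gem_list(gem_list_text)
  expected_versions.map do |name, version|
    found = installed.fetch(name, [])
    next if found == [version]
    "#{name}: expected only #{version}, found #{found.inspect}"
  end.compact
end

sample = <<~LIST
  fluent-plugin-kafka (0.6.6)
  ruby-kafka (0.4.3)
LIST
puts version_problems(sample).empty? # true
```

The check deliberately treats a leftover newer version as a problem, since the steps above uninstall the newer gems before installing the older ones.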

References:
https://github.com/zendesk/ruby-kafka
https://blog.csdn.net/mtj66/article/details/79209302
