Kafka 0.9.0.1 Installation, Configuration, and a Simple Benchmark
1. Introduction
Kafka is designed for distributed, high-throughput systems and serves as a replacement for more traditional message brokers. Compared with other messaging systems, Kafka offers better throughput, built-in partitioning, replication, and inherent fault tolerance, which makes it well suited to large-scale message-processing applications.
In practice, the protocol differs between Kafka versions and is not backward compatible, so pay close attention to version matching. This article uses version 0.9.0.1.
2. Server Configuration
This installation uses three physical servers:
CPU: dual Intel(R) Xeon(R) Silver 4110 CPU @ 2.10GHz
Memory: 128 GB
Disk: 10 × 1 TB SAS HDDs in RAID 10, mounted at /data1.
Hostname | IP | Roles |
---|---|---|
sh-saas-k8s-kafka-online-01 | 10.16.252.33 | zookeeper,kafka |
sh-saas-k8s-kafka-online-02 | 10.16.252.39 | zookeeper,kafka |
sh-saas-k8s-kafka-online-03 | 10.16.252.40 | zookeeper,kafka |
3. Kafka Installation and Configuration
Kafka depends on ZooKeeper, so we install ZooKeeper first. Both Kafka and ZooKeeper are written in Java and require a JDK; version 1.8 is recommended.
3.1 ZooKeeper Installation and Configuration
Download and install ZooKeeper:
```bash
cd /data/software/
rpm -ivh jdk-8u191-linux-x64.rpm
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5-bin.tar.gz
cd /usr/local/
tar xvzf /data/software/apache-zookeeper-3.5.5-bin.tar.gz
ln -s apache-zookeeper-3.5.5-bin/ zk
cd zk/conf
cp zoo_sample.cfg zoo.cfg
```
ZooKeeper reads its configuration from zoo.cfg. We already copied one from zoo_sample.cfg above; now edit it:
vim zoo.cfg
```properties
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/data1/appdata/zk
dataLogDir=/data/log/zk
# the port at which the clients will connect
# dataDir vs. dataLogDir: keep them separate so that snapshots and transaction
# logs are stored apart; the paths these two settings point to may also differ
# from server to server.
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
# server.X and myid: the X in server.X corresponds to the number stored in
# dataDir/myid. With 1, 2 and 3 written to the myid files of the three servers,
# every server's zoo.cfg simply lists server.1, server.2 and server.3. If
# several servers run on the same machine, the two trailing ports must differ
# between them, otherwise the ports will conflict.
server.1=10.16.252.33:2288:3388
server.2=10.16.252.39:2288:3388
server.3=10.16.252.40:2288:3388
```
Create the directories ZooKeeper needs:
mkdir -p /data1/appdata/zk /data/log/zk
# Generate myid; the ID below must match the X in the server.X entries of the config file:
echo "1">/data1/appdata/zk/myid
Start ZooKeeper:
sh bin/zkServer.sh start
The logs are in the logs directory; check them first if anything goes wrong.
After installing the other two servers the same way, the whole ZooKeeper cluster is installed and configured.
You can verify that it works with the following command:
sh bin/zkCli.sh -server 10.16.252.40:2181
```
[root@sh-saas-k8s-kafka-online-03 zk]# sh bin/zkCli.sh -server 10.16.252.40:2181
/bin/java
Connecting to 10.16.252.40:2181
2019-09-04 15:52:55,637 [myid:] - INFO [main:Environment@109] - Client environment:zookeeper.version=3.5.5-390fe37ea45dee01bf87dc1c042b5e3dcce88653, built on 05/03/2019 12:07 GMT
2019-09-04 15:52:55,640 [myid:] - INFO [main:Environment@109] - Client environment:host.name=sh-saas-k8s-kafka-online-03
2019-09-04 15:52:55,640 [myid:] - INFO [main:Environment@109] - Client environment:java.version=1.8.0_191
2019-09-04 15:52:55,642 [myid:] - INFO [main:Environment@109] - Client environment:java.vendor=Oracle Corporation
2019-09-04 15:52:55,642 [myid:] - INFO [main:Environment@109] - Client environment:java.home=/usr/java/jdk1.8.0_191-amd64/jre
2019-09-04 15:52:55,642 [myid:] - INFO [main:Environment@109] - Client environment:java.class.path=/usr/local/zk/bin/../zookeeper-server/target/classes:/usr/local/zk/bin/../build/classes:/usr/local/zk/bin/../zookeeper-server/target/lib/*.jar:/usr/local/zk/bin/../build/lib/*.jar:/usr/local/zk/bin/../lib/zookeeper-jute-3.5.5.jar:/usr/local/zk/bin/../lib/zookeeper-3.5.5.jar:/usr/local/zk/bin/../lib/slf4j-log4j12-1.7.25.jar:/usr/local/zk/bin/../lib/slf4j-api-1.7.25.jar:/usr/local/zk/bin/../lib/netty-all-4.1.29.Final.jar:/usr/local/zk/bin/../lib/log4j-1.2.17.jar:/usr/local/zk/bin/../lib/json-simple-1.1.1.jar:/usr/local/zk/bin/../lib/jline-2.11.jar:/usr/local/zk/bin/../lib/jetty-util-9.4.17.v20190418.jar:/usr/local/zk/bin/../lib/jetty-servlet-9.4.17.v20190418.jar:/usr/local/zk/bin/../lib/jetty-server-9.4.17.v20190418.jar:/usr/local/zk/bin/../lib/jetty-security-9.4.17.v20190418.jar:/usr/local/zk/bin/../lib/jetty-io-9.4.17.v20190418.jar:/usr/local/zk/bin/../lib/jetty-http-9.4.17.v20190418.jar:/usr/local/zk/bin/../lib/javax.servlet-api-3.1.0.jar:/usr/local/zk/bin/../lib/jackson-databind-2.9.8.jar:/usr/local/zk/bin/../lib/jackson-core-2.9.8.jar:/usr/local/zk/bin/../lib/jackson-annotations-2.9.0.jar:/usr/local/zk/bin/../lib/commons-cli-1.2.jar:/usr/local/zk/bin/../lib/audience-annotations-0.5.0.jar:/usr/local/zk/bin/../zookeeper-*.jar:/usr/local/zk/bin/../zookeeper-server/src/main/resources/lib/*.jar:/usr/local/zk/bin/../conf:
2019-09-04 15:52:55,642 [myid:] - INFO [main:Environment@109] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2019-09-04 15:52:55,642 [myid:] - INFO [main:Environment@109] - Client environment:java.io.tmpdir=/tmp
2019-09-04 15:52:55,642 [myid:] - INFO [main:Environment@109] - Client environment:java.compiler=<NA>
2019-09-04 15:52:55,642 [myid:] - INFO [main:Environment@109] - Client environment:os.name=Linux
2019-09-04 15:52:55,642 [myid:] - INFO [main:Environment@109] - Client environment:os.arch=amd64
2019-09-04 15:52:55,642 [myid:] - INFO [main:Environment@109] - Client environment:os.version=3.10.0-957.27.2.el7.x86_64
2019-09-04 15:52:55,642 [myid:] - INFO [main:Environment@109] - Client environment:user.name=root
2019-09-04 15:52:55,642 [myid:] - INFO [main:Environment@109] - Client environment:user.home=/root
2019-09-04 15:52:55,642 [myid:] - INFO [main:Environment@109] - Client environment:user.dir=/usr/local/apache-zookeeper-3.5.5-bin
2019-09-04 15:52:55,643 [myid:] - INFO [main:Environment@109] - Client environment:os.memory.free=235MB
2019-09-04 15:52:55,644 [myid:] - INFO [main:Environment@109] - Client environment:os.memory.max=245MB
2019-09-04 15:52:55,644 [myid:] - INFO [main:Environment@109] - Client environment:os.memory.total=245MB
2019-09-04 15:52:55,647 [myid:] - INFO [main:ZooKeeper@868] - Initiating client connection, connectString=10.16.252.40:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@6ae40994
2019-09-04 15:52:55,651 [myid:] - INFO [main:X509Util@79] - Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation
2019-09-04 15:52:55,656 [myid:] - INFO [main:ClientCnxnSocket@237] - jute.maxbuffer value is 4194304 Bytes
2019-09-04 15:52:55,663 [myid:] - INFO [main:ClientCnxn@1653] - zookeeper.request.timeout value is 0. feature enabled=
Welcome to ZooKeeper!
JLine support is enabled
2019-09-04 15:52:55,674 [myid:10.16.252.40:2181] - INFO [main-SendThread(10.16.252.40:2181):ClientCnxn$SendThread@1112] - Opening socket connection to server sh-saas-k8s-kafka-online-03/10.16.252.40:2181. Will not attempt to authenticate using SASL (unknown error)
2019-09-04 15:52:55,727 [myid:10.16.252.40:2181] - INFO [main-SendThread(10.16.252.40:2181):ClientCnxn$SendThread@959] - Socket connection established, initiating session, client: /10.16.252.40:11714, server: sh-saas-k8s-kafka-online-03/10.16.252.40:2181
2019-09-04 15:52:55,754 [myid:10.16.252.40:2181] - INFO [main-SendThread(10.16.252.40:2181):ClientCnxn$SendThread@1394] - Session establishment complete on server sh-saas-k8s-kafka-online-03/10.16.252.40:2181, sessionid = 0x300003cd6d40001, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: 10.16.252.40:2181(CONNECTED) 0] help
ZooKeeper -server host:port cmd args
    addauth scheme auth
    close
    config [-c] [-w] [-s]
    connect host:port
    create [-s] [-e] [-c] [-t ttl] path [data] [acl]
    delete [-v version] path
    deleteall path
    delquota [-n|-b] path
    get [-s] [-w] path
    getAcl [-s] path
    history
    listquota path
    ls [-s] [-w] [-R] path
    ls2 path [watch]
    printwatches on|off
    quit
    reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
    redo cmdno
    removewatches path [-c|-d|-a] [-l]
    rmr path
    set [-s] [-v version] path data
    setAcl [-s] [-v version] [-R] path acl
    setquota -n|-b val path
    stat [-w] path
    sync path
Command not found: Command not found help
[zk: 10.16.252.40:2181(CONNECTED) 1] stat
stat [-w] path
[zk: 10.16.252.40:2181(CONNECTED) 2] config -s
server.1=10.16.252.33:2288:3388:participant
server.2=10.16.252.39:2288:3388:participant
server.3=10.16.252.40:2288:3388:participant
version=0
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Wed Sep 04 15:40:12 CST 2019
pZxid = 0x0
cversion = 0
dataVersion = -1
aclVersion = -1
ephemeralOwner = 0x0
dataLength = 141
numChildren = 0
[zk: 10.16.252.40:2181(CONNECTED) 3] quit

WATCHER::

WatchedEvent state:Closed type:None path:null
2019-09-04 15:53:37,175 [myid:] - INFO [main:ZooKeeper@1422] - Session: 0x300003cd6d40001 closed
2019-09-04 15:53:37,175 [myid:] - INFO [main-EventThread:ClientCnxn$EventThread@524] - EventThread shut down for session: 0x300003cd6d40001
[root@sh-saas-k8s-kafka-online-03 zk]#
```
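Besides connecting with the client, each node's role in the quorum can be checked with zkServer.sh status; run it on every server, and one node should report leader while the other two report follower:

```bash
cd /usr/local/zk
# prints the quorum role of this node, e.g. "Mode: leader" or "Mode: follower"
sh bin/zkServer.sh status
```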
3.2 Kafka Installation and Configuration
Download and install:
```bash
cd /data/software/
wget https://archive.apache.org/dist/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
cd /usr/local/
tar xzvf /data/software/kafka_2.11-0.9.0.1.tgz
ln -s kafka_2.11-0.9.0.1/ kafka
```
Configure Kafka:
```bash
cd /usr/local/kafka
vim config/server.properties
```
The server.properties configuration file:
```properties
# broker.id must be unique for every broker in the cluster
broker.id=1
host.name=10.16.252.39
listeners=PLAINTEXT://10.16.252.39:9092
num.network.threads=16
num.io.threads=32
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data1/appdata/kafka
num.partitions=6
num.recovery.threads.per.data.dir=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.16.252.33:2181,10.16.252.39:2181,10.16.252.40:2181
zookeeper.connection.timeout.ms=6000
default.replication.factor=2
delete.topic.enable=true
```
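Only broker.id, host.name and listeners change on the other two brokers. A minimal sketch of adjusting them, assuming broker.id 2 goes to 10.16.252.33 (the id-to-host mapping here is my own example; any assignment works as long as each id is unique):

```bash
# Example for the broker on 10.16.252.33, after copying server.properties to it
cd /usr/local/kafka
sed -i 's/^broker.id=.*/broker.id=2/' config/server.properties
sed -i 's/^host.name=.*/host.name=10.16.252.33/' config/server.properties
sed -i 's#^listeners=.*#listeners=PLAINTEXT://10.16.252.33:9092#' config/server.properties
```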
Create the storage directory:
mkdir -p /data1/appdata/kafka
Configure the JVM heap size:
```bash
vim bin/kafka-server-start.sh

# change the heap setting inside the script to:
export KAFKA_HEAP_OPTS="-Xmx31G -Xms31G"
```
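kafka-server-start.sh only applies its default heap when KAFKA_HEAP_OPTS is unset (the same pattern shown later for kafka-producer-perf-test.sh), so an alternative to editing the script is to set the variable at launch time. A minimal sketch; the 31 GB value is the author's, presumably chosen to stay below the ~32 GB compressed-oops threshold:

```bash
# Set the heap via the environment instead of editing the start script
KAFKA_HEAP_OPTS="-Xmx31G -Xms31G" sh bin/kafka-server-start.sh -daemon config/server.properties
```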
Start:
sh bin/kafka-server-start.sh -daemon config/server.properties
Stop:
sh bin/kafka-server-stop.sh
After installing and configuring the other two machines the same way, the cluster setup is complete.
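To confirm the brokers have joined the cluster, a quick check is to list the broker ids registered in ZooKeeper and create a test topic. A sketch; the topic name and partition/replication values are only examples (the benchmark below varies the partition count):

```bash
# Each broker registers itself under /brokers/ids in ZooKeeper
sh /usr/local/zk/bin/zkCli.sh -server 10.16.252.33:2181 ls /brokers/ids

# Create and inspect a test topic for the benchmark
cd /usr/local/kafka
sh bin/kafka-topics.sh --create --zookeeper 10.16.252.33:2181,10.16.252.39:2181,10.16.252.40:2181 \
  --topic test_perf2 --partitions 6 --replication-factor 2
sh bin/kafka-topics.sh --describe --zookeeper 10.16.252.33:2181 --topic test_perf2
```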
4. Kafka Benchmark
Kafka ships with a benchmarking tool in the bin directory called kafka-producer-perf-test.sh.
Before benchmarking, increase the JVM heap for the benchmark script; the default is 512 MB:
vim bin/kafka-producer-perf-test.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx8G" fi |
Usage example:
sh bin/kafka-producer-perf-test.sh --topic test_perf2 --num-records 1000000000 --record-size 1000 --throughput 2000000 --producer-props bootstrap.servers=10.16.252.33:9092,10.16.252.39:9092,10.16.252.40:9092
Parameters:
--topic: the topic name, test_perf2 in this example
--num-records: the total number of messages to send, 1000000000 here
--record-size: the size of each record in bytes, 1000 here
--throughput: the target number of records to send per second, 2000000 here
--producer-props bootstrap.servers=10.16.252.33:9092,10.16.252.39:9092,10.16.252.40:9092: the Kafka connection/producer settings
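The "test clients" column in the tables below refers to how many copies of this command were running at the same time (the clients ran on the Kafka servers themselves, as noted further down). A minimal sketch of launching one client per server, assuming passwordless SSH between the hosts:

```bash
# Start one producer-perf client on each of the three servers in parallel
for host in sh-saas-k8s-kafka-online-01 sh-saas-k8s-kafka-online-02 sh-saas-k8s-kafka-online-03; do
  ssh "$host" "cd /usr/local/kafka && sh bin/kafka-producer-perf-test.sh \
    --topic test_perf2 --num-records 1000000000 --record-size 1000 --throughput 2000000 \
    --producer-props bootstrap.servers=10.16.252.33:9092,10.16.252.39:9092,10.16.252.40:9092" &
done
wait
```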
Now start the benchmark:
Topic partitions | Kafka settings | record-size (bytes) | Test clients | Throughput (records/s) |
---|---|---|---|---|
3 | num.network.threads=16,num.io.threads=32 | 1000 | 1 | 240k |
3 | num.network.threads=16,num.io.threads=32 | 1000 | 2 | 280k |
3 | num.network.threads=16,num.io.threads=32 | 1000 | 3 | 360k |
6 | num.network.threads=16,num.io.threads=32 | 1000 | 1 | 240k |
6 | num.network.threads=16,num.io.threads=32 | 1000 | 2 | 380k |
6 | num.network.threads=16,num.io.threads=32 | 1000 | 3 | 450k |
6 | num.network.threads=16,num.io.threads=32 | 500 | 1 | 500k |
6 | num.network.threads=16,num.io.threads=32 | 500 | 2 | 800k |
6 | num.network.threads=16,num.io.threads=32 | 500 | 3 | 900k |
12 | num.network.threads=16,num.io.threads=32 | 1000 | 1 | 240k |
12 | num.network.threads=16,num.io.threads=32 | 1000 | 2 | 390k |
12 | num.network.threads=16,num.io.threads=32 | 1000 | 3 | 460k |
12 | num.network.threads=6,num.io.threads=8 | 1000 | 1 | 240k |
12 | num.network.threads=6,num.io.threads=8 | 1000 | 2 | 390k |
12 | num.network.threads=6,num.io.threads=8 | 1000 | 3 | 460k |
Overall, as long as the topic's partitions are spread evenly across the servers, the different settings have little impact on performance; apart from the partition count, nothing changed the results much. This suggests that what limits Kafka most is I/O, both network I/O and disk I/O. The record rate is inversely proportional to the record size: with one client, 240k records/s at 1000 bytes is about 240 MB/s, while 500k records/s at 500 bytes is about 250 MB/s, i.e. roughly the same byte throughput.
Below is the monitoring data from one of the machines. Note also that the benchmark clients were running on these same servers.
Today I switched to a different set of servers, again three of them:
CPU: Intel(R) Xeon(R) CPU E5-2680 v4 @ 2.40GHz
Memory: 256 GB (each machine runs a single instance, so this hardly matters; only 32 GB was used)
Disk: 12 SSDs in RAID 10
Test results:
Topic partitions | Kafka settings | record-size (bytes) | Test clients | Throughput (records/s) |
---|---|---|---|---|
12 | num.network.threads=16,num.io.threads=32 | 1000 | 1 | 530k |
12 | num.network.threads=16,num.io.threads=32 | 1000 | 2 | 850k |
12 | num.network.threads=16,num.io.threads=32 | 1000 | 3 | 1200k |
12 | num.network.threads=16,num.io.threads=32 | 2000 | 1 | 300k |
12 | num.network.threads=16,num.io.threads=32 | 2000 | 2 | 500k |
12 | num.network.threads=16,num.io.threads=32 | 2000 | 3 | 640k |
As you can see, the SSDs are simply stronger: performance more than doubled.
I also did a rough measurement of the IOPS of a single server:
Read: about 35k IOPS.
Write: about 25k IOPS.
During the Kafka benchmark, however, the disks only saw around 12k IOPS.
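The post does not say which tools produced these IOPS figures; a minimal sketch of how they could be measured, assuming fio and iostat (sysstat) are available and using illustrative job parameters rather than the ones actually used:

```bash
# Raw random-read / random-write IOPS of the data volume (file-based test)
fio --name=randread --directory=/data1/appdata --rw=randread --bs=4k --direct=1 \
    --ioengine=libaio --iodepth=32 --numjobs=4 --size=4G --runtime=60 --group_reporting
fio --name=randwrite --directory=/data1/appdata --rw=randwrite --bs=4k --direct=1 \
    --ioengine=libaio --iodepth=32 --numjobs=4 --size=4G --runtime=60 --group_reporting

# Disk IOPS actually consumed while the Kafka benchmark is running (r/s and w/s columns)
iostat -x 1
```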