1. 简介

Kafka专为分布式高吞吐量系统而设计。 作为一个更传统的消息代理的替代品。 与其他消息传递系统相比,Kafka具有更好的吞吐量,内置分区,复制和固有的容错能力,这使得它非常适合大规模消息处理应用程序。

在实际使用中,kafka的不同版本协议也不完全一致,不是向下兼容的,使用时需特别注意,我这里使用的是0.9.0.1的版本。

2. 服务器配置

本次安装的服务器为物理机3台:
CPU:双Intel(R) Xeon(R) Silver 4110 CPU @ 2.10GHz
内存:128G配置
10块SAS HDD 1T盘做raid10,挂载到/data1。

主机名 IP 备注
sh-saas-k8s-kafka-online-01 10.16.252.33 zookeeper,kafka
sh-saas-k8s-kafka-online-02 10.16.252.39 zookeeper,kafka
sh-saas-k8s-kafka-online-03 10.16.252.40 zookeeper,kafka

2. kafka安装及配置

kafka需要用到zookeeper,所以我们需要先安装zookeeper。kafka及zookeeper都需要是JAVA语言开发,需要使用JDK,推荐1.8版本。

2.1 ZooKeeper安装和配置

下载zookeeper安装:

cd /data/software/
rpm -ivh jdk-8u191-linux-x64.rpm
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5-bin.tar.gz

cd /usr/local/
tar xvzf /data/software/apache-zookeeper-3.5.5-bin.tar.gz 

ln -s  apache-zookeeper-3.5.5-bin/ zk
cd zk/conf
cp zoo_sample.cfg zoo.cfg

zookeeper使用的配置文件为zoo.cfg,上面我们已经从zoo_sample.cfg复制了一份配置文件,现在修改它:
vim zoo.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/data1/appdata/zk
dataLogDir=/data/log/zk
# the port at which the clients will connect
#dataDir和dataLogDir:dataDir和dataLogDir也需要区分下,将数据文件和日志文件分开存放,同时每个server的这两变量所对应的路径都是不同的
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
#server.X和myid: server.X 这个数字就是对应,data/myid中的数字。在3个server的myid文件中分别写入了1,2,3,那么每个server中的zoo.cfg都配 server.1 server.2,server.3就行了。因为在同一台机器上,后面连着的2个端口,3个server都不要一样,否则端口冲突。
server.1=10.16.252.33:2288:3388
server.2=10.16.252.39:2288:3388
server.3=10.16.252.40:2288:3388

创建zookeeper需要使用的目录:
mkdir -p /data1/appdata/zk /data/log/zk

#生成myid,下面的ID要与配置文件内的server.x的ID x一致:
echo "1">/data1/appdata/zk/myid

启动zookeeper:
sh bin/zkServer.sh start

日志在logs目录下,有问题可以先看日志。

在其它2台服务器上,使用上面同样的方法安装后,整个zookeeper集群就安装和配置好了。

可以用下面的命令测试是否可用:
sh bin/zkCli.sh -server 10.16.252.40:2181

[root@sh-saas-k8s-kafka-online-03 zk]# sh bin/zkCli.sh -server 10.16.252.40:2181
/bin/java
Connecting to 10.16.252.40:2181
2019-09-04 15:52:55,637 [myid:] - INFO  [main:Environment@109] - Client environment:zookeeper.version=3.5.5-390fe37ea45dee01bf87dc1c042b5e3dcce88653, built on 05/03/2019 12:07 GMT
2019-09-04 15:52:55,640 [myid:] - INFO  [main:Environment@109] - Client environment:host.name=sh-saas-k8s-kafka-online-03
2019-09-04 15:52:55,640 [myid:] - INFO  [main:Environment@109] - Client environment:java.version=1.8.0_191
2019-09-04 15:52:55,642 [myid:] - INFO  [main:Environment@109] - Client environment:java.vendor=Oracle Corporation
2019-09-04 15:52:55,642 [myid:] - INFO  [main:Environment@109] - Client environment:java.home=/usr/java/jdk1.8.0_191-amd64/jre
2019-09-04 15:52:55,642 [myid:] - INFO  [main:Environment@109] - Client environment:java.class.path=/usr/local/zk/bin/../zookeeper-server/target/classes:/usr/local/zk/bin/../build/classes:/usr/local/zk/bin/../zookeeper-server/target/lib/*.jar:/usr/local/zk/bin/../build/lib/*.jar:/usr/local/zk/bin/../lib/zookeeper-jute-3.5.5.jar:/usr/local/zk/bin/../lib/zookeeper-3.5.5.jar:/usr/local/zk/bin/../lib/slf4j-log4j12-1.7.25.jar:/usr/local/zk/bin/../lib/slf4j-api-1.7.25.jar:/usr/local/zk/bin/../lib/netty-all-4.1.29.Final.jar:/usr/local/zk/bin/../lib/log4j-1.2.17.jar:/usr/local/zk/bin/../lib/json-simple-1.1.1.jar:/usr/local/zk/bin/../lib/jline-2.11.jar:/usr/local/zk/bin/../lib/jetty-util-9.4.17.v20190418.jar:/usr/local/zk/bin/../lib/jetty-servlet-9.4.17.v20190418.jar:/usr/local/zk/bin/../lib/jetty-server-9.4.17.v20190418.jar:/usr/local/zk/bin/../lib/jetty-security-9.4.17.v20190418.jar:/usr/local/zk/bin/../lib/jetty-io-9.4.17.v20190418.jar:/usr/local/zk/bin/../lib/jetty-http-9.4.17.v20190418.jar:/usr/local/zk/bin/../lib/javax.servlet-api-3.1.0.jar:/usr/local/zk/bin/../lib/jackson-databind-2.9.8.jar:/usr/local/zk/bin/../lib/jackson-core-2.9.8.jar:/usr/local/zk/bin/../lib/jackson-annotations-2.9.0.jar:/usr/local/zk/bin/../lib/commons-cli-1.2.jar:/usr/local/zk/bin/../lib/audience-annotations-0.5.0.jar:/usr/local/zk/bin/../zookeeper-*.jar:/usr/local/zk/bin/../zookeeper-server/src/main/resources/lib/*.jar:/usr/local/zk/bin/../conf:
2019-09-04 15:52:55,642 [myid:] - INFO  [main:Environment@109] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2019-09-04 15:52:55,642 [myid:] - INFO  [main:Environment@109] - Client environment:java.io.tmpdir=/tmp
2019-09-04 15:52:55,642 [myid:] - INFO  [main:Environment@109] - Client environment:java.compiler=<NA>
2019-09-04 15:52:55,642 [myid:] - INFO  [main:Environment@109] - Client environment:os.name=Linux
2019-09-04 15:52:55,642 [myid:] - INFO  [main:Environment@109] - Client environment:os.arch=amd64
2019-09-04 15:52:55,642 [myid:] - INFO  [main:Environment@109] - Client environment:os.version=3.10.0-957.27.2.el7.x86_64
2019-09-04 15:52:55,642 [myid:] - INFO  [main:Environment@109] - Client environment:user.name=root
2019-09-04 15:52:55,642 [myid:] - INFO  [main:Environment@109] - Client environment:user.home=/root
2019-09-04 15:52:55,642 [myid:] - INFO  [main:Environment@109] - Client environment:user.dir=/usr/local/apache-zookeeper-3.5.5-bin
2019-09-04 15:52:55,643 [myid:] - INFO  [main:Environment@109] - Client environment:os.memory.free=235MB
2019-09-04 15:52:55,644 [myid:] - INFO  [main:Environment@109] - Client environment:os.memory.max=245MB
2019-09-04 15:52:55,644 [myid:] - INFO  [main:Environment@109] - Client environment:os.memory.total=245MB
2019-09-04 15:52:55,647 [myid:] - INFO  [main:ZooKeeper@868] - Initiating client connection, connectString=10.16.252.40:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@6ae40994
2019-09-04 15:52:55,651 [myid:] - INFO  [main:X509Util@79] - Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation
2019-09-04 15:52:55,656 [myid:] - INFO  [main:ClientCnxnSocket@237] - jute.maxbuffer value is 4194304 Bytes
2019-09-04 15:52:55,663 [myid:] - INFO  [main:ClientCnxn@1653] - zookeeper.request.timeout value is 0. feature enabled=
Welcome to ZooKeeper!
JLine support is enabled
2019-09-04 15:52:55,674 [myid:10.16.252.40:2181] - INFO  [main-SendThread(10.16.252.40:2181):ClientCnxn$SendThread@1112] - Opening socket connection to server sh-saas-k8s-kafka-online-03/10.16.252.40:2181. Will not attempt to authenticate using SASL (unknown error)
2019-09-04 15:52:55,727 [myid:10.16.252.40:2181] - INFO  [main-SendThread(10.16.252.40:2181):ClientCnxn$SendThread@959] - Socket connection established, initiating session, client: /10.16.252.40:11714, server: sh-saas-k8s-kafka-online-03/10.16.252.40:2181
2019-09-04 15:52:55,754 [myid:10.16.252.40:2181] - INFO  [main-SendThread(10.16.252.40:2181):ClientCnxn$SendThread@1394] - Session establishment complete on server sh-saas-k8s-kafka-online-03/10.16.252.40:2181, sessionid = 0x300003cd6d40001, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: 10.16.252.40:2181(CONNECTED) 0] help
ZooKeeper -server host:port cmd args
        addauth scheme auth
        close 
        config [-c] [-w] [-s]
        connect host:port
        create [-s] [-e] [-c] [-t ttl] path [data] [acl]
        delete [-v version] path
        deleteall path
        delquota [-n|-b] path
        get [-s] [-w] path
        getAcl [-s] path
        history 
        listquota path
        ls [-s] [-w] [-R] path
        ls2 path [watch]
        printwatches on|off
        quit 
        reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
        redo cmdno
        removewatches path [-c|-d|-a] [-l]
        rmr path
        set [-s] [-v version] path data
        setAcl [-s] [-v version] [-R] path acl
        setquota -n|-b val path
        stat [-w] path
        sync path
Command not found: Command not found help
[zk: 10.16.252.40:2181(CONNECTED) 1] stat
stat [-w] path
[zk: 10.16.252.40:2181(CONNECTED) 2] config -s
server.1=10.16.252.33:2288:3388:participant
server.2=10.16.252.39:2288:3388:participant
server.3=10.16.252.40:2288:3388:participant
version=0
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Wed Sep 04 15:40:12 CST 2019
pZxid = 0x0
cversion = 0
dataVersion = -1
aclVersion = -1
ephemeralOwner = 0x0
dataLength = 141
numChildren = 0
[zk: 10.16.252.40:2181(CONNECTED) 3] quit

WATCHER::

WatchedEvent state:Closed type:None path:null
2019-09-04 15:53:37,175 [myid:] - INFO  [main:ZooKeeper@1422] - Session: 0x300003cd6d40001 closed
2019-09-04 15:53:37,175 [myid:] - INFO  [main-EventThread:ClientCnxn$EventThread@524] - EventThread shut down for session: 0x300003cd6d40001
[root@sh-saas-k8s-kafka-online-03 zk]# 

2.2 kafka安装和配置

下载安装:

cd /data/software/
wget https://archive.apache.org/dist/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
cd /usr/local/
tar xzvf /data/software/kafka_2.11-0.9.0.1.tgz
ln -s kafka_2.11-0.9.0.1/ kafka

配置kafka:

cd /usr/local/kafka
vim config/server.properties

server.properties配置文件:

#broker.id在集群内每台应该都不一样
broker.id=1
host.name=10.16.252.39
listeners=PLAINTEXT://10.16.252.39:9092
num.network.threads=16
num.io.threads=32
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data1/appdata/kafka
num.partitions=6
num.recovery.threads.per.data.dir=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

zookeeper.connect=10.16.252.33:2181,10.16.252.39:2181,10.16.252.40:2181
zookeeper.connection.timeout.ms=6000
default.replication.factor=2
delete.topic.enable=true

创建存储目录:
mkdir -p /data1/appdata/kafka

配置JVM内存:

vim bin/kafka-server-start.sh
export KAFKA_HEAP_OPTS="-Xmx31G -Xms31G"

启动:
sh bin/kafka-server-start.sh -daemon config/server.properties

停止:
sh bin/kafka-server-stop.sh

其它2台机器也与同样的方法安装及配置后,就完成集群的安装了。

3. kafka压测

kafka自带了一个压测工具,在bin目录下,叫:kafka-producer-perf-test.sh。
在压测之前,先把压测脚本的JVM内存配置调大一点,默认为512M
vim bin/kafka-producer-perf-test.sh

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx8G"
fi

使用示例:
sh bin/kafka-producer-perf-test.sh --topic test_perf2 --num-records 1000000000 --record-size 1000 --throughput 2000000 --producer-props bootstrap.servers=10.16.252.33:9092,10.16.252.39:9092,10.16.252.40:9092
相关参数:
–topic topic名称,本例为test_perf2
–num-records 总共需要发送的消息数,本例为1000000000
–record-size 每个记录的字节数,本例为1000
–throughput 每秒钟发送的记录数,本例为2000000
–producer-props bootstrap.servers=10.16.252.33:9092,10.16.252.39:9092,10.16.252.40:9092 kafka的配置信息

现在开始压测:

topic分区数 kafka配置参数 record-size 测试客户端数 测试结果
3 num.network.threads=16,num.io.threads=32 1000 1 24万
3 num.network.threads=16,num.io.threads=32 1000 2 28万
3 num.network.threads=16,num.io.threads=32 1000 3 36万
6 num.network.threads=16,num.io.threads=32 1000 1 24万
6 num.network.threads=16,num.io.threads=32 1000 2 38万
6 num.network.threads=16,num.io.threads=32 1000 3 45万
6 num.network.threads=16,num.io.threads=32 500 1 50万
6 num.network.threads=16,num.io.threads=32 500 2 80万
6 num.network.threads=16,num.io.threads=32 500 3 90万
12 num.network.threads=16,num.io.threads=32 1000 1 24万
12 num.network.threads=16,num.io.threads=32 1000 2 39万
12 num.network.threads=16,num.io.threads=32 1000 3 46万
12 num.network.threads=6,num.io.threads=8 1000 1 24万
12 num.network.threads=6,num.io.threads=8 1000 2 39万
12 num.network.threads=6,num.io.threads=8 1000 3 46万

总的来说,只有topic的分区能平均落到每台服务器上,不同参数对性能影响不大,除了分片数,这说明对kafka影响最大的是IO,网络IO及磁盘IO。记录的大小与性能成反比。
下面是其中一台的监控数据,另外说明一下,压测客户端也运行在这些服务器上。

今天换了一组服务器,同样是3台:
CPU:Intel(R) Xeon(R) CPU E5-2680 v4 @ 2.40GHz
内存:256G (每台只跑了一个实例,没有意义,只用了32G)
磁盘:12块ssd硬盘做raid10

测试结果:

topic分区数 kafka配置参数 record-size 测试客户端数 测试结果
12 num.network.threads=16,num.io.threads=32 1000 1 53万
12 num.network.threads=16,num.io.threads=32 1000 2 85万
12 num.network.threads=16,num.io.threads=32 1000 3 120万
12 num.network.threads=16,num.io.threads=32 2000 1 30万
12 num.network.threads=16,num.io.threads=32 2000 2 50万
12 num.network.threads=16,num.io.threads=32 2000 3 64万

可以看到还是SSD硬盘强悍,性能直接翻了一倍多。

另外也大致测试了一台服务器的IOPS:
读35k iops:

写25k iops:

但是压测时,kafka基本只能用于12K左右的iops: