Kafka是高性能的分布式流处理平台,它的特点有:
- 类似于传统的消息队列,为海量流式数据提供了消息发布/订阅模型。
- 支持容错的流式数据存储。
- 流式数据的实时处理。
Kafka是一款吞吐性能非常优秀的分布式流处理系。虽然吞吐性能优秀,但Kafka的处理延迟较高,一般多用于日志等离线处理,不会用于实时的消息队列系统。
本节将讨论Kafka集群的部署。
与我们之前讨论的MySQL、Memcached等组件稍有不同:
- Kafka对I/O资源消耗较大,使用Volume挂载的方式,存在一定性能损耗。
- 并且Kafka本身内置了高可用、集群的功能。
- Kafka依赖Zookeeper,后者对资源波动较为敏感,一般需要独立部署。
综上所述,对于Kafka和其依赖的ZooKeeper,我们将在服务器上独立部署,而不会将其部署在Kubernetes集群中。
准备Java环境
我们假设你手里的是仅安装了操作系统的"裸机"服务器,在这里,我们以以Ubuntu 18.04为例进行讲解。
首先准备Java的apt源
sudo add-apt-repository -y ppa:webupd8team/java
sudo apt update
然后,自动同意许可、自动安装
echo debconf shared/accepted-oracle-license-v1-1 select true | sudo debconf-set-selections
echo debconf shared/accepted-oracle-license-v1-1 seen true | sudo debconf-set-selections
sudo apt install -y oracle-java8-set-default
安装好后,我们验证一下
java -version
java version "1.8.0_171"
Java(TM) SE Runtime Environment (build 1.8.0_171-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode)
部署Kafka集群至少需要6台机器,3台给Zookeeper,另外3台给Kafka的Broker。
为了说明方便,我们假设6台机器的主机名分别为zk1~zk3,kafka1~kafka3
请在6台机器上都进行上述Java 8的安装。
准备主机环境
zk和kafka集群的部署,都依赖2个先决条件:
- 主机之间必须支持内网访问
- 内网可以通过hostname直接访问
由于是在同一个机房内部署,所以我们假设上述条件1是满足的。
对于条件2,有多种实现方案,我们这里采用最传统的hostname修改方式。
对于上述6台主机,内网IP分别为:
- z1~zk3:192.168.0.11 ~ 192.168.0.13
- kafka1~kafka3:192.168.0.21 ~ 192.168.0.23
则我们修改6台主机的hosts文件如下:
sudo vim /etc/hosts
127.0.0.1 localhost
#127.0.1.1 zk2
# The following lines are desirable for IPv6 capable hosts
::1 localhost ip6-localhost ip6-loopback
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters
192.168.0.11 zk1
192.168.0.12 zk2
192.168.0.13 zk3
192.168.0.21 kafka1
192.168.0.22 kafka2
192.168.0.23 kafka3
修改后,任意一台机器应该都可以通过hostname来ping通其他主机,例如:
zk1$ ping kafka2
PING baidu.com (192.168.0.22) 56(84) bytes of data.
64 bytes from 192.168.0.22: icmp_seq=1 ttl=47 time=10.0 ms
...
注意,在上面的配置中,我们还去掉127.0.1.1的映射,最终效果是ping也会返回内网ip,而不是127.0.1.1:
ping zk2
PING zk2 (192.168.0.12) 56(84) bytes of data.
64 bytes from zk2 (192.168.0.12): icmp_seq=1 ttl=64 time=0.046 ms
部署Zookeeper
接下来,我们将在zk1~zk3上部署zookeeper,请确认这三台机器已经安装了Java 8。
首先为zookeeper创建本地用户,在zk1~zk3上分别执行:
useradd -m zookeeper
下载并解压到本地,同样在zk1~zk3上分别执行:
su zookeeper
cd $HOME
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz
tar -xzvf ./zookeeper-3.4.12.tar.gz
ln -s zookeeper-3.4.12 zookeeper
mkdir /home/zookeeper/zookeeper_data
注意最后创建了一个文件夹,用于储存zk的数据文件
为zk1和zk3添加不同的id
zookeeper@zk1:~$ echo "1" > /home/zookeeper/zookeeper_data/myid
zookeeper@zk2:~$ echo "2" > /home/zookeeper/zookeeper_data/myid
zookeeper@zk3:~$ echo "3" > /home/zookeeper/zookeeper_data/myid
在zk1~zk3上分别添加配置
vim /home/zookeeper/zookeeper/conf/zoo.cfg
tickTime=2000
dataDir=/home/zookeeper/zookeeper_data
clientPort=2181
initLimit=5
syncLimit=2
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888
在zk1~zk3上启动zookeeper
cd /home/zookeeper/zookeeper/bin
./zkServer.sh start
启动后,可以在zookeeper.out中查看错误输出日志。
如果一切正常,我们用客户端尝试连接。
./zookeeper/bin/zkCli.sh -server zk1:2181,zk2:2181,zk3:2181
....
[zk: zk1:2181,zk2:2181,zk3:2181(CONNECTED) 0]
...
如上如果能显示"CONNECTED",就是连接成功了。
我们尝试创建结点,也能成功:
[zk: zk1:2181,zk2:2181,zk3:2181(CONNECTED) 5] create /hello world
Created /hello
[zk: zk1:2181,zk2:2181,zk3:2181(CONNECTED) 6] get /hello
world
cZxid = 0x100000002
ctime = Tue Jun 12 11:36:22 UTC 2018
mZxid = 0x100000002
mtime = Tue Jun 12 11:36:22 UTC 2018
pZxid = 0x100000002
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0
至此,我们已经完成了zookeeper集群的配置。
Kafka集群配置
首先为kafka创建本地用户,在kafka1~kafka3上分别执行:
useradd -m kafka
下载kafka并解压缩
su kafka
cd $HOME
wget http://www-eu.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
tar -xzvf kafka_2.11-1.1.0.tgz
ln -s kafka_2.11-1.1.0 kafka
创建数据目录
mkdir /home/kafka/kafka_logs
配置文件(kafka1)
vim kafka/config/server.properties
broker.id=1
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
listeners=PLAINTEXT://kafka1:9092
log.dirs=/home/kafka/kafka_logs
分别在kafka1~kafka3上启动
cd $HOME
kafka/bin/kafka-server-start.sh -daemon ./kafka/config/server.properties
创建队列(topic)
kafka/bin/kafka-topics.sh --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --replication-factor 2 --partitions 3 --topic topic1
Created topic "topic1".
查看队列(topic)
kafka/bin/kafka-topics.sh --describe --zookeeper zk1:2181,zk2:2181,zk3:2181 --topic topic1
Topic:topic1 PartitionCount:3 ReplicationFactor:2 Configs:
Topic: topic1 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: topic1 Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
列出所有队列(topic)
kafka/bin/kafka-topics.sh --list --zookeeper zk1:2181,zk2:2181,zk3:2181
topic1
生产消息
kafka/bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic topic1
>a
>b
消费消息
kafka/bin/kafka-console-consumer.sh --zookeeper zk1:2181,zk2:2181,zk3:2181 --topic topic1 --from-beginning
a
b
删除队列
kafka/bin/kafka-topics.sh --delete --zookeeper zk1:2181,zk2:2181,zk3:2181 --topic topic1
Topic topic1 is marked for deletion.
至此,我们完成了Kafka的集群配置,更多内容可以参考Kafka 官方文档。
下一节:在上一节,我们介绍了分布式流处理平台Kafka的运维工作,在这一节,我们将讨论Kafka的应用开发。