Kafka
Kafka
MR.XSS消息队列
交互模型
消息队列两种模式
点对点模式
点对点模式特点:
每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中)
发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;
接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;
发布订阅模式
发布/订阅模式特点:
每个消息可以有多个订阅者;
发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行;
Kafka
简介
Kafka是由Apache软件基金会开发的一个开源流平台,由Scala和Java编写
Apache Kafka是一个分布式流平台。一个分布式的流平台应该包含3点关键的能力:
发布和订阅流数据流,类似于消息队列或者是企业消息传递系统
以容错的持久化方式存储数据流
处理数据流
上图,我们可以看到:
Producers:可以有很多的应用程序,将消息数据放入到Kafka集群中。
Consumers:可以有很多的应用程序,将消息数据从Kafka集群中拉取出来。
Connectors:Kafka的连接器可以将数据库中的数据导入到Kafka,也可以将Kafka的数据导出到
数据库中。
- Stream Processors:流处理器可以Kafka中拉取数据,也可以将数据写入到Kafka中。
应用场景
- 消息队列
- 流量削峰
- 日志处理
- 异步任务
- 流处理
为什么选择Kafka
- 多生产者、多消费者
- 基于磁盘存储,磁盘持久化
- 高伸缩性
- 高性能
搭建Kafka环境
介绍
搭建Kafka必须需要JDK 、Zookeeper 和 Kafka
1、安装jdk
可以直接一键云安装jdk、也可以自己下载安装包进行安装、此处为了方便,采用云安装方式
1 | yum install -y java-1.8.0-openjdk.x86_64 |
2、 安装Zookeeper
下载Zookeeper安装包
官网 https://zookeeper.apache.org/
直接使用wegt命令进行下载
1
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz
解压ZK
1
tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz
修改配置文件
由于服务启动的时候默认回去读conf下的zoo.cfg配置文件,如果没有直接会报错!
刚下载的zookeeper的conf目录下是没有zoo.cfg,但是给我们提供了zoo_sample.cfg(模板配置文件)记得修改指定Data文件位置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24# zookeeper时间配置中的基本单位 (毫秒)
tickTime=2000
# 允许follower初始化连接到leader最⼤时⻓,它表示tickTime时间倍数
# 即:initLimit*tickTime
initLimit=10
# 允许follower与leader数据同步最⼤时⻓,它表示tickTime时间倍数
syncLimit=5
#zookeper 数据存储⽬录及⽇志保存⽬录(如果没有指明dataLogDir,则⽇志也保存在这个⽂件中)
dataDir=/tmp/zookeeper
#对客户端提供的端⼝号
clientPort=2181
#单个客户端与zookeeper最⼤并发连接数
maxClientCnxns=60
# 保存的数据快照数量,之外的将会被清除
autopurge.snapRetainCount=3
#⾃动触发清除任务时间间隔,⼩时为单位。默认为0,表示不⾃动清除。
autopurge.purgeInterval=1配置环境变量
在/etc/profile目录下面进行环境变量的配置, 直接追加到结尾也可以不进行配置,在bin目录下面执行命令即可
1
2
3#zookeeper
export ZOOKEEPER_HOME=/opt/module/zookeeper-3.7.1
export PATH=$PATH:${ZOOKEEPER_HOME}/bin编辑完成之后记得执行以下命令,使其立即生效
1
source /etc/profile
ZK命令
1
2
3zkServer.sh start #启动
zkServer.sh status #查看状态
zkServer.sh stop #停止启动zkCli.sh客户端
zkCli.sh可以理解成客户端,也可以理解成命令行工具,把命令交给他,让他和zk的服务端打交道。
类似于mysql,我们安装完mysql想要执行命令,那么就必须要通过mysql -u账号 -p密码进入命令行工具里面,才能执行sql。1
2zkCli.sh #启动客户端
ls / #查询节点
3、安装Kafka
上传Kafka安装包并且解压
1
tar -xvzf kafka_2.12-2.4.1.tgz -C ../server/
修改 server.properties
1 | # 指定broker的id |
配置Kafka环境变量
1
2
3#环境变量
export KAFKA_HOME=/export/server/kafka_2.12-2.4.1
export PATH=:$PATH:${KAFKA_HOME}启动Kafka并且检查是否成功
1
2kafka-server-start.sh -daemon ../config/server.properties #启动
ps -aux | grep server.properties #查看是否成功
Kafka基础概念
概念一:生产者与消费者
对于 Kafka 来说客户端有两种基本类型:生产者(Producer)和消费者(Consumer)。除此之外,还有用来做数据集成的 Kafka Connect API 和流式处理的 Kafka Streams 等高阶客户端,但这些高阶客户端底层仍然是生产者和消费者API,它们只不过是在上层做了封装。
这很容易理解,生产者(也称为发布者)创建消息,而消费者(也称为订阅者)负责消费or读取消息。
概念二:主题(Topic)与分区(Partition)
在 Kafka 中,消息以主题(Topic)来分类,每一个主题都对应一个「消息队列」,这有点儿类似于数据库中的表。但是如果我们把所有同类的消息都塞入到一个“中心”队列中,势必缺少可伸缩性,无论是生产者/消费者数目的增加,还是消息数量的增加,都可能耗尽系统的性能或存储。
我们使用一个生活中的例子来说明:现在 A 城市生产的某商品需要运输到 B 城市,走的是公路,那么单通道的高速公路不论是在「A 城市商品增多」还是「现在 C 城市也要往 B 城市运输东西」这样的情况下都会出现「吞吐量不足」的问题。所以我们现在引入分区(Partition)的概念,类似“允许多修几条道”的方式对我们的主题完成了水平扩展。
概念三:Broker 和集群(Cluster)
一个 Kafka 服务器也称为 Broker,它接受生产者发送的消息并存入磁盘;Broker 同时服务消费者拉取分区消息的请求,返回目前已经提交的消息。使用特定的机器硬件,一个 Broker 每秒可以处理成千上万的分区和百万量级的消息。
若干个 Broker 组成一个集群(Cluster),其中集群内某个 Broker 会成为集群控制器(Cluster Controller),它负责管理集群,包括分配分区到 Broker、监控 Broker 故障等。在集群内,一个分区由一个 Broker 负责,这个 Broker 也称为这个分区的 Leader;当然一个分区可以被复制到多个 Broker 上来实现冗余,这样当存在 Broker 故障时可以将其分区重新分配到其他 Broker 来负责。下图是一个样例:
Kafka 的一个关键性质是日志保留(retention),我们可以配置主题的消息保留策略,譬如只保留一段时间的日志或者只保留特定大小的日志。当超过这些限制时,老的消息会被删除。我们也可以针对某个主题单独设置消息过期策略,这样对于不同应用可以实现个性化。
概念四:多集群
随着业务发展,我们往往需要多集群,通常处于下面几个原因:
- 基于数据的隔离;
- 基于安全的隔离;
- 多数据中心(容灾)
当构建多个数据中心时,往往需要实现消息互通。举个例子,假如用户修改了个人资料,那么后续的请求无论被哪个数据中心处理,这个更新需要反映出来。又或者,多个数据中心的数据需要汇总到一个总控中心来做数据分析。
上面说的分区复制冗余机制只适用于同一个 Kafka 集群内部,对于多个 Kafka 集群消息同步可以使用 Kafka 提供的 MirrorMaker 工具。本质上来说,MirrorMaker 只是一个 Kafka 消费者和生产者,并使用一个队列连接起来而已。它从一个集群中消费消息,然后往另一个集群生产消息。