跳至主要內容

分布式消息队列系统Kafka-1

ZnyoungJava大数据Kafka

参考文档

分布式实时消息队列Kafka(一)

知识点01:课程回顾

知识点02:课程目标

【模块二:消息队列的介绍】

知识点05:【了解】消息队列:MQ介绍

  • 目标了解消息队列的功能,应用场景及特点
  • 实施
    • 举个栗子:让A将数据传递给B、C、D三个系统分别做不同的数据处理
      • 常规做法
        image-20220118153940268.png
      • 问题
        • 读写速度:不匹配,导致接受数据的一方数据丢失或者故障,影响需求的实现
        • 架构变化:高耦合,频繁的需要将数据生产者停止运行,修改代码,影响业务
        • 串行处理:慢,挨个实现发送,按照顺序一个一个来实现数据传输
      • 解决:消息中间件=消息队列
        image-20220118154223099.png
    • 定义
      • 官方定义:消息队列是一种异步的服务间通信方式,是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。
      • 简单点说:消息队列MQ用于实现两个系统之间或者两个模块之间传递消息数据时,实现数据缓存
    • 功能
      • 基于队列的方式,实现消息传递的数据缓存
      • 保证消息传递有序性:队列先进先出,消息队列的实现先产生的先消费
    • 应用场景
      • 用于所有需要实现实时、高性能、高吞吐、高可靠的消息传递架构中
      • 大数据应用中:作为唯一的实时数据存储平台
    • 优点
      • 实现了架构解耦:将架构中的高耦合转换为低耦合,更加灵活
      • 实现异步,提高传输性能:生产一次,所有消费都可以并行
      • 限流削峰:不用再考虑不同时间资源分配不均衡的问题
    • 缺点
      • 增加了消息队列,架构运维更加复杂:一旦消息队列故障,整个系统全部崩溃
      • 数据保证更加复杂,必须保证生产安全和消费安全:每一步都必须不丢失和不重复
  • 小结:了解消息队列的功能,应用场景及特点

知识点06:【掌握】消息队列:订阅发布模式

  • 目标掌握消息队列中消息传递的订阅发布模式
  • 实施
    image-20220722191636516.png
    • 角色
      • 生产者:负责往消息队列中写数据
      • 消息队列:临时存放两个系统之间需要传递的数据
      • 消费者:负责从消息队列中读数据
      • 主题:数据的分类,用于区分消息队列中不同的业务的数据
    • 流程
      • step1:生产者往消息队列中生产数据,将数据写入对应的主题中
      • step2:消费者可以订阅主题,如果主题中出现新的数据,消费就可以立即消费
    • 特点:一个消费者可以订阅多个主题,一个主题可以被多个消费者订阅
      • 消费成功以后,不会立即主动删除数据
      • 可以实现数据共享
    • 理解:订阅发布模式高度类似于微信公众号概念:一个公众号可以被多人订阅,一个人可以订阅多个公众号。
  • 小结:掌握消息队列中消息传递的订阅发布模式

【模块三:Kafka的介绍及核心概念】

知识点07:【掌握】Kafka的介绍及特点

  • 目标掌握Kafka的功能、特点及应用场景
  • 实施
    • 官网:kafka.apache.orgopen in new window
    • 公司:领英LinkedIn公司基于Scala语言开发的工具
      image-20220722201133710.png
    • 功能
      • 功能一:分布式流式数据实时存储:分布式消息队列系统
      • 功能二:分布式流式计算:分布式计算:KafkaStream
    • 定义分布式的基于订阅发布模式的高吞吐高性能的实时消息队列系统
    • 应用场景:大数据实时计算的架构中,实现实时数据缓存的场景
      • 目前只要做实时大数据,都必用Kafka或者Pulsar
    • 特点
      • 高性能:实时的对数据进行高性能的读写【内存+磁盘】
      • 高并发:分布式并行读写
      • 高吞吐:使用分布式磁盘存储
      • 高可靠:分布式主从架构
      • 高安全性:数据安全保障机制
        • 数据安全:存储在Kafka中的数据是相对安全的
        • 传输安全:从理论上能保证数据生产和消费满足一次性语义:不丢失不重复
      • 高灵活性:根据需求,随意添加生产者和消费者
  • 小结:Kafka在大数据中专门用于实现实时的数据存储,实现大数据实时计算

知识点08:【掌握】生产和消费

  • 目标掌握Kafka中的Producer、Broker、Consumer概念及其功能
  • 实施
    image-20210328161427289.png
    • Broker:Kafka是一个分布式集群,多台机器构成,每台Kafka的节点就是一个Broker
      • Kafka节点:Broker
      • Kafka进程:Kafka
    • Producer生产者
    • Consumer:消费者
    • Consumer GroupKafka的订阅发布模式中必须以消费者组的形式从Kafka中消费数据
      • 举个栗子:公司有一个Topic,存储所有用户行为的数据共100条数据
        • 数据分析希望拿到一份:必须构建一个数据分析的消费者组
          • Consumer Group1
consumer1:100条数据
     -  推荐系统希望拿到一份:必须构建一个推荐系统的消费者组 
        -  Consumer Group2 
consumer1:25条数据
consumer2:10条数据
consumer3:30条数据
consumer4:35条数据
     -  …… 
  -  **任何一个消费者必须属于某一个消费者组** 
  -  一个消费者组中可以有多个消费者:**多个消费者共同并行消费数据,提高消费性能** 
     - **消费者组中多个消费者消费的数据是不一样的**
     - **整个消费者组中所有消费者消费的数据加在一起是一份完整的数据**
image-20210629111241440.png
image-20210629111241440.png
  • 小结:掌握Kafka中的Producer、Broker、Consumer概念及其功能

知识点09:【掌握】Topic、Partition

  • 目标掌握Kafka中的Topic、Partition概念及其功能
  • 路径
image-20210328163508888.png
image-20210328163508888.png
  • 实施

    • Topic:数据主题,用于区分不同的数据,对数据进行分类
      • Topic是Kafka中分布式的数据存储对象,类似于数据库中表的概念,但是Topic是分布式的。
        • 类似于HDFS中文件的概念,不同的数据存储在不同的文件中,Kafka中不同的数据存储在不同的Topic中
      • Kafka是分布式存储,所以Kafka中的Topic也是分布式的概念,写入Topic的数据会分布式的存储在Kakfa中
        • 类似于HDFS中每个文件会被拆分成Block,每个Block会分散在不同的节点上存储。
  • Partition:数据分区,用于实现Topic的分布式存储,对Topic的数据进行划分

    • 每个分区存储在不同的Kafka节点Broker上
    • 例如上图中:Topic名称为T1,T1有三个分区:P0、P1、P2
    • 写入Topic:根据一定的规则决定写入哪个具体的分区
  • 类比HDFS
    | 概念 | HDFS | Kafka |
    | --- | --- | --- |
    | 节点 | NameNode、DataNode | Broker |
    | 对象 | 文件 | Topic |
    | 划分 | 分块:Block | 分区:Partition |
    | 规则 | 按大小拆分:128M | 自己指定 |

  • 小结:掌握Kafka中的Topic、Partition概念及其功能

知识点10:【掌握】分区副本机制

  • 目标掌握Kafka中的分区副本机制
  • 路径
image-20210328163508888.png
image-20210328163508888.png
  • 实施
    • 问题1:Kafka中的每个Topic的每个分区存储在不同的节点上,如果某个节点故障,怎么保证集群正常可用?
      • Kafka选用了副本机制来保证数据的安全性
      • Kafka每一个分区都可以有多个副本,类似于HDFS的副本机制,一个块构建多个副本
      • 如果某台机器故障,其他机器还有这个分区的副本,其他机器的副本照样可以对外提供客户端读写
      • 注意:Kafka中一个分区的副本个数最多只能等于机器的个数,相同分区的副本不允许放在同一台机器,没有意义
    • 问题2:一个分区有多个副本,读写这个分区的数据时候,到底读写哪个分区副本呢?
      • Kafka将一个分区的多个副本,划分为两种角色
      • Leader副本:负责对外提供读写,生产者和消费者只对leader副本进行读写
      • Follower副本:与Leader同步数据,如果leader故障,从follower新的leader副本对外提供读写
      • 设计:方式类似于ZK中节点的设计,但是这里不是做节点的选举而是分区副本的选举
      • 实现:由Kafka的主节点根据机器的健康状态、数据的完整性来选择Leader和Follower副本
  • 小结:掌握Kafka的分区副本机制

知识点11:【理解】Segment

  • 目标理解Kafka中的Segment概念及其功能
  • 路径
    image-20210328164220108.png
  • 实施
    • 本质:对每个分区的数据进行了更细的划分,用于存储Kafka中的数据并记录索引
    • 规则:先写入的数据会先生成一对Segment文件,存储到一定条件以后,后面数据写入另外一对Segment文件,每个文件就叫Segment文件对
    • 实现:每个Segment对应两种【三个】文件
      • xxxxxxxxx.log:存储数据
      • xxxxxxxxx.index/xxxxxxxx.timeindex:对应.log的文件的索引
    • 设计为了加快数据检索的效率
      • 将数据按照规则写入不同文件,以后可以根据规则快速的定位数据所在的文件
      • 读取对应的小的segment文件,不用读取所有数据文件
    • 举例
      • 如果不设计Segment,整个分区的所有数据都存储在一个文件中
part2.log => 10G => 1亿
  -  当文件越来越大,存储的数据越来越多,影响读的性能,会再构建一个新的segment,老的segment不再被写入 
--第一个Segment:1G = 20万条
00000000000000000000000.log  -- 0:这个文件中的最小的offset,也就是这个文件中存放的第一条数据偏移
00000000000000000000000.index
00000000000000000000000.timeindex

--第二个Segment:1G = 20万条
00000000000000000200000.log - 这个文件中最小的那条数据的offset为20万,就是这个分区的第20万01条数据
00000000000000000200000.index
00000000000000000200000.timeindex

--第三个Segment:
00000000000000000400000.log
00000000000000000400000.index
00000000000000000400000.timeindex

……
  -  Segment文件的名字就是这个Segment记录的offset的最小值 
     - 消费者消费数据是根据offset进行消费的
     - 消费者1:想消费分区1:offset偏移量 = 39999这个offset开始消费
     - 先根据文件文件名来判断我要的offset在哪个文件中
  • 小结:理解Kafka中的Segment概念及其功能

知识点12:【掌握】Offset

  • 目标掌握Kafka中的Offset概念及其功能
  • 实施
    • 消息队列:将两个系统之间传递的数据进行缓存
    • 问题:怎么能保证这个顺序性,先写入的数据先被读取呢?
      • A:生产者
a
b
c
d
……
  -  MQ:消息队列 
0-a
1-b
2-c
3-d
……
  -  B:消费者:只要按照编号进行顺序读取就可以保证消费的顺序 
a
b
c
d
……
  • 定义Offset是每条数据在自己分区中的偏移量
    • 写入分区的顺序就是offset偏移量,Offset是分区级别的,每个分区的offset独立管理,都从0开始
  • 消息队列:先进先出
    • 先写入的offset就越小
      • 第一条数据的offset就为0
    • 第二条数据的offset就为1
      • ……
      • 第N条数据的offset就为N-1
  • 生成:生产者往Kafka中写入数据,写入某个分区
    • 每个分区单独管理一套Offset【分区】,offset从0开始对每条数据进行编号
    • Kafka写入数据也是按照KV来写入数据
#Kafka中一条数据存储的结构
offset		Key				Value
  • 功能基于offset来指定数据的顺序,消费时候按照offset顺序来读取
    • 消费者消费Topic分区中的数据是按照offset进行分区级别顺序消费
    • 怎么保证不丢失不重复:只要保证消费者每次按照offset的顺序消费即可
      • 如果没有Offset
    • 从头取一遍:数据重复
  • 从最新的读:数据丢失
    • Kafka中所有消费者数据的读取都是按照Offset来读取数据
      image-20210629120053481.png
  • 小结:掌握Kafka中的Offset概念及其功能
    • Offset用于标记分区中的每条数据,消费者根据上一次消费的offset对分区继续进行消费,保证顺序
    • 实现保证数据不丢失不重复

知识点13:【掌握】概念总结

  • 目标掌握Kafka中的概念

  • 路径
    image-20210328162448721.png

  • 实施
    | Kafka | 解释 |
    | --- | --- |
    | Producer | Kafka生产者,负责往Kafka写数据的客户端 |
    | Consumer | Kafka消费者,负责从Kafka读取数据的客户端 |
    | ConsumerGroup | 消费者组,必须以消费组的形式才能消费,一个消费者组中可以包含多个消费者,任何一个消费者都必须属于某个消费组 |
    | Broker | Kafka节点,每个节点叫做一个Broker |
    | Topic | 主题,用于区分不同的数据,实现数据分类,分布式逻辑的概念,一个Topic可以对应多个分区 |
    | Partition | 分区,用于实现Kafka中Topic的分布式存储,每个分区可以分布在不同节点上,每个分区可以有多份 |
    | Replication | 副本,用于保证Kafka中分区的数据安全,每个分区中副本个数小于等于节点个数,两种角色:leader、follower |
    | Segment | 分区文件段,用于将分区中的数据按照一定的规则进行细分,加快查询性能,由两种文件组成,.log数据文件和.index索引文件 |
    | KV | Kafka中写入数据的结构也是KV |
    | Offset | 每条数据在分区中的偏移量,第N条数据的offset=n-1,用于保证消费者按照offset顺序消费,一致性 |

  • 小结:掌握Kafka中的概念

【模块四:Kafka集群架构及部署】

知识点14:【理解】Kafka集群架构及角色功能

  • 目标理解Kafka集群架构及角色功能
  • 实施
    966929b7708f42718311ab0289f75f86-1642492706417.jpg
    • Kafka中的每个角色以及对应的功能
      • 分布式主从架构【公平】:允许从节点选举成为主节点
      • 主:Kafka Broker 【Controller】
        • 是一种特殊的Broker,从所有Broker中选举出来的,负责普通Broker的工作
        • 负责管理所有从节点:Topic、分区和副本【Controller负责决定一个分区的leader副本和follower副本】
        • 每次启动集群,会从所有Broker中选举一个Controller,由ZK实现(谁能在ZK创建临时节点就是Controller)
      • 从:Kafka Broker
        • 对外提供读写请求
        • 其他的Broker监听Controller在ZK中创建的临时节点,如果Controller故障,会重新从Broker选举一个新的Controller
    • ZK的功能
      • 辅助选举Controller:启动时所有Broker都会到Zk中创建临时节点,谁创建成功谁就是主节点
      • 存储Kafka元数据
    • Kafka 3.0版本新特性:可以摆脱ZK,独立运行了
  • 小结:理解Kafka集群架构及角色功能

知识点15:【实现】Kafka分布式集群部署

cd /export/software/
rz
  -  解压 
# 解压安装
tar -zxvf kafka_2.12-2.4.1.tgz -C /export/server/
cd /export/server/kafka_2.12-2.4.1/
# Kafka数据存储的位置
mkdir datas
  • 修改配置
    • 切换到配置文件目录
cd /export/server/kafka_2.12-2.4.1/config
image-20210330091020349-1642492706417.png
image-20210330091020349-1642492706417.png
  -  修改server.properties:vim server.properties 
#21行:唯一的 服务端id
broker.id=0
#60行:指定kafka的日志及数据【segment【.log,.index】】存储的位置
log.dirs=/export/server/kafka_2.12-2.4.1/datas
#123行:指定zookeeper的地址
zookeeper.connect=node1:2181,node2:2181,node3:2181
#在最后添加两个配置,允许删除topic,当前kafkaServer的主机名
delete.topic.enable=true
host.name=node1
  -  分发 
cd /export/server/
scp -r kafka_2.12-2.4.1 node2:$PWD
scp -r kafka_2.12-2.4.1 node3:$PWD
  -  第二台:server.properties 
#21行:唯一的 服务端id
broker.id=1
#最后
host.name=node2
  -  第三台:server.properties 
#21行:唯一的 服务端id
broker.id=2
#最后
host.name=node3
  -  添加环境变量 
vim /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/export/server/kafka_2.12-2.4.1
export PATH=:$PATH:$KAFKA_HOME/bin
source /etc/profile
  • 小结:实现Kafka分布式集群的搭建部署
    • 按照笔记一步步来,不做过多要求,只要配置含义,实现安装即可

知识点16:【实现】Kafka启动与关闭

  • 目标实现Kafka启动与关闭
  • 实施
    • 启动Zookeeper
start-zk-all.sh
  • 启动Kafka (只能启动当前节点)
bin/kafka-server-start.sh config/server.properties >> /dev/null 2>&1 &
  • 关闭Kafka
bin/kafka-server-stop.sh 
  • 封装Kafka脚本
  • 启动脚本
vim /export/server/kafka_2.12-2.4.1/bin/start-kafka.sh
#!/bin/bash
KAFKA_HOME=/export/server/kafka_2.12-2.4.1

for number in {1..3}
do
host=node${number}
echo ${host}
/usr/bin/ssh ${host} "cd ${KAFKA_HOME};source /etc/profile;export JMX_PORT=9988;${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >>/dev/null 2>&1 &"
echo "${host} started"
done
chmod u+x /export/server/kafka_2.12-2.4.1/bin/start-kafka.sh
  • 关闭脚本
vim /export/server/kafka_2.12-2.4.1/bin/stop-kafka.sh
#!/bin/bash
KAFKA_HOME=/export/server/kafka_2.12-2.4.1

for number in {1..3}
do
host=node${number}
echo ${host}
/usr/bin/ssh ${host} "cd ${KAFKA_HOME};source /etc/profile;${KAFKA_HOME}/bin/kafka-server-stop.sh"
echo "${host} stoped"
done
chmod u+x /export/server/kafka_2.12-2.4.1/bin/stop-kafka.sh
  • 关闭,可能需要多等待一会,才看不到进程
  • 小结:实现Kafka启动与关闭

【模块五:Topic管理及生产消费测试】

知识点17:【掌握】Topic管理:创建与列举

  • 目标掌握Kafka集群中Topic的管理命令,实现创建Topic及列举Topic
  • 实施
    • Topic管理脚本
      image-20210330093455692-1642492986068.png
    • 查看用法
      image-20210530145454778-1642492986069.png
    • 创建Topic
kafka-topics.sh --create --topic bigdata01 --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
  • --create:创建
  • --list:列举
  • --topic:topic名称
  • --partitions:指定topic的分区数
  • --replication-factors:指定分区副本个数
  • --bootstrap-server:指定服务端地址
  • 列举Topic
kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
  • 小结:掌握Kafka集群中Topic的管理命令,实现创建Topic及列举Topic

知识点18:【掌握】Topic管理:查看与删除

  • 目标掌握Kafka集群中Topic查看与删除
  • 实施
    • 查看Topic信息
kafka-topics.sh --describe --topic bigdata01  --bootstrap-server node1:9092,node2:9092,node3:9092
Topic: bigdata01        PartitionCount: 3       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: bigdata01        Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: bigdata01        Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: bigdata01        Partition: 2    Leader: 0       Replicas: 0     Isr: 0
  - Kafka中唯一决定一个分区:Topic+Partition
  - Topic:Topic的名称
  - Partition:分区的编号,从0开始
  - Replicas:显示的是这个分区的所有副本所在的Broker节点的ID
  - Leader: 显示的是这个分区的Leader
  - Lsr:
  • 删除Topic
kafka-topics.sh --create --topic bigdata02 --partitions 2  --replication-factor 3 --bootstrap-server node1:9092,node2:9092,node3:9092 

kafka-topics.sh --delete --topic bigdata02  --bootstrap-server node1:9092,node2:9092,node3:9092
image-20210922104436800.png
image-20210922104436800.png
  • 小结:掌握Topic查看与删除

知识点19:【实现】生产者及消费者测试

  • 目标实现命令行如何模拟测试生产者和消费者
  • 实施
    • 命令行提供的脚本
      image-20210330101943181-1642492986069.png
    • Console生产者
kafka-console-producer.sh --topic bigdata01 --broker-list node1:9092,node2:9092,node3:9092
  • Console消费者
kafka-console-consumer.sh --topic bigdata01 --bootstrap-server node1:9092,node2:9092,node3:9092
  • --from-beginning:从每个分区的最初开始消费,默认从最新的offset进行消费
  • 默认从最新位置开始消费
image-20210330102756214-1642492986069.png
image-20210330102756214-1642492986069.png
  • 小结:只要生产者不断生产,消费就能实时的消费到Topic中的数据

【模块六:生产者及消费者Java API】

知识点20:【掌握】生产者API:生产数据到Kafka

  • 目标掌握如何将数据写入Kafka中
  • 实施
    • 使用方式
      • 命令行:一般只用于topic的管理:创建、删除
      • Java API:Spark、Flink构建生产者和消费者
    • 生产流程
    • 生产代码
package bigdata.itcast.cn.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @ClassName KafkaProducerClient
 * @Description TODO 用于构建Kafka生产者,生产数据到Kafka中
 * @Create By     Frank
 */
public class KafkaProducerClient {
    public static void main(String[] args) {
        //todo:1-构建连接
        //构建配置对象,类似于Hadoop中的Configuration对象
        Properties props = new Properties();
        //指定服务端地址
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        /**
         *  Kafka怎么保证数据存储安全?--分区副本机制
         *  Kafka怎么保证数据传输安全?
         *              消费数据时不丢失和不重复? --消费者按照offset进行顺序消费
         *              生产数据时不丢失? -- ACK应答机制和重试机制
         *  acks = 0/1/all
         *  0:生产者发送一条数据给Kafka,不管kafka有没有写入这条数据,直接发送下一条【快,数据丢失的概率大】
         *  1:生产者发送一条数据给Kafka,等待Kafka将这条数据写入这个分区的Leader副本以后,就返回ack,生产者收到ack就发送下一条【安全和性能之间】
         *  all[-1]:生产者发送一条数据给Kafka,等待Kafka将这条数据写入这个分区的所有可用副本中,再返回ack,生产者收到ack就发送下一条【安全性最高,慢】
         *  工作中选择:1或者all
         *      注重安全性:all
         *      注重性能:1
         *  如果没有收到ack,生产者会根据重试机制,重新发送这条数据:retries = 3
         */
        props.put("acks", "all");
        // 指定写入的KV序列化的类型
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //构建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);


        //todo:2-执行操作
        for (int i = 0; i < 10; i++){
            //构建生产的数据对象
            //方式一:指定Key和Value,Topic+Key+Value
//            ProducerRecord<String, String> record1 = new ProducerRecord<>("bigdata01", i+"", "itcast"+i);
//            producer.send(record1); //调用生产的方法写入数据
            //方式二:不指定Key,Topic+Value:将数据写入了同一个分区中
//            ProducerRecord<String, String> record2 = new ProducerRecord<>("bigdata01", "itcast"+i);
//            producer.send(record2); //调用生产的方法写入数据
            //方式三:指定写入某个分区,Topic+Partition+Key+Value:指定写入某个分区
            ProducerRecord<String, String> record3 = new ProducerRecord<>("bigdata01", 0,i+"", "itcast"+i);
            producer.send(record3); //调用生产的方法写入数据
        }

        //todo:3-关闭资源,注意:这一步在实时中是没有,实时是7*24运行,不停的
        producer.close();
    }
}
  • 小结:掌握如何将数据写入Kafka中

知识点21:【掌握】消费者API:消费Topic数据

  • 目标掌握如何从Kafka中消费数据
  • 实施
    • 消费流程
    • 消费代码
package bigdata.itcast.cn.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * @ClassName KafkaConsumerClient
 * @Description TODO 用于测试消费者消费数据
 * @Create By     Frank
 */
public class KafkaConsumerClient {
    public static void main(String[] args) {
        //todo:1-构建连接
        //构建配置对象
        Properties props = new Properties();
        //指定服务端地址
        props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        //指定当前消费者属于哪个消费者组
        props.setProperty("group.id", "test01");
        //开启自动提交
        props.setProperty("enable.auto.commit", "true");
        //自动提交的时间间隔
        props.setProperty("auto.commit.interval.ms", "1000");
        //指定KV读取反序列化的类型
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //构建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        //todo:2-处理数据:先订阅Topic,再消费数据,最后处理数据
        //step1:先订阅Topic:一个消费者可以订阅多个Topic
        consumer.subscribe(Arrays.asList("bigdata01"));
        //源源不断的消费和处理数据
        while (true) {
            //step2:消费拉取数据,每次拉取到的所有数据翻入ConsumerRecords集合中
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            //step3:处理每条数据:ConsumerRecord存储消费到的一条数据
            for (ConsumerRecord<String, String> record : records){
                //输出每条数据中的信息
                String topic = record.topic();//当前这条数据的topic
                int part = record.partition();//来自于这个topic的哪个分区
                long offset = record.offset();//在这个分区中的offset
                //获取这条数据中的Keyvalue
                String key = record.key();
                String value = record.value();
                System.out.println(topic+"\t"+part+"\t"+offset+"\t"+key+"\t"+value);
            }
        }
        //todo:3-消费者是源源不断的消费的,不停的,没有关闭的过程
    }
}
  • 小结:掌握如何从Kafka中消费数据

知识点22:【掌握】生产分区规则

  • 目标掌握Kafka生产者生产数据的分区规则
  • 实施
    • 面试题:Kafka生产者怎么实现生产数据的负载均衡?
    • 需求:生产数据的时候尽量保证相对均衡的分到Topic多个分区中
    • 问题:为什么生产数据的方式不同,分区的规则就不一样?
- ProducerRecordTopicValue//将所有数据写入某一个分区
- ProducerRecordTopicKeyValue//按照Key的Hash取余方式
- ProducerRecordTopicPartitionKeyValue//指定写入某个分区
  • 解答
    • 1:先判断是否指定了某个分区,如果指定了分区,就写入指定的分区,如果没有执行step2
    • 2:再判断是否自定义分区器,如果有,就调用自定义分区器。如果没有就调用默认分区器DefaultPartitioner,进入step3
    • 3:先判断是否指定了Key,如果指定了Key,就按照Key的MUR值取模分区个数,决定分区,没有指定Key,就执行step4
    • 4:执行StickyPartition黏性分区,从缓存中获取上一次的分区编号,如果有就直接返回,如果没有就随机选择一个分区编号
先判断有没有指定分区,指定了就写入指定的分区
没有指定就判断有没有自定义分区,如果有,就调用自定义的分区
没有自定义就判断有没有指定Key,如果有,按照Key的mur取模分区个数
没有就使用黏性分区,随机选择一个分区,将这一批次所有数据写入这个分区
  - 按照Key的MUR取模分区:可能导致数据热点
  - 黏性分区:相对数据更加均衡,一般为了保证均衡性,数据都存储在Value,不用指定Key
  • 小结
    • 掌握Kafka生产者生产数据的分区规则

知识点23:【了解】自定义开发生产分区器

  • 目标了解Kafka自定义开发生产分区器,以随机分区为例
  • 实施
    • 开发一个随机分区器
package bigdata.itcast.cn.kafka.userpart;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.Random;

/**
 * @ClassName UserPartition
 * @Description TODO 用户自定义分区器,随机分区
 * @Create By     Frank
 */
public class UserPartition implements Partitioner {
    //计算当前这条数据的分区,返回对应的分区编号
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //或者这个Topic的分区个数
        Integer count = cluster.partitionCountForTopic(topic);
        //构建随机值
        Random random = new Random();
        int i = random.nextInt(count);
        //返回一个随机值
        return i;
    }

    @Override
    public void close() {
        //释放资源
    }

    @Override
    public void configure(Map<String, ?> configs) {
        //获取配置
    }
}
  • 加载分区器
  //指定分区器
 props.put("partitioner.class","bigdata.itcast.cn.kafka.userpart.UserPartition");
  • 结果
    image-20210701103429865.png
  • 小结:了解Kafka自定义开发生产分区器

附录一:可视化工具Kafka Tool

  • 目标了解Windows版可视化工具Kafka Tool的使用
  • 实施
    • 安装Kafka Tool:不断下一步即可
      image-20210329163448255-1642492986069.png
      image-20210329163613726-1642492986069.png
    • 构建集群连接:连接Kafka集群
      image-20210329163535693-1642492986069.png
      image-20210329163710836-1642492986069.png
    • 查看集群信息
      image-20210330104345929-1642492986069.png
  • 小结:可视化工具

附录二:Kafka集群压力测试

  • 目标了解如何实现Kafka集群的吞吐量及压力测试
  • 实施
    • 创建Topic
kafka-topics.sh --create --topic bigdata --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
  • 生产测试
kafka-producer-perf-test.sh --topic bigdata --num-records 1000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1
  - --num-records:写入数据的条数
  - --throughput:是否做限制,-1表示不限制
  - --record-size:每条数据的字节大小
image-20210330104918488-1642492986069.png
image-20210330104918488-1642492986069.png
  • 消费测试
kafka-consumer-perf-test.sh --topic bigdata --broker-list node1:9092,node2:9092,node3:9092  --fetch-size 1048576 --messages 1000000
image-20210330105146154-1642492986069.png
image-20210330105146154-1642492986069.png
  • 小结:工作中一般根据实际的需求来调整参数,测试kafka集群的最高性能,判断是否能满足需求

附录三:Kafka Maven依赖

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
</repositories>
<dependencies>
    <!-- Kafka的依赖 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>
上次编辑于:
贡献者: 麦正阳