编程知识 cdmana.com

Kafka入门到精通

原文链接:https://blog.csdn.net/qq_43323585/article/details/105824989

Kafka概述介绍

kafka是什么

Kafka是Apache开源的流处理平台,该平台提供了消息的订阅与发布。具有高吞吐、简单、易部署等特点。

Kafka干什么

  1. 消息队列:用于系统解耦、异步通信、流量填谷等。
  2. Kaka Streaming:实时在线流处理。

消息队列工作模式

消息队列的两种工作模式 :1.最多一次 2.没有限制。如图:
在这里插入图片描述
那么Kafka是怎样的工作模式?继续

Kafka架构和概念

一些概念还是要知道的,编码会用到并且可以帮助理解!最后我会附上我的git地址,我想关于kafka的业务用我的代码库基本都可以搞定了

Kafka消息的生产和消费

Kafka以Topic形式管理分类集群中的消息(Record)。每个Record属于一个Topic并且每个Topic存在多个分区(partition)存放Record,每个分区对应一个服务器(Broker),这个Broker被称为leader,分区副本对应的Broker成为follower。**需要注意的是只有leader可以读写。**没错,我们很容易就想到zookeeper,那个号称目前为止唯一的分布式一致性算法Paxos,这也为Kafka的数据可靠性提供了保证。可能有点抽象,看图:
在这里插入图片描述在这里插入图片描述

分区和日志

日志就是log,其实就是数据,有点像redis里的log。并不是我们说的打印日志,呵呵
3. 每个Topic可以被多个消费者订阅,Kafka通过Topic管理partiton。
4. 消费者可以通过轮询,负载(对Record的key取模)的方式将Record存入partition中。
5. partition是一个有序不可变的日志序列,每个Record有唯一的offset,用于记录消费情况、为持久化策略提供支撑。
6. kafka默认配置log.retention.hours=168,不管消息有没有被消费Record都可以保存168小时–硬盘保存,具体持久化策略可以通过配置文件定制。
7. partiton内部有序,在多个分区情况下,不要指望先生产先消费。写业务和编码要注意
为什么Kafka要放弃有序,采用分区局部有序性?
就像hadoop的一样,分布式集群,打破物理局限性,不管是性能容量并发量都得到了质的提升,一台搞不定就一百台。毕竟Kafka是大数据框架。

消费组

概念:是一个逻辑的消费者,有多个消费者实例组成。如果4个消费者同时订阅topic1(4个分区),那么一个分区要被消费4次。引入消费组就可以避免重复消费。
编码:消费者可以采用subscribe和assign,采用subscribe订阅topic必须指定消费组!
在这里插入图片描述

Kafka高性能之道

为什么Kafka吞吐量如此之高

Kafka可以轻松支持百万级的写入请求,而且数据会持久化到硬盘,恐怖吗。其实现在想想,一个高性能的技术都是对内核的一个封装,比如Redis底层调用的epoll(),最厉害的还是老大哥OS。

顺序写、mmap

顺序写:硬盘是一个机械结构,寻址是极其耗时的机械动作,所以Kafka用了顺序IO,避免了大量的内存开销和IO寻址的时间。
mmap:Memory Mapped Files内存映射文件,它的工作原理是利用操作系统的PageCache实现文件到物理内存的直接映射,直接写入到了pagecache中,用户对内存的所有操作会被操作系统自动刷新到磁盘上,极大的降低了IO使用率。mmap相当于一个用户态可以直接访问内核态的共享空间,避免了用户态向内核态的切换和copy。
在这里插入图片描述

ZeroCopy

Kafka服务器在响应客户端读取的时候,底层使用ZeroCopy技术,直接将磁盘无需拷贝到用户空间,而是直接将数据通过内核空间传输出去,数据并没有抵达用户空间。
IO模型在这里不赘述了,以后再写吧。
在这里插入图片描述

搭建Kafka集群

请看我写的zookeeper和kafka搭建两篇文章,参考:Kafka集群搭建

Topic管理

  • bootstrap-server:消费者,拉取数据,老版本用-zookeeper这个参数。并且好多数据放到zk中了,这个很不合理。
  • broker-list:生产者,推送数据
  • partitions:分区个数
  • replication-factor : 分区副本因子

创建topic

bin/kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --create --partitions 3 --replication-factor 2 --topic debug
  • 1

查看主题列表

bin/kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --list
  • 1

查看主题详情

bin/kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --describe --topic debug
  • 1

删除topic

bin/kafka-topics.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --delete --topic debug
  • 1

生产某个主题的消息

bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic debug
  • 1

消费某个主题的消息

bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic debug --from-beginning
  • 1
bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic debug --group group1
  • 1

这几个命令还是很有用的!可以辅助我们的测试。

查看消费组

bin/kafka-consumer-groups.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --list
  • 1

Kafka API

资料来源于工具书的实例,非常全面,已经上传到GIt,现在还没来得及整理,不过以后会整理的,如果觉得不错给个星星哦!包括:生产消费,自定义分区,序列化,拦截器,事务,kafka流处理等。
Git地址:Kafka API

Kafka特性

acks、retries机制

acks应答机制:

  1. acks=1,leader写成功,不等待follower确认返回。无法保证单点故障
  2. acks=0,发送给套接字缓存,确认返回。消息安全系数最低
  3. acks=-1,leader至少一个follower应答后再确认返回。高可用集群
    retries重试机制:
    request.timeout.ms=3000默认超时重试。
    retries=2147483647xxx重试次数。
    Kafka消息语义:消息能至少一次保存

幂等写

幂等性:多次请求的结果和一次请求的结果一致。
Kafka幂等写解决方案:
在这里插入图片描述
对于一些业务上的幂等问题是可以借鉴一下的。要真正解决幂等:{黑名单、签名、token}
需要注意的是
enable.idempotence=false默认关闭幂等
前提条件:retries=true;acks=all

事务

因为 Kafka幂等写不提供跨多个 Partition 和跨会话场景下的保证,因此,我们是需要一种更强的事务保证,能够原子处理多个 Partition 的写入操作,数据要么全部写入成功,要么全部失败,这就是 Kafka Transactions,即 Kafka 事务。
producer提供了initTransactions,beginTransaction,sendOffsetsToTransaction,commitTransaction,abortTransaction 五个事务方法。
consumer提供了read_committed和read_uncommitted。

KafkaEagle监控软件

开源地址:https://github.com/smartloli/kafka-eagle
包括我一开始学习Kafka的书也是他写的。

安装过程

  1. 下载kafka-eagle-bin-1.4.0.tar.gz并解压
  2. mv kafka-eagle-bin-1.4.0 /opt/
  3. 解压之后里面还有个压缩包,再次解压
  4. 修改环境变量
vim /etc/profile
------------------------
# kafka-eagle
export KE_HOME=/opt/kafka-eagle
# PATH
export PATH=$PATH:$JAVA_HOME/bin:$ZOOKEEPER_HOME/bin:$KE_HOME/bin
------------------------
source /etc/profile 
echo $PATH
OK~
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  1. 修改kafka-eagle配置
[root@node01 conf]# vim system-config.properties
-----------------------------------------------------
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=node01:2181,node02:2181,node03:2181
#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
cluster1.kafka.eagle.offset.storage=kafka
#cluster2.kafka.eagle.offset.storage=zk
kafka.eagle.metrics.charts=true
cluster1.kafka.eagle.sasl.enable=false
cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.kafka.eagle.sasl.client.id=

#cluster2.kafka.eagle.sasl.enable=false
#cluster2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
#cluster2.kafka.eagle.sasl.mechanism=PLAIN
#cluster2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle";
#cluster2.kafka.eagle.sasl.client.id=
######################################
# kafka sqlite jdbc driver address
######################################
#kafka.eagle.driver=org.sqlite.JDBC
#kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
#kafka.eagle.username=root
#kafka.eagle.password=www.kafka-eagle.org

######################################
# kafka mysql jdbc driver address
######################################
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  1. 修改kafka启动配置,开启JMS
vim kafka-server-start.sh
-------------------------------------
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export JMX_PORT="7379"
 export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. 最后启动,发现没有启动权限
chmod u+x ke.sh
./ke.sh
  • 1
  • 2
Version 1.4.0
*******************************************************************
* Kafka Eagle Service has started success.
* Welcome, Now you can visit 'http://192.168.83.11:8048/ke'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

总结

到此为止,3台zk+3台kafka+kafka-eagle监控其实可以做很多事了。比如:日志收集、消息系统、活动追踪、运营指标、流式处理等。希望能给看到的人些许帮助!
辅助文档:
kafka集群:链接
zk集群:链接

版权声明
本文为[枫树湾河桥]所创,转载请带上原文链接,感谢
https://www.cnblogs.com/fswhq/p/13853519.html

Scroll to Top