
Kafka learning notes


1 Kafka summary

1.1 Message queue

[Figure: the point-to-point and publish/subscribe messaging models]

(1) Point-to-point mode (one-to-one: consumers actively pull data, and a message is removed from the queue once it has been received)

The point-to-point model is usually a pull- or polling-based message delivery model: the receiver requests messages from a queue instead of having them pushed to it. Its defining feature is that a message sent to the queue is received by one and only one receiver, even when there are multiple listeners on the queue.

(2) Publish/subscribe mode (one-to-many: once data is produced, it is pushed to all subscribers)

The publish/subscribe model is a push-based messaging model. It can have many different subscribers: a temporary subscriber receives messages only while it is actively listening on the topic, whereas a durable subscriber receives all messages on the topic, even if it is currently unavailable or offline.

 

1.2 Why message queues

1) Decoupling:

Allows you to extend or modify the producing and consuming sides independently, as long as both honor the same interface contract.

2) Redundancy:

Message queues persist data until it has been fully processed, which avoids the risk of data loss. In the "insert - get - delete" paradigm used by many message queues, a message is deleted from the queue only after your processing system explicitly indicates that it has been processed, which ensures the data is stored safely until you are done with it.

3) Scalability:

Because message queues decouple your processing, it is easy to increase the rate at which messages are enqueued and processed: just add more processes.

4) Flexibility & peak handling capacity:

An application still needs to keep working when traffic surges, yet such bursts are rare, and permanently provisioning resources for peak load would be a huge waste. A message queue lets critical components withstand sudden access pressure without collapsing under an overload of requests.

5) Recoverability:

When part of the system fails, it does not bring down the whole system. Message queues reduce the coupling between processes, so even if a process that handles messages dies, the enqueued messages can still be processed after the system recovers.

6) Ordering guarantees:

In most scenarios the order in which data is processed matters. Most message queues are ordered and can guarantee that data is processed in a particular order. (Kafka guarantees the ordering of messages within a partition.)

7) Buffering:

Helps control and optimize the speed at which data flows through the system and smooths out mismatches between the production rate and the consumption rate.

8) Asynchronous communication:

Often the user does not want or need to process a message immediately. A message queue provides an asynchronous processing mechanism: users can put a message on the queue without processing it right away, enqueue as many messages as they like, and process them when needed.

 

1.3 What is Kafka?

In stream computing, Kafka is generally used to buffer data, and Storm consumes the data from Kafka.

1) Apache Kafka is an open-source messaging system written in Scala, developed as an open-source project by the Apache Software Foundation.

2) Kafka was originally developed by LinkedIn and open-sourced at the beginning of 2011. It graduated from the Apache Incubator in October 2012. The goal of the project is to provide a unified, high-throughput, low-latency platform.

3) Kafka is a distributed message queue. Kafka stores messages categorized by topic; the message sender is called the producer and the message receiver is called the consumer. A Kafka cluster consists of multiple Kafka instances, and each instance (server) is called a broker.

4) Both the Kafka cluster and the consumers rely on a ZooKeeper cluster to store meta information, which guarantees system availability.

 

1.4 Kafka framework

Kafka overall architecture:

[Figure: Kafka overall architecture]

Kafka detailed architecture:

[Figure: Kafka detailed architecture]

1) Producer: the message producer, i.e. the client that sends messages to the Kafka broker;

2) Consumer: the message consumer, i.e. the client that fetches messages from the Kafka broker;

3) Topic: can be thought of as a queue;

4) Consumer Group (CG): the mechanism Kafka uses to implement both broadcast (to every consumer) and unicast (to exactly one consumer) of a topic's messages. A topic can have multiple CGs. The topic's messages are copied (not physically, but conceptually) to all of the CGs, but within each CG a partition's messages go to only one consumer. To implement broadcast, just give each consumer its own CG; to implement unicast, put all consumers in the same CG. CGs also let consumers be grouped freely without sending messages to multiple topics (a short sketch of this broadcast/unicast behavior follows this list);

5) Broker: a Kafka server is a broker. A cluster consists of multiple brokers, and one broker can hold multiple topics;

6) Partition: for scalability, a very large topic can be spread across multiple brokers (servers). A topic can be split into multiple partitions, and each partition is an ordered queue. Each message in a partition is assigned an ordered id (the offset). Kafka only guarantees that messages are delivered to consumers in order within a partition, not across the whole topic (across multiple partitions);

7) Offset: Kafka's storage files are named by offset with a .kafka suffix; naming by offset makes a message easy to locate. For example, to find the message at position 2049, just look for the file 2048.kafka. The very first file is 00000000000.kafka.
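A minimal sketch of the broadcast/unicast behavior described in item 4), using the new consumer API (introduced in section 4.3) and the first topic. The class name GroupDemoConsumer and the group names g1/g2 are illustrative assumptions, not part of the original notes: two instances started with the same group.id split the topic's partitions between them (unicast within the group), while an instance started with a different group.id also receives every message (broadcast across groups).

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupDemoConsumer {
    // run several instances with different arguments to observe the behavior:
    //   java GroupDemoConsumer g1   (twice) -> the two instances split the partitions (unicast)
    //   java GroupDemoConsumer g2   (once)  -> this instance also receives every message (broadcast)
    public static void main(String[] args) {
        String groupId = args.length > 0 ? args[0] : "g1";

        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));

        while (true) {
            for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                System.out.printf("group=%s partition=%d value=%s%n",
                        groupId, record.partition(), record.value());
            }
        }
    }
}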

 

2 Kafka Cluster deployment

2.1 Environmental preparation

2.1.1 Cluster planning

hadoop102 hadoop103 hadoop104
zk zk zk
kafka kafka kafka

 

2.1.2 Installation package download

http://kafka.apache.org/downloads.html


The current stable version is 2.6; the course these notes follow is based on Kafka 0.11.0.0.

2.2 Kafka Cluster deployment

1) Unzip the installation package

[atguigu@hadoop102 software]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/

2) Change the unzipped file name

[atguigu@hadoop102 module]$ mv kafka_2.11-0.11.0.0/ kafka

3) Create a logs folder under the /opt/module/kafka directory

[atguigu@hadoop102 kafka]$ mkdir logs

4) Modify the configuration file

[atguigu@hadoop102 kafka]$ cd config/
[atguigu@hadoop102 config]$ vi server.properties

Enter the following:

# globally unique broker id; must not be repeated
broker.id=0
# enable topic deletion
delete.topic.enable=true
# number of threads handling network requests
num.network.threads=3
# number of threads handling disk I/O
num.io.threads=8
# send buffer size of the socket
socket.send.buffer.bytes=102400
# receive buffer size of the socket
socket.receive.buffer.bytes=102400
# maximum request size accepted by the socket
socket.request.max.bytes=104857600
# directory where Kafka stores its log (data) files
log.dirs=/opt/module/kafka/logs
# default number of partitions per topic on this broker
num.partitions=1
# number of threads per data directory used for recovery and cleanup
num.recovery.threads.per.data.dir=1
# maximum time a segment file is retained; it is deleted after this timeout
log.retention.hours=168
# ZooKeeper cluster connection addresses
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

5) Configure environment variables

[atguigu@hadoop102 module]$ sudo vi /etc/profile

#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

 

[atguigu@hadoop102 module]$ source /etc/profile

6) Distribute installation package

[atguigu@hadoop102 module]$ xsync kafka/

Note: remember to configure the environment variables on the other machines after distributing the package.

7) On hadoop103 and hadoop104, change broker.id in /opt/module/kafka/config/server.properties to broker.id=1 and broker.id=2 respectively

Note: broker.id must not be repeated

8) Start cluster

Start Kafka on the hadoop102, hadoop103, and hadoop104 nodes in turn:

[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties &
[atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh config/server.properties &
[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh config/server.properties &

9) Shut down the cluster

[atguigu@hadoop102 kafka]$ bin/kafka-server-stop.sh stop
[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh stop
[atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh stop

 

2.3 Kafka Command line operations

1) List all topics on the current server

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list

2) Create a topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--create --replication-factor 3 --partitions 1 --topic first

Option description:

--topic defines the topic name

--replication-factor defines the number of replicas

--partitions defines the number of partitions

3) Delete a topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--delete --topic first

delete.topic.enable=true must be set in server.properties; otherwise the topic is only marked for deletion (or removed only after a restart).

4) Send messages

[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first
>hello world
>atguigu atguigu

 

5) Consume messages

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --from-beginning --topic first

--from-beginning: reads all the data already in the first topic from the beginning. Whether to add this option depends on the business scenario.

6) View the details of a topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--describe --topic first

 

3 Kafka Workflow analysis

[Figure: Kafka workflow overview]

3.1 Kafka Production process analysis

3.1.1 How to write

The producer publishes messages to the broker in push mode, and each message is appended to a partition. This is a sequential write to disk (sequential disk writes are more efficient than random writes to memory, which is what guarantees Kafka's throughput).

3.1.2 Partition (Partition)

Messages are sent to a topic. A topic is essentially a directory made up of partition logs, organized as shown below:

[Figure: a topic split into partition logs]

[Figure: a partition log with messages appended at increasing offsets]

As the figures show, each partition is ordered: produced messages are continually appended to the partition log, and each message is assigned a unique offset value.

1) Reasons for partitioning

(1) It makes it easy to scale in a cluster: each partition can be sized to fit the machine it lives on, and a topic can consist of multiple partitions, so the cluster can accommodate data of any size;

(2) It improves concurrency, because reads and writes can be done per partition.

2) Partitioning rules

(1) If a partition is specified, it is used directly;

(2) If no partition is specified but a key is given, the key's value is hashed to choose a partition;

(3) If neither a partition nor a key is specified, a partition is chosen by round robin.
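A minimal sketch of rules (2) and (3) as a custom Partitioner for the new producer API. It is a simplified stand-in for the default partitioner's behavior; the class name RuleBasedPartitioner and the round-robin counter are illustrative assumptions, not part of the original notes. Rule (1), an explicitly specified partition, is applied by the producer before any partitioner is called (a ProducerRecord constructed with a partition number), so it does not appear in the code.

package com.atguigu.kafka;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class RuleBasedPartitioner implements Partitioner {

    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes != null) {
            // rule (2): a key is given -> hash the key to choose a partition
            // (same hashing helpers the default partitioner uses)
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
        // rule (3): no key -> round robin over the partitions
        return Utils.toPositive(counter.getAndIncrement()) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}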

 

3.1.3 Replicas (Replication)

The same partition may have multiple replicas (configured via default.replication.factor=N in server.properties). Without replication, once a broker goes down, none of its partitions can be consumed and producers can no longer write data to them. With replication, a leader is elected among a partition's replicas; producers and consumers interact only with the leader, while the other replicas act as followers and replicate data from the leader.

 

3.1.4 Write process

The producer writes messages as follows:

[Figure: producer write flow]

1) The producer first finds the leader of the partition from the "/brokers/.../state" node in ZooKeeper

2) The producer sends the message to that leader

3) The leader writes the message to its local log

4) The followers pull the message from the leader, write it to their local logs, and send an ACK to the leader

5) After the leader has received ACKs from all replicas in the ISR, it advances the HW (high watermark, the offset of the last commit) and sends an ACK to the producer
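How much of this flow the producer waits for is controlled by its acks setting. Below is a minimal sketch using the new producer API and the broker and topic names used elsewhere in these notes; the class name AcksDemoProducer is an illustrative assumption.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AcksDemoProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        // acks=0: don't wait for the leader; acks=1: wait for the leader's local write (step 3);
        // acks=all: wait until every replica in the ISR has acknowledged (step 5)
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("first", "k", "v"), (metadata, exception) -> {
            // fires once the broker-side acknowledgement (per the acks setting) has arrived
            if (exception == null) {
                System.out.println("acked: partition=" + metadata.partition() + ", offset=" + metadata.offset());
            } else {
                exception.printStackTrace();
            }
        });
        producer.close();
    }
}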

 

3.2 How the broker stores messages

3.2.1 Storage layout

Physically, a topic is divided into one or more partitions (configured via num.partitions=3 in server.properties). Each partition corresponds to a folder on disk that stores all of that partition's messages and index files, for example:

[atguigu@hadoop102 logs]$ ll
drwxrwxr-x. 2 atguigu atguigu  4096 Aug  6 14:37 first-0
drwxrwxr-x. 2 atguigu atguigu  4096 Aug  6 14:35 first-1
drwxrwxr-x. 2 atguigu atguigu  4096 Aug  6 14:37 first-2
[atguigu@hadoop102 logs]$ cd first-0
[atguigu@hadoop102 first-0]$ ll
-rw-rw-r--. 1 atguigu atguigu 10485760 Aug  6 14:33 00000000000000000000.index
-rw-rw-r--. 1 atguigu atguigu      219 Aug  6 15:07 00000000000000000000.log
-rw-rw-r--. 1 atguigu atguigu 10485756 Aug  6 14:33 00000000000000000000.timeindex
-rw-rw-r--. 1 atguigu atguigu        8 Aug  6 14:37 leader-epoch-checkpoint

3.2.2 Storage strategy

Kafka retains all messages whether or not they have been consumed. There are two strategies for deleting old data:

1) By time: log.retention.hours=168

2) By size: log.retention.bytes=1073741824

Note that because the time complexity of reading a particular message in Kafka is O(1) regardless of file size, deleting expired files has nothing to do with improving Kafka's performance.

 

3.2.3 ZooKeeper storage structure

[Figure: ZooKeeper directory structure used by Kafka]

Note: producers do not register in ZooKeeper; consumers do.

 

3.3 Kafka Consumption process analysis

Kafka provides two consumer APIs: the high-level Consumer API and the low-level Consumer API.

3.3.1 High-level API

1) Advantages of the high-level API

The high-level API is easy to program against.

There is no need to manage offsets yourself; the system manages them via ZooKeeper.

There is no need to manage partitions, replicas, and so on; the system manages them automatically.

After a consumer disconnects, it automatically resumes from the offset last recorded in ZooKeeper (by default the offset in ZooKeeper is updated once per minute).

Consumer groups can be used to let different programs access the same topic independently (different groups record their own offsets, so programs reading the same topic do not interfere with each other through offsets).

2) Disadvantages of the high-level API

The offset cannot be controlled directly (a limitation for some special requirements).

Partitions, replicas, ZooKeeper, and so on cannot be controlled at a fine-grained level.

 

3.3.2 Low-level API

1) Advantages of the low-level API

Developers control the offset themselves and can read from wherever they want.

They control which partitions to connect to and can implement custom load balancing across partitions.

Dependence on ZooKeeper is reduced (for example, offsets do not have to be stored in ZooKeeper; they can be stored anywhere, such as in a file or in memory).

2) Disadvantages of the low-level API

It is complex: the offset, which partition to connect to, finding the partition leader, and so on all have to be handled by the application itself.

 

3.3.3 Consumer group

[Figure: a consumer group reading the partitions of a topic]

Consumers work in consumer groups: one or more consumers form a group and consume a topic together. Each partition can be read by only one consumer within a group at any given time, but multiple groups can consume the same partition simultaneously. In the figure, a group of three consumers reads a topic: one consumer reads two of the topic's partitions and the other two read one partition each. A consumer that reads a partition can also be said to be the owner of that partition.

In this model, consumers can scale out horizontally to read a large volume of messages at the same time. In addition, if one consumer fails, the other members of the group automatically rebalance and take over the partitions the failed consumer was reading.
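A minimal sketch of watching that rebalancing from the client side with the new consumer API: the ConsumerRebalanceListener callbacks fire whenever partitions are revoked from or assigned to a consumer, for example when another member of the group starts or dies. The class name and group name are illustrative assumptions.

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceAwareConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // called before a rebalance takes partitions away from this consumer
                System.out.println("revoked: " + partitions);
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // called after a rebalance hands partitions to this consumer
                System.out.println("assigned: " + partitions);
            }
        });

        while (true) {
            for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                System.out.println(record.partition() + " -> " + record.value());
            }
        }
    }
}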

 

3.3.4 Consumption mode

Consumers read data from the broker in pull mode.

The push mode has difficulty adapting to consumers with different consumption rates, because the send rate is decided by the broker. Its goal is to deliver messages as fast as possible, but this easily leaves the consumer unable to keep up, typically showing up as denial of service and network congestion. The pull mode lets the consumer fetch messages at a rate matched to its own capacity.

For Kafka, the pull mode is more appropriate. It simplifies the broker's design, lets the consumer control its own consumption rate, and lets the consumer choose its consumption pattern: it can consume in batches or one message at a time, and it can choose different delivery mechanisms to achieve different delivery semantics.

The shortcoming of the pull mode is that if Kafka has no data, the consumer may loop while waiting for data to arrive. To avoid this, the pull request has parameters that allow the consumer to block in a "long poll" while waiting for data to arrive (and optionally to wait until a given number of bytes is available, so that transfers are reasonably large).
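A minimal sketch of those knobs on the new consumer API, assuming the broker address used elsewhere in these notes: fetch.min.bytes and fetch.max.wait.ms tell the broker how much data to accumulate and how long it may hold a fetch open, and the poll() timeout bounds how long the client itself blocks. The class name is an illustrative assumption.

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LongPollConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // the broker holds the fetch until at least 1 KB is available ...
        props.put("fetch.min.bytes", "1024");
        // ... or until 500 ms have passed, whichever comes first
        props.put("fetch.max.wait.ms", "500");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));

        while (true) {
            // the client-side poll blocks for at most 1 second if no records arrive
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}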

 

3.3.5 Consumer group case

1) Requirement: verify that within the same consumer group, only one consumer consumes a given message at a time.

2) Case practice

(1) On hadoop102 and hadoop103, set the group.id property in the /opt/module/kafka/config/consumer.properties configuration file to the same (arbitrary) group name.

[atguigu@hadoop103 config]$ vi consumer.properties
group.id=atguigu

(2) Start a consumer on hadoop102 and on hadoop103:

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties
[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties

(3) Start a producer on hadoop104:

[atguigu@hadoop104 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first
>hello world

(4) Watch the consumers on hadoop102 and hadoop103.

Only one consumer receives a given message at a time.

 

4 Kafka API in practice

4.1 Environmental preparation

1) Start the ZooKeeper and Kafka clusters, then start a consumer in the Kafka cluster:

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --topic first

2) Add the pom dependencies:

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>0.11.0.0</version>
    </dependency>
</dependencies>

 

4.2 Kafka producer Java API

4.2.1 Create a producer (deprecated API)

package com.atguigu.kafka;

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class OldProducer {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "hadoop102:9092");
        properties.put("request.required.acks", "1");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(properties));

        KeyedMessage<Integer, String> message = new KeyedMessage<Integer, String>("first", "hello world");
        producer.send(message);
    }
}

 

4.2.2 Create a producer (new API)

package com.atguigu.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker host name and port
        props.put("bootstrap.servers", "hadoop103:9092");
        // wait for acknowledgement from all replicas
        props.put("acks", "all");
        // number of retries when a send fails
        props.put("retries", 0);
        // batch size for message batching
        props.put("batch.size", 16384);
        // linger time before sending a batch
        props.put("linger.ms", 1);
        // total memory for the send buffer
        props.put("buffer.memory", 33554432);
        // key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 50; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i));
        }

        producer.close();
    }
}

 

4.2.3 Create a producer with a callback (new API)

package com.atguigu.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CallBackProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker host name and port
        props.put("bootstrap.servers", "hadoop103:9092");
        // wait for acknowledgement from all replicas
        props.put("acks", "all");
        // number of retries when a send fails
        props.put("retries", 0);
        // batch size for message batching
        props.put("batch.size", 16384);
        // linger time before sending a batch
        props.put("linger.ms", 1);
        // total memory for the send buffer
        props.put("buffer.memory", 33554432);
        // key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        for (int i = 0; i < 50; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {

                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (metadata != null) {
                        System.err.println(metadata.partition() + "---" + metadata.offset());
                    }
                }
            });
        }

        kafkaProducer.close();
    }
}

 

4.2.4 Custom partition producer

0) Requirement: write all data to partition 0 of the topic

1) Define a class that implements the Partitioner interface and override its methods (deprecated API)

package com.atguigu.kafka;

import java.util.Map;
import kafka.producer.Partitioner;

public class CustomPartitioner implements Partitioner {

    public CustomPartitioner() {
        super();
    }

    @Override
    public int partition(Object key, int numPartitions) {
        // always write to partition 0
        return 0;
    }
}

2) Custom partitioner (new API)

package com.atguigu.kafka;

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class CustomPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {

    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // always write to partition 0
        return 0;
    }

    @Override
    public void close() {

    }
}

3) Use it in the producer code

package com.atguigu.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionerProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker host name and port
        props.put("bootstrap.servers", "hadoop103:9092");
        // wait for acknowledgement from all replicas
        props.put("acks", "all");
        // number of retries when a send fails
        props.put("retries", 0);
        // batch size for message batching
        props.put("batch.size", 16384);
        // linger time before sending a batch
        props.put("linger.ms", 1);
        // total memory for the send buffer
        props.put("buffer.memory", 33554432);
        // key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // custom partitioner
        props.put("partitioner.class", "com.atguigu.kafka.CustomPartitioner");

        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, String>("first", "1", "atguigu"));

        producer.close();
    }
}

4) Test

(1) On hadoop102, watch the log files of the three partitions of the first topic under /opt/module/kafka/logs/ for changes:

[atguigu@hadoop102 first-0]$ tail -f 00000000000000000000.log

[atguigu@hadoop102 first-1]$ tail -f 00000000000000000000.log

[atguigu@hadoop102 first-2]$ tail -f 00000000000000000000.log

(2) Observe that all data is written to the specified partition.

 

4.3 Kafka consumer Java API

4.3.1 High-level API

0) Start a console producer:

[atguigu@hadoop104 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first
>hello world

1) Create a consumer (deprecated API)

package com.atguigu.kafka.consume;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class CustomConsumer {

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "hadoop102:2181");
        properties.put("group.id", "g1");
        properties.put("zookeeper.session.timeout.ms", "500");
        properties.put("zookeeper.sync.time.ms", "250");
        properties.put("auto.commit.interval.ms", "1000");

        // create the consumer connector
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

        HashMap<String, Integer> topicCount = new HashMap<>();
        topicCount.put("first", 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCount);

        KafkaStream<byte[], byte[]> stream = consumerMap.get("first").get(0);

        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}

2) Official example (offsets maintained automatically) (new API)

package com.atguigu.kafka.consume;

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CustomNewConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker address; not every broker has to be listed
        props.put("bootstrap.servers", "hadoop102:9092");
        // specify the consumer group
        props.put("group.id", "test");
        // commit offsets automatically
        props.put("enable.auto.commit", "true");
        // interval between automatic offset commits
        props.put("auto.commit.interval.ms", "1000");
        // key deserializer
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value deserializer
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // subscribe to one or more topics
        consumer.subscribe(Arrays.asList("first", "second", "third"));

        while (true) {
            // poll for data with a 100 ms timeout
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

 

4.3.2 Low-level API

Use the low-level API to read data from a specified topic, partition, and offset.

1) Main steps for a consumer using the low-level API:

Step 1: Find the leader replica of the specified partition from the topic metadata
Step 2: Get the latest consumed offset of the partition
Step 3: Pull the partition's messages from the leader replica
Step 4: Detect changes of the leader replica and retry

2) Method descriptions:

findLeader(): the client sends a topic metadata request to the seed brokers and records the replica set as backup brokers
getLastOffset(): the consumer client sends an offset request and gets the most recent offset of the partition
run(): the main method in which the low-level consumer pulls messages
findNewLeader(): when the partition's leader replica fails, the client finds the new leader

3) Code:

package com.atguigu;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleExample {

    private List<String> m_replicaBrokers = new ArrayList<>();

    public SimpleExample() {
        m_replicaBrokers = new ArrayList<>();
    }

    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        // maximum number of messages to read
        long maxReads = Long.parseLong("3");
        // topic to subscribe to
        String topic = "test1";
        // partition to read
        int partition = Integer.parseInt("0");
        // broker node IPs
        List<String> seeds = new ArrayList<>();
        seeds.add("192.168.9.102");
        seeds.add("192.168.9.103");
        seeds.add("192.168.9.104");
        // port
        int port = Integer.parseInt("9092");
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // get the metadata of the specified topic partition
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For the simple case, ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through, if the leader hasn't changed give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                Thread.sleep(1000);
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (BrokerEndPoint replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}

 

5 Kafka producer Interceptor (interceptor)

5.1 Principle of interceptor

The producer interceptor was introduced in Kafka 0.10 and is mainly used to implement custom control logic on the client side.

For the producer, interceptors give the user a chance to apply custom logic to messages before they are sent and before the producer callback logic runs, for example to modify messages. A producer also allows multiple interceptors to be specified; they act on the same message in order and form an interceptor chain. The interface to implement is org.apache.kafka.clients.producer.ProducerInterceptor, which defines the following methods:

(1) configure(configs)

Called to obtain configuration information and initialize data.

(2) onSend(ProducerRecord):

This method is wrapped inside KafkaProducer.send, i.e. it runs in the user's main thread. The producer guarantees that it is called before the message is serialized and its partition is computed. The user can do anything to the message here, but it is best not to modify the topic or partition, otherwise the computation of the target partition is affected.

(3) onAcknowledgement(RecordMetadata, Exception):

This method is called when the message has been acknowledged or the send has failed, usually before the producer callback logic is triggered. onAcknowledgement runs in the producer's I/O thread, so do not put heavy logic in this method, or it will slow down the producer's sending.

(4) close:

Closes the interceptor; mainly used for resource cleanup.

As mentioned above, interceptors may run in multiple threads, so the implementation must ensure its own thread safety. In addition, if multiple interceptors are specified, the producer calls them in the given order and merely catches any exception each interceptor may throw and writes it to the error log instead of propagating it upward. Pay special attention to this in practice.

5.2 Interceptor case

1) Requirement:

Implement a simple interception chain composed of two interceptors. The first interceptor prepends a timestamp to the message value before the message is sent; the second counts the number of successful and failed sends after the messages have been sent.

[Figure: the two-interceptor chain (TimeInterceptor followed by CounterInterceptor)]

2) Case practice

(1) The timestamp interceptor:

package com.atguigu.kafka.interceptor;

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TimeInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public void configure(Map<String, ?> configs) {

    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // build a new record with the timestamp written at the front of the message value
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(),
                System.currentTimeMillis() + "," + record.value().toString());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    @Override
    public void close() {

    }
}

(2) The counting interceptor: counts successful and failed sends and prints both counters when the producer is closed:

package com.atguigu.kafka.interceptor;

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CounterInterceptor implements ProducerInterceptor<String, String> {
    private int errorCounter = 0;
    private int successCounter = 0;

    @Override
    public void configure(Map<String, ?> configs) {

    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // count successes and failures
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }

    @Override
    public void close() {
        // print the results
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
    }
}

(3) The producer main program:

package com.atguigu.kafka.interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class InterceptorProducer {

    public static void main(String[] args) throws Exception {
        // 1. configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. build the interceptor chain
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.atguigu.kafka.interceptor.TimeInterceptor");
        interceptors.add("com.atguigu.kafka.interceptor.CounterInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        String topic = "first";
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 3. send the messages
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i);
            producer.send(record);
        }

        // 4. the producer must be closed so that each interceptor's close() method is called
        producer.close();
    }
}

3) Test

(1) Start a consumer in Kafka, then run the Java program:

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --from-beginning --topic first

 

1501904047034,message0
1501904047225,message1
1501904047230,message2
1501904047234,message3
1501904047236,message4
1501904047240,message5
1501904047243,message6
1501904047246,message7
1501904047249,message8
1501904047252,message9

(2) The output on the Java console is as follows:

Successful sent: 10
Failed sent: 0

 

6 Kafka Streams

6.1 Overview

6.1.1 Kafka Streams

Kafka Streams is part of the Apache Kafka open-source project: a powerful, easy-to-use library for building highly distributed, scalable, fault-tolerant applications on top of Kafka.

6.1.2 Kafka Streams features

1) Powerful

Highly scalable, elastic, fault-tolerant

2) Lightweight

No dedicated cluster required

A library, not a framework

3) Fully integrated

100% compatible with Kafka 0.10.0

Easy to integrate into existing applications

4) Real-time

Millisecond latency

Not micro-batching

Windows allow out-of-order data

Allows late-arriving data

6.1.3 Why Kafka Streams?

There are already many stream processing systems; the best known and most widely used open-source ones are Spark Streaming and Apache Storm. Apache Storm has been developed for many years, is widely used, provides record-level processing, and now also supports SQL on streams. Spark Streaming is based on Apache Spark and integrates easily with graph computation, SQL processing, and so on; it is powerful, and the barrier to entry is low for users already familiar with other Spark application development. In addition, mainstream Hadoop distributions such as Cloudera and Hortonworks integrate both Apache Storm and Apache Spark, which makes deployment easier.

Since Apache Spark and Apache Storm have so many advantages, why do we still need Kafka Streams? There are mainly the following reasons.

First, Spark and Storm are stream processing frameworks, while Kafka Streams is a stream processing library built on Kafka. A framework requires developers to write their logic in a particular way for the framework to call; developers find it hard to understand how the framework handles their code, so debugging is costly and usage is constrained. Kafka Streams, as a library, provides concrete classes for developers to call directly, and the developer controls how the whole application runs, which makes it easier to use and to debug.

[Figure: framework (Storm/Spark Streaming) vs. library (Kafka Streams)]

Second, although Cloudera and Hortonworks make deploying Storm and Spark easier, deploying these frameworks is still relatively complex. Kafka Streams, as a library, can be embedded in an application very conveniently and puts essentially no requirements on how the application is packaged and deployed.

Third, virtually every stream processing system supports Kafka as a data source: Storm has a dedicated kafka-spout, and Spark provides the spark-streaming-kafka module. In fact, Kafka is effectively the standard data source for mainstream stream processing systems. In other words, most stream processing deployments already include Kafka, so the cost of adopting Kafka Streams is very low.

Fourth, when using Storm or Spark Streaming, resources must be reserved for the framework's own processes, such as Storm's supervisor and Spark on YARN's node manager. Even for the application instances themselves, the framework takes some resources; for example, Spark Streaming needs to reserve memory for shuffle and storage. Kafka Streams, being a library, takes no extra system resources of its own.

Fifth, because Kafka itself provides data persistence, Kafka Streams can offer rolling deployment, rolling upgrades, and recomputation.

Sixth, thanks to the Kafka consumer rebalance mechanism, Kafka Streams can adjust its parallelism dynamically online.

6.2 Kafka Streams data cleaning case

0) Requirement:

Process, in real time, input prefixed with ">>>". For example, the input "atguigu>>>ximenqing" is processed into "ximenqing".

1) Requirement analysis:

 

2) Case practice

(1) Create a project and add the jar dependencies

(2) Create the main class

package com.atguigu.kafka.stream;

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class Application {

    public static void main(String[] args) {
        // input topic
        String from = "first";
        // output topic
        String to = "second";

        // configuration
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");

        StreamsConfig config = new StreamsConfig(settings);

        // build the topology
        TopologyBuilder builder = new TopologyBuilder();

        builder.addSource("SOURCE", from)
               .addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {

                   @Override
                   public Processor<byte[], byte[]> get() {
                       // the actual processing logic
                       return new LogProcessor();
                   }
               }, "SOURCE")
               .addSink("SINK", to, "PROCESS");

        // create and start the Kafka Streams instance
        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
    }
}

(3) The business logic:

package com.atguigu.kafka.stream;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class LogProcessor implements Processor<byte[], byte[]> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(byte[] key, byte[] value) {
        String input = new String(value);
        // if the value contains ">>>", keep only the content after the marker
        if (input.contains(">>>")) {
            input = input.split(">>>")[1].trim();
            // forward to the next topic
            context.forward("logProcessor".getBytes(), input.getBytes());
        } else {
            context.forward("logProcessor".getBytes(), input.getBytes());
        }
    }

    @Override
    public void punctuate(long timestamp) {

    }

    @Override
    public void close() {

    }
}

(4) Run the program

(5) Start a producer on hadoop104:

[atguigu@hadoop104 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first
>hello>>>world
>h>>>atguigu
>hahaha

(6) Start a consumer on hadoop103:

[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --from-beginning --topic second

world
atguigu
hahaha

 

7 Supplement

7.1 Kafka keeps rebalancing

Three Kafka consumer configurations:

session.timeout.ms=10000
# Unit: milliseconds. Kafka runs a heartbeat thread that tells the broker the consumer is alive; by default a heartbeat is sent every 3 seconds. If the broker receives no heartbeat within session.timeout.ms (default 10 seconds), it considers the consumer dead.

max.poll.interval.ms=300000
# Unit: milliseconds. The maximum time allowed between fetching messages and committing their offsets. If it is exceeded (default 5 minutes), the broker also considers the consumer dead.

max.poll.records=500
# The number of messages returned by one poll (default 500). If it is too large, the records returned by a single poll cannot be processed within the allowed time, and a rebalance occurs: Kafka considers the consumer dead, reassigns the partitions, and since the offsets were not committed, messages get consumed repeatedly.


A friend ran into a problem where Kafka kept rebalancing (after a rebalance, the partitions and offsets the consumer previously held are no longer valid, and the client keeps reporting auto offset commit failed). The problem was solved by changing the following two configurations.

For example: make sure that 300 messages can be consumed and their offsets committed within 400 seconds before the next poll(); then the problem no longer appears.

# Increase this a little; the default is 300 seconds
max.poll.interval.ms=400000
# Decrease this; the default is 500 records
max.poll.records=300
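The same two settings expressed on the new consumer API, as a minimal sketch; the class name is an illustrative assumption, and the topic and group are the ones used earlier in these notes.

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RebalanceTunedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // allow up to 400 s between polls before the consumer is considered dead
        props.put("max.poll.interval.ms", "400000");
        // return at most 300 records per poll so processing fits inside that window
        props.put("max.poll.records", "300");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                // processing of each batch must finish within max.poll.interval.ms
                System.out.println(record.value());
            }
        }
    }
}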

 

Okay, that's all for this time. Knowledge has no limits; follow me and let's learn and improve together. If you found the content useful, give it a like; thank you. See you next time.

To get the materials: follow the official account 【Good time】 and reply with the keyword Kafka to get the notes and learning videos.


Copyright notice
This article was created by [Good day 1]. Please include a link to the original when reposting. Thank you.
