Welcome to my WeChat official account, "Lao Zhou Talks About Architecture": principles and source-code analysis of the mainstream Java back-end technology stack, plus architecture and all kinds of Internet high-concurrency, high-performance, and high-availability solutions.
1. Preface
In previous articles we covered Kafka's infrastructure and setup; starting with this article, we dig into the source code. The Kafka version used here is 2.7.0. Its client is implemented in Java and its server in Scala. The client is the first part a user touches, so we start there, and within the client we start with the producer. Today we analyze the Producer source code.
2. Using the Producer
First, let's show how KafkaProducer is used with a piece of code. In the example below, we use KafkaProducer to send messages to Kafka. The program first writes the configuration KafkaProducer needs into a Properties object (the meaning of each setting is explained in the comments), then constructs a KafkaProducer from that Properties object, and finally completes the send via the send() method. The code covers both synchronous and asynchronous sending.
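A minimal sketch of such a program is shown below. The broker address `localhost:9092` and the topic `demo-topic` are placeholders for this sketch; running it requires a reachable Kafka broker and the kafka-clients dependency.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Broker address; adjust to your environment
        props.put("bootstrap.servers", "localhost:9092");
        // Wait for all in-sync replicas to acknowledge the write
        props.put("acks", "all");
        // Serializers for the record key and value
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("demo-topic", "key-1", "hello kafka");

            // Synchronous send: block on the returned Future until the broker acknowledges
            RecordMetadata meta = producer.send(record).get();
            System.out.printf("sync: partition=%d offset=%d%n", meta.partition(), meta.offset());

            // Asynchronous send: the callback runs once the send is acknowledged or fails
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.printf("async: partition=%d offset=%d%n",
                                metadata.partition(), metadata.offset());
                    }
                }
            });
        }
    }
}
```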
As the code above shows, Kafka gives users a very simple and convenient API. Using it takes only two steps:
- Initialize a KafkaProducer instance
- Call the send interface to send data
This article focuses on how the KafkaProducer instance is initialized and how the send interface sends data.
3. KafkaProducer Instantiation
Now that we understand the basic usage of KafkaProducer, let's take a closer look at the core logic of its construction:
public KafkaProducer(Properties properties) {
    this(Utils.propsToMap(properties), (Serializer)null, (Serializer)null, (ProducerMetadata)null, (KafkaClient)null, (ProducerInterceptors)null, Time.SYSTEM);
}
4. The Message Send Flow
Users send data directly with producer.send(). Let's first look at the implementation of the send() interface:
// Send data to a topic asynchronously
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return this.send(record, (Callback)null);
}

// Send data to a topic asynchronously, invoking the callback once the send is acknowledged
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return this.doSend(interceptedRecord, callback);
}
Ultimately, sending data comes down to a call to the producer's doSend() method.
4.1 Interceptor
The method first passes through the interceptor collection, ProducerInterceptors; its onSend method simply traverses the registered interceptors and calls each one's onSend. Interceptors exist to process records before they are sent, and Kafka ships no default interceptor implementation, so if you need interception you must implement the interface yourself.
4.1.1 Interceptor code
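As a stand-in for the original listing, here is a self-contained model of what ProducerInterceptors.onSend does. The interface and record type are simplified stand-ins, not the real Kafka classes; the point is the chain shape: each interceptor's output feeds the next, and an exception from one interceptor is logged and swallowed rather than propagated.

```java
import java.util.Arrays;
import java.util.List;

public class InterceptorChainDemo {
    // Simplified stand-in for org.apache.kafka.clients.producer.ProducerInterceptor
    interface Interceptor {
        String onSend(String record);
    }

    // Mirrors the shape of ProducerInterceptors.onSend: each interceptor's
    // output feeds the next one; an exception is logged, not rethrown.
    static String onSend(List<Interceptor> interceptors, String record) {
        String current = record;
        for (Interceptor interceptor : interceptors) {
            try {
                current = interceptor.onSend(current);
            } catch (Exception e) {
                // Kafka logs the error and continues with the last good record
                System.err.println("Error executing interceptor onSend: " + e.getMessage());
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<Interceptor> chain = Arrays.asList(
                r -> r + "|audited",
                r -> { throw new RuntimeException("boom"); },  // swallowed, chain continues
                r -> r + "|enriched");
        System.out.println(onSend(chain, "payload"));  // prints "payload|audited|enriched"
    }
}
```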
4.1.2 Interceptor core logic
The ProducerInterceptor interface consists of three methods:
- onSend(ProducerRecord<K, V> record): wrapped inside KafkaProducer.send(), so it runs on the user's main thread. It is guaranteed to be called before the message is serialized and its partition computed. The user may do anything to the message here, but it is best not to modify the topic or the partition, otherwise the target-partition calculation is affected.
- onAcknowledgement(RecordMetadata metadata, Exception exception): called when the message is acknowledged or when the send fails, and usually before the producer's own callback logic triggers. onAcknowledgement runs on the producer's I/O thread, so keep heavy logic out of this method, or it will slow down the producer's send throughput.
- close(): closes the interceptor, mainly to perform resource-cleanup work.
Interceptors may run in multiple threads, so the implementer must ensure thread safety. Also, if more than one interceptor is specified, the producer calls them in the given order and merely catches any exception an interceptor may throw, writing it to the error log instead of propagating it upward.
4.2 The doSend() Implementation
Inside doSend(), sending a single record breaks down into the following five steps:
- Confirm that metadata for the destination topic is available (the partition's leader exists and is available, and, if authorization is enabled, the client holds the required permissions); if there is no metadata for the topic yet, fetch it first;
- Serialize the record's key and value;
- Determine the partition the record will be sent to (it can be specified explicitly, or computed by the partitioning algorithm);
- Append the record to the accumulator, where the data is buffered first;
- If, after the append, the corresponding RecordBatch has reached batch.size (or the batch has too little room left for the next record), wake up the sender thread to transmit the data.
The sending process can be summarized by these five points; the implementation of each part is analyzed in detail below.
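The five steps can be sketched as the following simplified outline of doSend() (pseudocode modeled on the real method, with arguments abbreviated, not the verbatim 2.7.0 source):

```
// Simplified outline of KafkaProducer.doSend(record, callback)
clusterAndWaitTime = waitOnMetadata(record.topic, record.partition, maxBlockTimeMs)   // step 1
serializedKey   = keySerializer.serialize(record.topic, record.headers, record.key)   // step 2
serializedValue = valueSerializer.serialize(record.topic, record.headers, record.value)
partition = partition(record, serializedKey, serializedValue, cluster)                // step 3
result = accumulator.append(tp, timestamp, serializedKey, serializedValue, ...)       // step 4
if (result.batchIsFull || result.newBatchCreated)
    sender.wakeup()                                                                   // step 5
return result.future
```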
5. The Message Send Flow in Detail
5.1 Fetching the topic's metadata
The producer fetches the metadata of the target topic via the waitOnMetadata() method; we will cover this in a later article.
5.2 Serializing the key and value
On the producer side, the record's key and value are serialized; on the consumer side they are deserialized correspondingly. The serialization and deserialization implementations Kafka ships are shown in the figure below. Of course, you can also provide a custom serialization implementation, but in general the ones Kafka provides are sufficient.
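To show what the serializer contract looks like, here is a self-contained sketch. The interfaces are simplified stand-ins (the real ones, in org.apache.kafka.common.serialization, also receive the topic name and headers), but the UTF-8 round trip mirrors what Kafka's StringSerializer/StringDeserializer pair does.

```java
import java.nio.charset.StandardCharsets;

public class SerdeSketch {
    // Simplified stand-ins for Kafka's Serializer/Deserializer interfaces
    interface Serializer<T>   { byte[] serialize(T data); }
    interface Deserializer<T> { T deserialize(byte[] data); }

    // Mirrors StringSerializer/StringDeserializer: UTF-8 bytes on the wire
    static final Serializer<String> STRING_SERIALIZER =
            s -> s == null ? null : s.getBytes(StandardCharsets.UTF_8);
    static final Deserializer<String> STRING_DESERIALIZER =
            b -> b == null ? null : new String(b, StandardCharsets.UTF_8);

    public static void main(String[] args) {
        byte[] wire = STRING_SERIALIZER.serialize("hello kafka");
        // The consumer side applies the matching deserializer
        System.out.println(STRING_DESERIALIZER.deserialize(wire));  // prints "hello kafka"
    }
}
```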
5.3 Determining the record's destination partition
There are three cases when obtaining the partition value:
- If a partition is specified, the specified value is used directly as the partition value;
- If no partition is specified but a key is present, the partition value is obtained by taking the key's hash modulo the topic's partition count;
- If neither a partition nor a key is present, a random integer is generated on the first call (and incremented on each subsequent call), and this value modulo the number of available partitions of the topic yields the partition value — the so-called round-robin algorithm.
The specific implementation is as follows :
// If the record carries a partition value, return it directly; otherwise call the
// partitioner's partition method to compute one (KafkaProducer.class)
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ? partition : this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
The producer's default partitioner is org.apache.kafka.clients.producer.internals.DefaultPartitioner; users can also customize the partitioning strategy. The following is the implementation of the default strategy:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    return this.partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
}

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
    return keyBytes == null ? this.stickyPartitionCache.partition(topic, cluster) : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
The core of the default algorithm above is the sticky partition cache: when the record has no key, the producer sticks to one partition per topic until a new batch is started, rather than spreading every single record round-robin.
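To make the two branches concrete, here is a self-contained sketch of the same decision logic. The hash is a plain stand-in for Kafka's murmur2, and the sticky cache is reduced to one cached value per topic (the real StickyPartitionCache switches partitions when a new batch starts):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;

public class PartitionerSketch {
    // One "sticky" partition per topic (simplified stand-in for StickyPartitionCache)
    private final Map<String, Integer> stickyCache = new ConcurrentHashMap<>();

    int partition(String topic, byte[] keyBytes, int numPartitions) {
        if (keyBytes == null) {
            // No key: stick to one randomly chosen partition for this topic
            return stickyCache.computeIfAbsent(topic,
                    t -> ThreadLocalRandom.current().nextInt(numPartitions));
        }
        // Key present: hash modulo partition count (Kafka uses murmur2 here;
        // java.util.Arrays.hashCode is just a stand-in for the sketch)
        return toPositive(java.util.Arrays.hashCode(keyBytes)) % numPartitions;
    }

    // Same trick as Kafka's Utils.toPositive: clear the sign bit
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    public static void main(String[] args) {
        PartitionerSketch p = new PartitionerSketch();
        byte[] key = "user-42".getBytes();
        // A keyed record always lands on the same partition
        System.out.println(p.partition("orders", key, 6) == p.partition("orders", key, 6));   // prints "true"
        // Keyless records stick to one partition per topic
        System.out.println(p.partition("orders", null, 6) == p.partition("orders", null, 6)); // prints "true"
    }
}
```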
5.4 Appending records to the RecordAccumulator
Before discussing the RecordAccumulator, look at this diagram first; it gives an overall view of the whole send process.
The RecordAccumulator plays the role of a buffer; its default size is 32 MB.
In the Kafka producer, messages are not sent to the broker one at a time. Instead, multiple messages are grouped into a ProducerBatch, which the Sender then transmits in one go. Note that batch.size is not a message count ("send once enough messages have gathered") but a size in bytes; the default is 16 KB, and it can be tuned to the workload.
The core field of the RecordAccumulator is:
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
It is a ConcurrentMap whose key is a TopicPartition (one partition of one topic) and whose value is a double-ended queue of ProducerBatch instances waiting for the Sender thread to ship them to the broker. A diagram makes this clear:
Do you have any questions about the code above? For instance: why is the memory allocation not done inside the synchronized block? This is also why there is a second tryAppend inside the following synchronized block.
Because by that point another thread may already have created a RecordBatch, which would make our allocation redundant.
And what would be wrong with putting the allocation inside the synchronized block?
If the memory request cannot be satisfied immediately, the thread has to wait. If that wait happened inside the synchronized block, the thread would hold the Deque's lock the whole time, and no other thread could perform any thread-safe operation on that Deque.
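The pattern the questions above describe: try to append under the lock, allocate a buffer outside the lock if that fails, then re-check under the lock before committing the new buffer, discarding it if another thread got there first. Below is a self-contained model of that pattern (not the real RecordAccumulator.append; batches are reduced to fixed-capacity counters):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class AccumulatorSketch {
    static final int BATCH_CAPACITY = 2;  // records per batch, tiny for the demo

    // Stand-in for ProducerBatch: a fixed-capacity record holder
    static class Batch {
        int records;
        boolean tryAppend() {
            if (records >= BATCH_CAPACITY) return false;  // batch is full
            records++;
            return true;
        }
    }

    final Deque<Batch> deque = new ArrayDeque<>();

    void append() {
        // First attempt: append to the last batch under the deque lock
        synchronized (deque) {
            Batch last = deque.peekLast();
            if (last != null && last.tryAppend()) return;
        }
        // Allocation happens OUTSIDE the lock: in the real code it may block
        // waiting for buffer memory, and holding the deque lock here would
        // stall every other thread targeting this partition
        Batch newBatch = new Batch();
        // Second attempt: another thread may have created a batch meanwhile,
        // so try the last batch again before committing our allocation
        synchronized (deque) {
            Batch last = deque.peekLast();
            if (last != null && last.tryAppend()) return;  // newBatch is discarded
            newBatch.tryAppend();
            deque.addLast(newBatch);
        }
    }

    public static void main(String[] args) {
        AccumulatorSketch acc = new AccumulatorSketch();
        for (int i = 0; i < 5; i++) acc.append();
        // 5 records at 2 per batch -> 3 batches
        System.out.println(acc.deque.size());  // prints "3"
    }
}
```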
Next, follow the tryAppend() method; it is simpler.
The code above is illustrated in the following diagram:
5.5 Waking the sender thread to send RecordBatches
After a record is written successfully, if the RecordBatch now meets the send conditions (usually the queue holds more than one batch, so the batches added earliest are certainly sendable), the sender thread is woken up to send the RecordBatch.
The sender thread handles RecordBatches in its run() method, implemented as follows:
From run(), the core method is org.apache.kafka.clients.producer.internals.Sender#sendProducerData.
Here pollTimeout is the longest time to block until at least one channel becomes ready for the events it registered for; passing 0 means return immediately.
Let's keep following into org.apache.kafka.clients.producer.internals.RecordAccumulator#ready.
Finally, look at org.apache.kafka.clients.producer.internals.RecordAccumulator#drain, which takes the data to be sent out of the accumulator's buffer, at most max.request.size bytes in a single pass.
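Taken together, the sender side boils down to the following loop (a simplified outline of the real methods named above, not the verbatim 2.7.0 source):

```
// Simplified outline of the Sender loop
run():
    while (running):
        sendProducerData(now):
            result  = accumulator.ready(cluster, now)         // which nodes have sendable batches?
            batches = accumulator.drain(cluster, result.readyNodes,
                                        maxRequestSize, now)  // at most max.request.size per node
            sendProduceRequests(batches, now)                 // turn batches into ProduceRequests
        client.poll(pollTimeout, now)                         // do the actual network I/O
```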
6. Summary
Finally, to give you a macro-level picture of the Kafka producer's structure, look at the diagram below:
A brief walkthrough:
- new KafkaProducer() creates a background thread, KafkaThread (the actual running logic is Sender; KafkaThread is a wrapper around Sender), which scans the RecordAccumulator for messages to send.
- Calling KafkaProducer.send() actually just stores the message into the RecordAccumulator, i.e. into a Map (ConcurrentMap<TopicPartition, Deque<ProducerBatch>>). The message is recorded into a record batch (same topic, same partition, same batch), and all messages of one batch are sent to the same topic and partition.
- When the background thread finds messages in the RecordAccumulator, it sends them to the Kafka cluster (not as soon as a message appears, but once the messages are ready).
- If the send succeeds (the message was written to Kafka), a RecordMetadata object is returned, containing the topic and partition information and the offset recorded within the partition.
- If the write fails, an error is returned; on receiving the error the producer retries the send (if allowed, the message stays in the RecordAccumulator), and after several failed attempts an error is reported.
Alright — this article analyzed the Kafka Producer source code. The next article will detail metadata and the producer-side metadata update mechanism. Stay tuned!
Copyright notice
This article was created by [Lao Zhou chat architecture]. Please include the original link when reposting. Thank you.
https://cdmana.com/2021/09/20210909124112647r.html