
Kafka High-Performance Architecture

At the macro level

Using Partitions for parallel processing

Partitions provide parallel processing capability

Kafka is a Pub-Sub messaging system; both publishing and subscribing require specifying a Topic. As stated in 《Kafka Design Analysis (1): Kafka Background and Architecture Introduction》, a Topic is only a logical concept. Each Topic contains one or more Partitions, and different Partitions can be located on different nodes. Physically, each Partition corresponds to a local folder; each Partition contains one or more Segments, and each Segment consists of a data file and a matching index file. Logically, a Partition can be viewed as one very long array whose data is accessed through the index of this "array", the offset.

On one hand, because different Partitions can be located on different machines, the cluster can be fully exploited to process data in parallel across machines. On the other hand, because a Partition physically corresponds to a folder, even when multiple Partitions sit on the same node, different Partitions on that node can be placed on different disk drives, enabling parallel processing across disks and getting the full benefit of multiple disks.

The concrete way to use multiple disks is to mount different disks to different directories, then set log.dirs in server.properties to those directories (separated by commas). Kafka will automatically distribute Partitions as evenly as possible across the different directories, that is, across the different disks.
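For example, a server.properties fragment along these lines spreads the log directories over three disks (the mount points below are illustrative assumptions):

# Each directory sits on its own mounted disk; Kafka spreads Partitions across them
log.dirs=/mnt/disk1/kafka-logs,/mnt/disk2/kafka-logs,/mnt/disk3/kafka-logs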

Note: although the Segment is the smallest physical storage unit, Kafka does not offer parallel processing between different Segments within the same Partition. For writes, only one Segment of a Partition is written at a time; for reads, the different Segments within the same Partition are only read sequentially.

Partition is the smallest unit of parallelism

As stated in 《Kafka Design Analysis (4): Kafka Consumer Design Analysis》, when multiple Consumers consume the same Topic, each message is delivered to only one Consumer within the same Consumer Group. Moreover, data is not assigned message by message but Partition by Partition; that is, the data of one Partition is consumed by only one Consumer in the group (leaving Rebalance aside).

If there are more Consumers than Partitions, some Consumers cannot consume any data of the Topic at all. In other words, once the number of Consumers exceeds the number of Partitions, adding Consumers no longer increases parallelism.

In short, the number of Partitions determines the maximum possible parallelism. As shown in the figure below, because Topic 2 contains only 3 Partitions, Consumer 3, Consumer 4, and Consumer 5 in group2 can each consume data from 1 Partition, while Consumer 6 cannot consume any data of Topic 2.

[Figure: Consumer Groups consuming the Partitions of a Topic]

Take Spark consuming Kafka data as an example: if the consumed Topic has N Partitions, the effective parallelism in Spark is also at most N. Even if the number of Spark Executors is set to N+M, at most N Executors can process that Topic's data at the same time.
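As a hedged illustration of how a Consumer Group splits Partitions among its members, the minimal consumer sketch below uses the Java client (the broker address, topic, and group name are assumptions; poll(Duration) requires a 2.0+ client). Running more instances of this program than the Topic has Partitions leaves the extra instances idle:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // hypothetical broker address
        props.put("group.id", "group2");                // consumers sharing this id split the Partitions
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic2")); // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> r : records)
                    System.out.printf("partition=%d offset=%d value=%s%n",
                                      r.partition(), r.offset(), r.value());
            }
        }
    }
}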

ISR: a dynamic balance between availability and data consistency

CAP theory

CAP theory states that a distributed system can satisfy at most two of the following three properties: consistency, availability, and partition tolerance.

Consistency

The result of a write operation performed through one node is visible to read operations performed through other nodes. If, once the data is updated, subsequent reads under concurrent access perceive the update immediately, this is called strong consistency.

If some or all reads fail to perceive the update, this is called weak consistency. If the update must be perceived after some period of time (usually of no fixed length), this is called eventual consistency.

Availability

Every non-failing node must return a reasonable result within a bounded time.

Partition tolerance

When some nodes go down or cannot communicate with the others, the distributed system must keep functioning within each partition.

Generally speaking, partition tolerance is always required, so under CAP theory the real trade-off is between availability and consistency.

Common data replication and consistency schemes

Master-Slave

Read-write splitting in an RDBMS is a typical Master-Slave scheme

Synchronous replication ensures strong consistency but impacts availability

Asynchronous replication provides high availability but reduces consistency

WNR

Mainly used in decentralized distributed systems. DynamoDB and Cassandra adopt this scheme or a variant of it

N is the total number of replicas, W is the minimum number of replicas that must acknowledge each write for it to succeed, and R is the minimum number of replicas read on each read

When W+R>N, every read is guaranteed to reach at least one replica holding the latest data. For example, with N=3, W=2, R=2, any 2 replicas read must overlap with the 2 replicas written; a small sketch of this rule appears below

The ordering of multiple write operations is hard to guarantee, which may leave the write order inconsistent across replicas. Dynamo guarantees eventual consistency through vector clocks
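A minimal, purely illustrative sketch of the W+R>N quorum rule in plain Java (not taken from any of the systems above):

// Illustrative only: with W+R>N, a read quorum of R replicas must
// intersect the write quorum of W replicas (pigeonhole), so at least
// one replica read holds the latest committed write
public class QuorumCheck {
    static boolean readSeesLatestWrite(int n, int w, int r) {
        return w + r > n;
    }

    public static void main(String[] args) {
        System.out.println(readSeesLatestWrite(3, 2, 2)); // true: quorums must overlap
        System.out.println(readSeesLatestWrite(3, 1, 1)); // false: a read may miss the write
    }
}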

Paxos and its variants

Google's Chubby, Zookeeper's atomic broadcast protocol (Zab), RAFT, and so on

The ISR-based data replication scheme

As stated in 《Kafka High Availability (Part 1)》, Kafka replicates data at the granularity of a Partition, and replication among the replicas is accomplished by Followers pulling data from the Leader. In this respect, Kafka's replication scheme is close to the Master-Slave scheme described above. The difference is that Kafka uses neither fully synchronous nor fully asynchronous replication, but a dynamic replication scheme based on the ISR.

ISR stands for In-Sync Replica. The Leader of each Partition maintains such a list, which contains all the replicas currently in sync (including the Leader itself). On each write, only after all replicas in the ISR have copied the data does the Leader mark the message as Commit, and only then can it be consumed by Consumers.

This scheme is very close to synchronous replication. The difference is that the ISR is dynamically maintained by the Leader. If a Follower cannot keep pace with the Leader, the Leader removes it from the ISR; once it has caught up with the Leader again, the Leader adds it back into the ISR. After every ISR change, the Leader persists the latest ISR to Zookeeper.

As for how the Leader judges whether a Follower is keeping pace with it, the strategy differs slightly across Kafka versions.

  • For 0.8.* versions, if a Follower has not sent a Fetch request (a data replication request) to the Leader within replica.lag.time.max.ms, the Leader removes it from the ISR. If a Follower keeps sending Fetch requests to the Leader but lags behind the Leader's data by more than replica.lag.max.messages, it is likewise removed from the ISR by the Leader.
  • Starting with 0.9.0.0, replica.lag.max.messages was removed, so the Leader no longer considers how many messages a Follower lags behind. Besides judging whether a Follower has sent it a Fetch request within replica.lag.time.max.ms, the Leader also considers whether the Follower has stayed in sync with it during that time.
  • The 0.10.* strategy is consistent with 0.9.*

Regarding the replica.lag.max.messages parameter of the 0.8.* versions, many readers have asked: since a message only counts as Commit after all replicas in the ISR have copied it, how can a Follower ever fall far behind the Leader? The reason is that the Leader does not have to wait for an earlier message to be Commit before accepting a later one. In fact, the Leader can accept a large number of messages in order, and the offset of the newest message received is recorded as the LEO (Log End Offset). A message becomes Commit only once all Followers in the ISR have replicated it, and Consumers can only consume Commit messages; the offset of the newest Commit message is recorded as the High Watermark. In other words, the LEO marks the offset of the newest message the Leader has stored, while the High Watermark marks the newest message that can be consumed (i.e., already synchronized to the Followers in the ISR). Because the Leader's receipt of data and the Followers' replication of it are asynchronous, there is always some gap between the High Watermark and the LEO. What replica.lag.max.messages limits in the 0.8.* versions is the maximum such gap a Follower is allowed to accumulate relative to the Leader.
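Putting the 0.8.* criteria above into a hedged sketch (the identifiers below are illustrative and do not reflect Kafka's internal code):

// Illustrative sketch of the 0.8.* ISR eviction rule described above;
// all names are made up for exposition
public class IsrCheck {
    static boolean shouldEvict(long nowMs, long lastFetchTimeMs,
                               long leaderLeo, long followerLeo,
                               long lagTimeMaxMs, long lagMaxMessages) {
        boolean stuck = nowMs - lastFetchTimeMs > lagTimeMaxMs;          // no Fetch request in time
        boolean tooFarBehind = leaderLeo - followerLeo > lagMaxMessages; // message gap too large
        return stuck || tooFarBehind;
    }
}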

The principle of Kafka's ISR-based data replication scheme is illustrated in the figure below.

[Figure: Kafka ISR-based replication, steps 1-4]

As shown in the figure above, in the first step Leader A has received 3 messages in total, so its LEO is 3; but because the Followers in the ISR have only synchronized the first message (m1), only m1 is Commit, that is, only m1 can be consumed by Consumers. At this point Follower B lags Leader A by 1 message and Follower C lags by 2, neither exceeding the default replica.lag.max.messages, so both stay in the ISR. In the second step, the old Leader A has gone down, and the new Leader B receives no Fetch request from A within replica.lag.time.max.ms, so it removes A from the ISR, leaving ISR={B,C}. Meanwhile, because the new Leader B holds only 2 messages, it does not contain m3 (m3 was never Commit by any Leader), so m3 cannot be consumed by Consumers. In the fourth step, Follower A comes back up; it deletes all messages after the last Commit point from before it went down, then catches up with the new Leader B starting from the message following that Commit point, and once it has caught up with the new Leader it is added into the new ISR.

Why the ISR scheme is used

  • Because the Leader can remove Followers that fail to keep up in time, the scheme avoids the slowest Follower dragging down the overall speed, unlike full synchronous replication; that is, the ISR improves system availability.
  • All Followers in the ISR contain every Commit message, and only Commit messages are consumed by Consumers, so from the Consumer's point of view all replicas in the ISR are always in sync. Compared with asynchronous replication, this improves data consistency.
  • The ISR can be adjusted dynamically; in the extreme case it can contain only the Leader, which greatly increases the number of failed Followers that can be tolerated. Compared with a Majority Quorum scheme, tolerating the same number of node failures requires only about half as many nodes in total.

Notes on ISR-related configuration

  • The Broker's min.insync.replicas parameter specifies the minimum ISR size the Broker requires; the default value is 1, meaning that in the extreme case the ISR contains only the Leader. In that case, if the Leader goes down, the Partition becomes unavailable, so availability cannot be guaranteed.
  • Only messages synchronized by all replicas in the ISR count as Commit, but when a Producer publishes data, the Leader does not have to wait for all replicas in the ISR to synchronize the data before acknowledging it. The Producer can use the acks parameter to specify the minimum number of replicas that must receive a message before the send counts as successful. The default value of acks is 1: the Leader tells the Producer the message was received as soon as it itself receives it, and if the Leader then goes down before the replicas in the ISR have copied the message, the message is lost. If the value is set to 0, the Producer considers the data sent successfully immediately after sending, without waiting for anything; the send may in fact have failed, and the Producer's Retry mechanism will not take effect. The more recommended approach, as sketched below, is to set acks to all or -1: only when all replicas in the ISR have received the data (that is, the message is Commit) does the Leader tell the Producer the send succeeded, which guarantees that no data is lost without the Producer knowing.
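For illustration, a minimal Producer sketch opting into acks=all with the Java client might look like this (the broker address and topic name are assumptions):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AcksAllProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // hypothetical broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Wait until all replicas in the ISR have the message (i.e. it is Commit)
        props.put("acks", "all");
        // Retries only make sense for acks=1/all; with acks=0 they never trigger
        props.put("retries", 3);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value")); // hypothetical topic
        }
    }
}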

At the implementation level

Efficient use of disks

Sequential disk writes

As described in 《In some scenarios, sequential disk writes are faster than random memory writes》, turning the disk-write path into sequential writes can greatly improve disk utilization.

In Kafka's overall design, a Partition behaves like a very long array, and the Broker writes all received messages into this large array in order. Meanwhile, Consumers consume the data sequentially via the Offset, and consumed data is not deleted, so random disk writes are avoided.

Because disk space is finite, not all data can be kept, and in fact, as a messaging system, Kafka has no need to keep all data; old data must be deleted. Rather than modifying files in a read-then-write fashion, the deletion works by splitting a Partition into multiple Segments, each corresponding to one physical file, and deleting the Partition's data by deleting whole files. This way of cleaning up old data also avoids random writes to files.

The following code shows how Kafka deletes a Segment: the Segment's entire log file and entire index file are deleted outright, rather than deleting parts of a file.

/**
 * Delete this log segment from the filesystem.
 *
 * @throws KafkaStorageException if the delete fails.
 */
def delete() {
  val deletedLog = log.delete()
  val deletedIndex = index.delete()
  val deletedTimeIndex = timeIndex.delete()
  if(!deletedLog && log.file.exists)
    throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.")
  if(!deletedIndex && index.file.exists)
    throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
  if(!deletedTimeIndex && timeIndex.file.exists)
    throw new KafkaStorageException("Delete of time index " + timeIndex.file.getName + " failed.")
}

Making full use of the Page Cache

Using the Page Cache has the following benefits:

  • The I/O Scheduler assembles consecutive small writes into large physical writes, improving performance
  • The I/O Scheduler tries to reorder some writes to reduce disk-head movement
  • All free memory (non-JVM memory) is put to use; using an application-layer cache instead (that is, JVM heap memory) would increase the GC burden
  • Reads can be served directly from the Page Cache; if consumption keeps pace with production, data need not even go through the physical disk, being exchanged directly through the Page Cache
  • If the process restarts, the cache inside the JVM is lost, but the Page Cache is still available

After a Broker receives data, it only writes the data to the Page Cache; there is no guarantee that the data is fully persisted to disk. Seen this way, if the machine crashes, data in the Page Cache that has not yet been written to disk may be lost. But this kind of loss only occurs when the operating system stops working, for example due to a power failure, and that scenario can be fully handled by Kafka-level Replication. Forcibly flushing the Page Cache to disk would reduce performance. For this reason, although Kafka provides the flush.messages and flush.ms parameters to force-flush Page Cache data to disk, Kafka does not recommend using them.

If data is consumed exactly as fast as it is produced, data need not even be exchanged through the physical disk; it is exchanged directly through the Page Cache. Likewise, when Followers Fetch data from the Leader, this too can be served from the Page Cache. The figure below shows the network and disk read/write statistics of a Partition Leader node.

[Figure: network and disk I/O on a Partition Leader node]

As the figure shows, this Broker receives about 35MB of data per second over the network from Producers, and although Followers fetch data from this Broker, it performs essentially no disk reads. This is because the Broker serves the data to the Followers directly from the Page Cache.

Supporting multiple disk drives

The Broker's log.dirs configuration item allows multiple directories to be configured. If the machine has multiple disk drives, the different disks can be mounted to different directories and those directories configured in log.dirs. Kafka will try to assign different Partitions to different directories, that is, to different disks, so as to take full advantage of them.

Zero copy

Kafka involves a great deal of persisting data received over the network to disk (Producer to Broker) and sending disk files out over the network (Broker to Consumer). The performance of these paths directly affects Kafka's overall throughput.

Traditional mode: four copies and four context switches

Take sending a disk file over the network as an example. In the traditional mode, pseudo code like the following is generally used: first read the file data into memory, then send the in-memory data through a Socket.

buffer = File.read
Socket.send(buffer)

This process actually involves four data copies. First, a system call reads the file data into a kernel-space Buffer (DMA copy); then the application reads the kernel-space Buffer's data into a user-space Buffer (CPU copy); next, when the user program sends data through the Socket, it copies the user-space Buffer's data into a kernel-space socket Buffer (CPU copy); finally the data is copied to the NIC Buffer via DMA. There are also four context switches along the way, as shown in the figure below.

[Figure: traditional mode, four copies and four context switches]

Zero copy via sendfile and transferTo

Linux kernels 2.4 and later provide zero copy through the sendfile system call. After data is DMA-copied into a kernel-space Buffer, it is DMA-copied directly to the NIC Buffer with no CPU copies needed, which is where the term zero copy comes from. Besides reducing data copies, the entire read-file-and-send operation is completed by a single sendfile call, so the whole process involves only two context switches, greatly improving performance. The zero-copy flow is shown in the figure below.

[Figure: zero copy, two context switches]

Concretely, Kafka completes data transfer through the TransportLayer; its subclass PlaintextTransportLayer implements zero copy via the transferTo and transferFrom methods of Java NIO's FileChannel, as shown below.

@Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
    return fileChannel.transferTo(position, count, socketChannel);
}

Note: transferTo and transferFrom do not guarantee zero copy. Whether zero copy actually happens depends on the operating system: if the OS provides a zero-copy system call such as sendfile, the two methods will take full advantage of it through that system call; otherwise they cannot achieve zero copy on their own.
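Outside Kafka, the same mechanism can be exercised directly. Below is a minimal, hedged sketch of sending a file over a socket with FileChannel.transferTo (the path and endpoint are assumptions):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ZeroCopySend {
    public static void main(String[] args) throws IOException {
        try (FileChannel file = FileChannel.open(Paths.get("/tmp/data.log"), // hypothetical file
                                                 StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9000))) { // hypothetical endpoint
            long position = 0;
            long remaining = file.size();
            // transferTo may send fewer bytes than requested, so loop until done;
            // on Linux it maps to sendfile when the OS supports it
            while (remaining > 0) {
                long sent = file.transferTo(position, remaining, socket);
                position += sent;
                remaining -= sent;
            }
        }
    }
}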

Reducing network overhead

Batching

Batching is a common way to improve I/O performance. For Kafka, batching both reduces network transmission Overhead and improves disk-write efficiency.

Kafka 0.8.1 and earlier distinguished between a synchronous Producer and an asynchronous Producer. The synchronous Producer's send method had two main forms: one accepts a single KeyedMessage as a parameter and sends one message at a time; the other accepts a batch of KeyedMessages and sends multiple messages at once. For asynchronous sending, whichever send method is used, the implementation does not send the messages to the Broker immediately; it stores them in an internal queue first, and only when the number of messages reaches a threshold or the specified Timeout is reached are they actually sent out, thus achieving batched sends.

Starting with 0.8.2, Kafka supports the new Producer API, which unifies the synchronous and asynchronous Producers. Although, judging by the send interface, only one ProducerRecord can be sent at a time, rather than accepting a message list as in the previous version, send does not transmit the message immediately; the actual transmission frequency is controlled through batch.size and linger.ms, thereby achieving batched sending, as sketched below.
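A hedged sketch of batching with the new Producer API (the broker address, topic, and the particular batch.size/linger.ms values are assumptions):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BatchingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // hypothetical broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("batch.size", 16384); // transmit when a partition's batch reaches 16KB...
        props.put("linger.ms", 10);     // ...or when a record has waited 10ms, whichever comes first

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1000; i++) {
                // send() only appends to an in-memory batch; a background I/O
                // thread transmits accumulated batches to the Broker
                producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), "value-" + i)); // hypothetical topic
            }
        } // close() flushes any batches still pending
    }
}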

Because every network transmission carries, besides the message payload itself, a large amount of network-protocol overhead (called Overhead), combining multiple messages into one transmission effectively reduces the network Overhead per message and thereby improves transmission efficiency.

As can be seen from the figure in the zero-copy section, although the Broker continuously receives data from the network, it does not write to disk every second; instead it writes at intervals, and each disk write is very large (up to 718MB/s).

Data compression reduces network load

Starting with version 0.7, Kafka can compress data before transferring it to the Broker. Besides compressing each message individually before transmission, Kafka also supports compressing a whole Batch of messages together when sending in batches. A basic principle of data compression is that the more repetition the data contains, the better the compression; compressing a whole Batch together therefore shrinks the data far more, improving network transmission efficiency to a much greater extent.

After the Broker receives compressed messages, it does not decompress them; the messages are persisted to disk directly in compressed form, and Consumers decompress the data after Fetching it. Kafka's compression therefore not only reduces the network load from Producer to Broker, but also reduces the Broker's disk I/O load, as well as the network traffic between Broker and Consumer, greatly improving transmission efficiency and raising throughput.
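On the Producer side this is a single configuration switch; a minimal sketch (broker address, topic, and codec choice are assumptions):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CompressedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // hypothetical broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Compress each accumulated batch as a unit; repetition across the
        // messages of a batch compresses far better than per-message compression
        props.put("compression.type", "gzip");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "a highly repetitive payload")); // hypothetical topic
        }
    }
}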

Efficient serialization

The types of a Kafka message's Key and Payload (or Value) are customizable, as long as the corresponding serializer and deserializer are provided. Users can therefore adopt a fast and compact serialization/deserialization scheme (such as Avro or Protocol Buffers) to reduce the data size in actual network transmission and disk storage, improving throughput. Note that if the chosen serialization method is too slow, the end-to-end efficiency is not necessarily high even when the compression ratio is very good.
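As a toy illustration of plugging in a compact custom serializer (this class is a hypothetical stand-in for schema-based options such as Avro or Protocol Buffers, packing each long into 8 fixed bytes):

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

public class CompactLongSerializer implements Serializer<Long> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public byte[] serialize(String topic, Long data) {
        if (data == null) return null;
        // 8 fixed bytes per value: far more compact on the wire and on disk
        // than, say, a decimal string representation
        return ByteBuffer.allocate(Long.BYTES).putLong(data).array();
    }

    @Override
    public void close() { }
}

It would be registered through the Producer's value.serializer (or key.serializer) configuration, just like the String serializers used in the sketches above.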

