Programming Knowledge (cdmana.com)

Kafka Consumption and Heartbeat

Reading guide: Kafka is a distributed, partitioned, replicated, multi-subscriber publish/subscribe messaging system (a distributed MQ system). It can be used for search logs, monitoring logs, access logs, and more. Today I'd like to walk you through Kafka's consumption and heartbeat mechanisms.

1. Kafka consumption

First, let's look at consumption. Kafka provides a very simple consumer API: the user only needs to configure the addresses of the Kafka brokers and then instantiate the KafkaConsumer class to consume data from a topic. A simple Kafka consumer example is shown below:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class JConsumerSubscribe extends Thread {
    public static void main(String[] args) {
        JConsumerSubscribe jconsumer = new JConsumerSubscribe();
        jconsumer.start();
    }

    /** Initialize the Kafka cluster configuration. */
    private Properties configure() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "dn1:9092,dn2:9092,dn3:9092"); // Kafka cluster addresses
        props.put("group.id", "ke"); // consumer group
        props.put("enable.auto.commit", "true"); // commit offsets automatically
        props.put("auto.commit.interval.ms", "1000"); // auto-commit interval
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key deserializer
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value deserializer
        return props;
    }

    /** Run a single-threaded consumer. */
    @Override
    public void run() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configure()); // create the consumer
        consumer.subscribe(Arrays.asList("test_kafka_topic")); // subscribe to the topic list
        boolean flag = true; // keeps the consumption loop running
        while (flag) {
            // Fetch the next batch, waiting up to 100 ms
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
        consumer.close(); // close the consumer once the loop exits
    }
}

With the code above we can easily consume data from a topic. But what actually happens when we call poll() to pull data, and how does the client interact with the Kafka brokers? Let's look at the implementation details in the source code. The core code is as follows:
org.apache.kafka.clients.consumer.KafkaConsumer

private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) {
    acquireAndEnsureOpen();
    try {
        if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative");
        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        }
        // poll for new data until the timeout expires
        long elapsedTime = 0L;
        do {
            client.maybeTriggerWakeup();
            final long metadataEnd;
            if (includeMetadataInTimeout) {
                final long metadataStart = time.milliseconds();
                if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {
                    return ConsumerRecords.empty();
                }
                metadataEnd = time.milliseconds();
                elapsedTime += metadataEnd - metadataStart;
            } else {
                while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) {
                    log.warn("Still waiting for metadata");
                }
                metadataEnd = time.milliseconds();
            }
            final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));
            if (!records.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                    client.pollNoWakeup();
                }
                return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
            final long fetchEnd = time.milliseconds();
            elapsedTime += fetchEnd - metadataEnd;
        } while (elapsedTime < timeoutMs);
        return ConsumerRecords.empty();
    } finally {
        release();
    }
}
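The do/while loop above spends one time budget across metadata updates and fetches: each pass adds the time it took to elapsedTime and hands only the remaining time (never negative, as in remainingTimeAtLeastZero) to the next step. The following is a simplified, hypothetical model of that pattern, not Kafka code; `BudgetedPoll`, the injected `LongSupplier` clock and the `LongFunction` fetch step are illustration-only assumptions.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;

/** Hypothetical model of the timeout budget in KafkaConsumer.poll (not Kafka's actual code). */
final class BudgetedPoll {
    /** Same idea as remainingTimeAtLeastZero: never hand out a negative budget. */
    static long remainingAtLeastZero(long timeoutMs, long elapsedMs) {
        return Math.max(0L, timeoutMs - elapsedMs);
    }

    /**
     * Calls fetchStep with the remaining budget until it returns records
     * or the accumulated elapsed time reaches timeoutMs.
     */
    static <T> List<T> poll(long timeoutMs, LongSupplier clock, LongFunction<List<T>> fetchStep) {
        if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative");
        long elapsed = 0L;
        do {
            long start = clock.getAsLong();
            List<T> records = fetchStep.apply(remainingAtLeastZero(timeoutMs, elapsed));
            if (!records.isEmpty()) {
                return records; // return as soon as any data is available
            }
            elapsed += clock.getAsLong() - start;
        } while (elapsed < timeoutMs);
        return Collections.emptyList();
    }
}
```

Injecting the clock keeps the model deterministic: a fake clock that advances 40 ms per fetch attempt lets you see that a 100 ms budget allows exactly three attempts.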

The method above calls pollForFetches, which is implemented as follows:

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) { 
        final long startMs = time.milliseconds(); 
        long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs); 
        // if data is available already, return it immediately 
        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); 
        if (!records.isEmpty()) { 
            return records; 
        } 
        // send any new fetches (won't resend pending fetches) 
        fetcher.sendFetches(); 
        // We do not want to be stuck blocking in poll if we are missing some positions 
        // since the offset lookup may be backing off after a failure 
        // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call 
        // updateAssignmentMetadataIfNeeded before this method. 
        if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) { 
            pollTimeout = retryBackoffMs; 
        } 
        client.poll(pollTimeout, startMs, () -> { 
            // since a fetch might be completed by the background thread, we need this poll condition 
            // to ensure that we do not block unnecessarily in poll() 
            return !fetcher.hasCompletedFetches(); 
        }); 
        // after the long poll, we should check whether the group needs to rebalance 
        // prior to returning data so that the group can stabilize faster 
        if (coordinator.rejoinNeededOrPending()) { 
            return Collections.emptyMap(); 
        } 
        return fetcher.fetchedRecords(); 
    } 

In the code above, each time the consumer client pulls data through poll(), it first calls the Fetcher's fetchedRecords() method; if no data is available, it issues new fetch requests through sendFetches(). There is also a limit on how many records a single poll() returns: 500 by default, controlled by the max.poll.records property. You can set this property on the client to adjust how much data each consumption round returns.
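The ordering in pollForFetches (hand out already-buffered data first, and only send a fetch when the buffer is empty) can be modelled in a few lines. In this hypothetical sketch, `ToyFetcher`, its `Deque` buffer, the `Supplier` standing in for a broker response and the `fetchesSent` counter are assumptions, not Kafka's Fetcher API; the "network" call is synchronous here for simplicity, whereas the real client sends fetches asynchronously. Each call returns at most maxPollRecords records, mirroring max.poll.records.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical model of the "buffered data first, then fetch" order in pollForFetches. */
final class ToyFetcher<T> {
    private final Deque<T> buffer = new ArrayDeque<>();
    private final Supplier<List<T>> network; // stands in for a broker fetch response
    private final int maxPollRecords;        // plays the role of max.poll.records
    private int fetchesSent = 0;

    ToyFetcher(Supplier<List<T>> network, int maxPollRecords) {
        this.network = network;
        this.maxPollRecords = maxPollRecords;
    }

    /** Like fetchedRecords(): drain up to maxPollRecords already-buffered records. */
    private List<T> fetchedRecords() {
        List<T> out = new ArrayList<>();
        while (!buffer.isEmpty() && out.size() < maxPollRecords) {
            out.add(buffer.pollFirst());
        }
        return out;
    }

    /** Like sendFetches(): ask the "broker" for more data and buffer the response. */
    private void sendFetches() {
        fetchesSent++;
        buffer.addAll(network.get());
    }

    List<T> pollForFetches() {
        List<T> records = fetchedRecords();
        if (!records.isEmpty()) {
            return records; // data already available: no new fetch needed
        }
        sendFetches();
        return fetchedRecords();
    }

    int fetchesSent() {
        return fetchesSent;
    }
}
```

With a broker response of three records and a cap of two, the first call fetches once and returns two records; the second call returns the leftover record from the buffer without sending another fetch.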

Tip: note that max.poll.records limits the total number of records returned by one poll() call, not the number per partition. Therefore, the total number of records pulled from all partitions of a topic in one consumption round never exceeds the value of max.poll.records.
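Because the cap applies to the whole poll() rather than to each partition, records buffered from several partitions are drained until their combined count reaches the limit. The sketch below illustrates only that accounting; `PollCap` is hypothetical, plain `String` keys and `Integer` values stand in for Kafka's TopicPartition and ConsumerRecord types, and draining partitions in map order is a simplification of what the real fetcher does.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: one poll drains buffered records across partitions up to a total cap. */
final class PollCap {
    static Map<String, List<Integer>> drain(Map<String, Deque<Integer>> buffered, int maxPollRecords) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int remaining = maxPollRecords; // budget shared by all partitions
        for (Map.Entry<String, Deque<Integer>> e : buffered.entrySet()) {
            if (remaining == 0) break;
            List<Integer> taken = new ArrayList<>();
            Deque<Integer> queue = e.getValue();
            while (remaining > 0 && !queue.isEmpty()) {
                taken.add(queue.pollFirst());
                remaining--;
            }
            if (!taken.isEmpty()) result.put(e.getKey(), taken);
        }
        return result; // records left in the deques wait for the next poll
    }

    static int total(Map<String, List<Integer>> drained) {
        int n = 0;
        for (List<Integer> records : drained.values()) n += records.size();
        return n;
    }
}
```

With four records buffered for one partition, three for another and a cap of five, the poll returns all four from the first partition and one from the second; the remaining two stay buffered.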

In the Fetcher class, the sendFetches method also limits how much data is fetched, through the max.partition.fetch.bytes property (default 1 MB per partition). Consider this scenario: within the max.partition.fetch.bytes limit, 10000 records need to be consumed. With the default of 500 records per poll(), it takes 20 poll() calls to return all of them.
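The arithmetic in this scenario is a ceiling division: 10000 records at 500 per poll() means ceil(10000 / 500) = 20 calls. A tiny helper (hypothetical, for illustration) makes the rounding explicit for totals that are not an exact multiple of the batch size; it assumes every poll() returns a full batch whenever enough records are available.

```java
/** Hypothetical helper for the "how many poll() calls" arithmetic. */
final class PollMath {
    /** poll() calls needed to consume totalRecords when each call returns at most maxPollRecords. */
    static long pollsNeeded(long totalRecords, int maxPollRecords) {
        if (maxPollRecords <= 0) throw new IllegalArgumentException("max.poll.records must be positive");
        if (totalRecords < 0) throw new IllegalArgumentException("totalRecords must not be negative");
        // Integer ceiling division: (a + b - 1) / b
        return (totalRecords + maxPollRecords - 1) / maxPollRecords;
    }
}
```

For example, pollsNeeded(10000, 500) is 20, while pollsNeeded(10001, 500) is 21 because the last record needs a call of its own.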

At this point some readers may ask: can't we simply raise max.poll.records from its default to 10000? We can, but another setting must be tuned along with it: the timeout passed to each poll() call (Duration.ofMillis(100) in the example above). Choose this timeout according to the actual size of your records. If you set max.poll.records to 10000 but each record is large and the timeout is still 100 ms, a single poll() may return fewer than 10000 records, since poll() returns as soon as any records are available.
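A rough back-of-the-envelope check helps when raising max.poll.records: one fetch response carries at most about max.partition.fetch.bytes per partition (1 MB by default, per the text above), so large records limit how many can arrive per partition per fetch regardless of max.poll.records. The sketch below is hypothetical; it assumes a uniform average record size and ignores batching, compression and protocol overhead, so treat its output as an estimate only.

```java
/** Hypothetical estimate of how many records one fetch can deliver (ignores batching and overhead). */
final class FetchCapacity {
    static final long DEFAULT_MAX_PARTITION_FETCH_BYTES = 1_048_576L; // 1 MB

    /** Approximate records one partition can contribute to a single fetch response. */
    static long recordsPerPartitionFetch(long maxPartitionFetchBytes, long avgRecordBytes) {
        if (avgRecordBytes <= 0) throw new IllegalArgumentException("avgRecordBytes must be positive");
        return maxPartitionFetchBytes / avgRecordBytes;
    }

    /** True if one fetch across the given partitions could plausibly fill a poll of maxPollRecords. */
    static boolean oneFetchCanFillPoll(int partitions, long maxPartitionFetchBytes,
                                       long avgRecordBytes, int maxPollRecords) {
        return partitions * recordsPerPartitionFetch(maxPartitionFetchBytes, avgRecordBytes) >= maxPollRecords;
    }
}
```

With 10 KiB records and three partitions, one fetch yields roughly 3 × 102 = 306 records, far short of 10000, so several fetch rounds (and likely more time than a 100 ms poll timeout allows) would be needed to fill one poll.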

There is another thing to watch out for: the session timeout. By default, session.timeout.ms is 10 s, group.min.session.timeout.ms is 6 s, and group.max.session.timeout.ms is 30 min (these defaults differ between Kafka versions). If the business logic for a batch has not finished within 10 s, the consumer client is disconnected from the Kafka broker, and the offsets it has consumed can no longer be committed to Kafka, because the broker already considers the consumer gone. Even with auto-commit enabled, or with auto.offset.reset set, records will then be consumed again; this duplicate consumption is caused by session.timeout.ms expiring. (In clients that run the background heartbeat thread described in the next section, slow processing between two poll() calls is bounded by max.poll.interval.ms instead; see 2.2.)
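Before deploying, it can help to sanity-check the timeout settings against each other. group.min.session.timeout.ms and group.max.session.timeout.ms are broker settings, and the broker rejects a consumer whose session.timeout.ms falls outside that range; Kafka's documentation also suggests keeping heartbeat.interval.ms (3 s by default) no higher than one third of session.timeout.ms. `TimeoutCheck` below is a hypothetical pre-flight helper, not part of the Kafka client.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical pre-flight check for consumer timeout settings (not part of the Kafka client). */
final class TimeoutCheck {
    static List<String> check(long sessionTimeoutMs, long heartbeatIntervalMs,
                              long groupMinSessionTimeoutMs, long groupMaxSessionTimeoutMs) {
        List<String> problems = new ArrayList<>();
        if (sessionTimeoutMs < groupMinSessionTimeoutMs || sessionTimeoutMs > groupMaxSessionTimeoutMs) {
            // The broker rejects a join whose session timeout is outside its configured range.
            problems.add("session.timeout.ms outside [group.min.session.timeout.ms, group.max.session.timeout.ms]");
        }
        if (heartbeatIntervalMs * 3 > sessionTimeoutMs) {
            // Recommendation from the Kafka docs: heartbeat interval at most 1/3 of the session timeout.
            problems.add("heartbeat.interval.ms is more than 1/3 of session.timeout.ms");
        }
        return problems;
    }
}
```

The defaults quoted above (10 s session, 3 s heartbeat, 6 s to 30 min broker range) pass both checks; lowering session.timeout.ms to 5 s without touching the others fails both.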

2. Heartbeat

As mentioned at the end of the previous section, a session timeout leads to messages being consumed again. But why do timeouts happen at all? Some readers ask: my consumer thread is clearly running and has not exited, so why can't it consume Kafka messages, and why can't the consumer group find my ConsumerGroupID? This may be caused by a timeout. Kafka controls timeouts through a heartbeat mechanism. The heartbeat is transparent to the consumer client: it runs in an asynchronous thread that starts working once the consumer instance joins its consumer group.

org.apache.kafka.clients.consumer.internals.AbstractCoordinator starts a HeartbeatThread that sends heartbeats and tracks the consumer's state. Every consumer has an org.apache.kafka.clients.consumer.internals.ConsumerCoordinator, and each ConsumerCoordinator starts a HeartbeatThread to maintain the heartbeat. The heartbeat state is kept in org.apache.kafka.clients.consumer.internals.Heartbeat, whose fields are declared as follows:

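private final int sessionTimeoutMs;        // session.timeout.ms
private final int heartbeatIntervalMs;     // heartbeat.interval.ms
private final int maxPollIntervalMs;       // max.poll.interval.ms
private final long retryBackoffMs;         // retry.backoff.ms
private volatile long lastHeartbeatSend;   // when the last heartbeat request was sent
private long lastHeartbeatReceive;         // when the last successful heartbeat response arrived
private long lastSessionReset;             // when the session timer was last reset
private long lastPoll;                     // when the user thread last called poll()
private boolean heartbeatFailed;           // whether the most recent heartbeat failed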
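To see how a few of these fields decide when the next heartbeat is due, here is a simplified, hypothetical model. `HeartbeatModel` is not the real Heartbeat class and the exact logic differs across Kafka versions; the assumption it captures is that a heartbeat is due one heartbeat interval after the later of the last send and the last session reset, or only retryBackoffMs later if the previous heartbeat failed.

```java
/** Simplified, hypothetical model of the heartbeat timing state listed above. */
final class HeartbeatModel {
    private final long heartbeatIntervalMs;
    private final long retryBackoffMs;
    private long lastHeartbeatSend;
    private long lastSessionReset;
    private boolean heartbeatFailed;

    HeartbeatModel(long heartbeatIntervalMs, long retryBackoffMs, long now) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.retryBackoffMs = retryBackoffMs;
        this.lastSessionReset = now;
    }

    void sentHeartbeat(long now) {
        lastHeartbeatSend = now;
        heartbeatFailed = false;
    }

    void failHeartbeat() {
        heartbeatFailed = true;
    }

    /** Milliseconds until the next heartbeat is due; a failed heartbeat is retried after the backoff. */
    long timeToNextHeartbeat(long now) {
        long sinceLast = now - Math.max(lastHeartbeatSend, lastSessionReset);
        long delay = heartbeatFailed ? retryBackoffMs : heartbeatIntervalMs;
        return Math.max(0L, delay - sinceLast);
    }

    boolean shouldHeartbeat(long now) {
        return timeToNextHeartbeat(now) == 0;
    }
}
```

With a 3000 ms interval and a 100 ms backoff starting at time 0, a heartbeat is due at 3000 ms; if that one fails, the next attempt is due at 3100 ms rather than 6000 ms.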

The run method of the heartbeat thread is implemented as follows:

public void run() { 
            try { 
                log.debug("Heartbeat thread started"); 
                while (true) { 
                    synchronized (AbstractCoordinator.this) { 
                        if (closed) 
                            return; 
                        if (!enabled) { 
                            AbstractCoordinator.this.wait(); 
                            continue; 
                        }
                        if (state != MemberState.STABLE) {
                            // the group is not stable (perhaps because we left the group or because the coordinator 
                            // kicked us out), so disable heartbeats and wait for the main thread to rejoin. 
                            disable(); 
                            continue; 
                        } 
                        client.pollNoWakeup(); 
                        long now = time.milliseconds(); 
                        if (coordinatorUnknown()) { 
                            if (findCoordinatorFuture != null || lookupCoordinator().failed()) 
                                // the immediate future check ensures that we backoff properly in the case that no 
                                // brokers are available to connect to. 
                                AbstractCoordinator.this.wait(retryBackoffMs); 
                        } else if (heartbeat.sessionTimeoutExpired(now)) { 
                            // the session timeout has expired without seeing a successful heartbeat, so we should 
                            // probably make sure the coordinator is still healthy. 
                            markCoordinatorUnknown(); 
                        } else if (heartbeat.pollTimeoutExpired(now)) { 
                            // the poll timeout has expired, which means that the foreground thread has stalled 
                            // in between calls to poll(), so we explicitly leave the group. 
                            maybeLeaveGroup(); 
                        } else if (!heartbeat.shouldHeartbeat(now)) { 
                            // poll again after waiting for the retry backoff in case the heartbeat failed or the 
                            // coordinator disconnected 
                            AbstractCoordinator.this.wait(retryBackoffMs); 
                        } else { 
                            heartbeat.sentHeartbeat(now); 
                            sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
                                @Override 
                                public void onSuccess(Void value) { 
                                    synchronized (AbstractCoordinator.this) { 
                                        heartbeat.receiveHeartbeat(time.milliseconds()); 
                                    } 
                                } 
                                @Override 
                                public void onFailure(RuntimeException e) { 
                                    synchronized (AbstractCoordinator.this) { 
                                        if (e instanceof RebalanceInProgressException) { 
                                            // it is valid to continue heartbeating while the group is rebalancing. This 
                                            // ensures that the coordinator keeps the member in the group for as long 
                                            // as the duration of the rebalance timeout. If we stop sending heartbeats, 
                                            // however, then the session timeout may expire before we can rejoin. 
                                            heartbeat.receiveHeartbeat(time.milliseconds()); 
                                        } else { 
                                            heartbeat.failHeartbeat(); 
                                            // wake up the thread if it's sleeping to reschedule the heartbeat 
                                            AbstractCoordinator.this.notify(); 
                                        } 
                                    } 
                                } 
                            }); 
                        } 
                    } 
                } 
            } catch (AuthenticationException e) { 
                log.error("An authentication error occurred in the heartbeat thread", e); 
                this.failed.set(e); 
            } catch (GroupAuthorizationException e) { 
                log.error("A group authorization error occurred in the heartbeat thread", e); 
                this.failed.set(e); 
            } catch (InterruptedException | InterruptException e) { 
                Thread.interrupted(); 
                log.error("Unexpected interrupt received in heartbeat thread", e); 
                this.failed.set(new RuntimeException(e)); 
            } catch (Throwable e) { 
                log.error("Heartbeat thread failed due to unexpected error", e); 
                if (e instanceof RuntimeException) 
                    this.failed.set((RuntimeException) e); 
                else 
                    this.failed.set(new RuntimeException(e)); 
            } finally { 
                log.debug("Heartbeat thread has closed"); 
            } 
        } 
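Once the thread is enabled and the member is STABLE, each pass of the loop above takes exactly one action, chosen in a fixed priority order. The pure function below summarizes that order; `HeartbeatDecision` and its `Action` names are hypothetical labels for the branches, and the sketch leaves out the closed/enabled/state checks and all networking.

```java
/** Hypothetical summary of the priority order of the checks in the heartbeat thread's run() loop. */
final class HeartbeatDecision {
    enum Action { WAIT_FOR_COORDINATOR, MARK_COORDINATOR_UNKNOWN, LEAVE_GROUP, BACKOFF, SEND_HEARTBEAT }

    static Action next(boolean coordinatorUnknown, boolean sessionTimeoutExpired,
                       boolean pollTimeoutExpired, boolean shouldHeartbeat) {
        if (coordinatorUnknown) return Action.WAIT_FOR_COORDINATOR;        // look it up, wait retryBackoffMs
        if (sessionTimeoutExpired) return Action.MARK_COORDINATOR_UNKNOWN; // no successful heartbeat in time
        if (pollTimeoutExpired) return Action.LEAVE_GROUP;                 // user thread stopped calling poll()
        if (!shouldHeartbeat) return Action.BACKOFF;                       // not due yet: wait retryBackoffMs
        return Action.SEND_HEARTBEAT;
    }
}
```

The order matters: an expired session is handled before an expired poll interval, and a heartbeat is only sent when none of the failure conditions apply.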

The heartbeat thread relies on two key timeout checks: sessionTimeoutExpired and pollTimeoutExpired.

public boolean sessionTimeoutExpired(long now) {
    return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeoutMs;
}

public boolean pollTimeoutExpired(long now) {
    return now - lastPoll > maxPollIntervalMs;
}
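A worked example makes the difference between the two checks concrete. The copies below are hypothetical stand-alone versions of the methods above with the timestamps passed in explicitly; the numbers assume session.timeout.ms = 10000 and max.poll.interval.ms = 300000 (the latter is the Kafka default). Note that both comparisons are strict, so reaching the limit exactly does not yet count as expired.

```java
/** Hypothetical stand-alone copies of the two expiry checks, with all timestamps passed in. */
final class ExpiryChecks {
    static boolean sessionTimeoutExpired(long now, long lastSessionReset,
                                         long lastHeartbeatReceive, long sessionTimeoutMs) {
        return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeoutMs;
    }

    static boolean pollTimeoutExpired(long now, long lastPoll, long maxPollIntervalMs) {
        return now - lastPoll > maxPollIntervalMs;
    }
}
```

If heartbeats keep succeeding (the last one received at 299000 ms) while the user thread has not called poll() since time 0, then at 301000 ms the session is still alive but the poll timeout has expired, so the consumer leaves the group even though its heartbeat thread is healthy.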

2.1 sessionTimeoutExpired

If the session timeout expires, the heartbeat thread marks the current coordinator as unknown so that it will be looked up again. On the broker side, the consumer is removed from the group and its partitions are reassigned among the remaining consumers. The Kafka broker defines 5 states for a consumer group (6 if you count Unknown) in org.apache.kafka.common.ConsumerGroupState, as shown in the figure below:

[Figure: consumer group states defined in org.apache.kafka.common.ConsumerGroupState]
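For reference, the six values of org.apache.kafka.common.ConsumerGroupState in Kafka 2.x are UNKNOWN, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD and EMPTY. The enum below is a stand-alone mirror of those names and display strings, so the list can be inspected without the Kafka client on the classpath. It is not the real class: `GroupStateMirror` and its `parse` helper are hypothetical, the one-line descriptions are summaries, and other Kafka versions may define different states.

```java
/** Stand-alone mirror of the state names in org.apache.kafka.common.ConsumerGroupState (Kafka 2.x). */
enum GroupStateMirror {
    UNKNOWN("Unknown"),                          // state could not be determined
    PREPARING_REBALANCE("PreparingRebalance"),   // waiting for members to (re)join
    COMPLETING_REBALANCE("CompletingRebalance"), // members joined; waiting for the leader's assignment
    STABLE("Stable"),                            // assignment distributed; members heartbeat normally
    DEAD("Dead"),                                // no members; group metadata is being removed
    EMPTY("Empty");                              // no members, but committed offsets may remain

    private final String displayName;

    GroupStateMirror(String displayName) {
        this.displayName = displayName;
    }

    /** Maps a display name such as "Stable" back to its constant, or UNKNOWN if unrecognized. */
    static GroupStateMirror parse(String name) {
        for (GroupStateMirror s : values()) {
            if (s.displayName.equals(name)) return s;
        }
        return UNKNOWN;
    }

    @Override
    public String toString() {
        return displayName;
    }
}
```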

2.2 pollTimeoutExpired

If the poll timeout is triggered (the time between two poll() calls exceeds max.poll.interval.ms), the consumer client leaves the consumer group. On the next poll() it rejoins the group, which triggers a rebalance. The KafkaConsumer client does not call poll() repeatedly on our behalf, so our own consumption logic must keep calling poll().
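Because the poll timeout is measured between consecutive poll() calls, all the work done on one batch must fit inside max.poll.interval.ms (300000 ms, i.e. 5 minutes, by default). One hedged way to size max.poll.records is to divide a chosen share of that interval by a measured per-record processing time. `PollBudget` is hypothetical, and the per-record time and headroom percentage are inputs you must choose from your own measurements.

```java
/** Hypothetical helper: choose a max.poll.records that leaves headroom under max.poll.interval.ms. */
final class PollBudget {
    /**
     * @param maxPollIntervalMs the consumer's max.poll.interval.ms
     * @param perRecordMs       measured processing time per record (an assumption you must measure)
     * @param headroomPercent   share of the interval you are willing to spend processing, e.g. 50
     */
    static int safeMaxPollRecords(long maxPollIntervalMs, long perRecordMs, int headroomPercent) {
        if (perRecordMs <= 0 || headroomPercent <= 0 || headroomPercent > 100) {
            throw new IllegalArgumentException("invalid arguments");
        }
        long budgetMs = maxPollIntervalMs * headroomPercent / 100;
        return (int) Math.max(1L, budgetMs / perRecordMs); // always allow at least one record
    }
}
```

For example, at 100 ms per record and 50% headroom under the default interval, the budget is 150000 ms, which suggests max.poll.records of about 1500.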

3. Partitions and consumer threads

As for the relationship between partitions and consumer threads, in theory the number of consumer threads should be less than or equal to the number of partitions. A common view is that one consumer thread per partition, so that the number of threads equals the number of partitions, makes the best use of threads. When the KafkaConsumer client is used directly, this approach indeed works well. However, if we have spare CPU, we can also run more threads than there are partitions to increase consumption throughput. This requires adapting how the KafkaConsumer client is used: plan the consumption strategy in advance and use the extra CPU to run more threads that split up the consumption work.
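One common way to use more threads than partitions, sketched here as an assumption rather than the author's implementation, is to keep a single consumer thread calling poll() and fan each polled batch out to a worker pool. The sketch uses only java.util.concurrent, with a plain `List<String>` standing in for the values of a ConsumerRecords batch; `BatchProcessor` and the `handle` callback are hypothetical. It waits for the whole batch before returning, so the caller can commit offsets and poll again without skipping unfinished records. The trade-off is that records from the same partition may be processed out of order within a batch, and the whole batch must still finish within max.poll.interval.ms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

/** Hypothetical sketch: one polling thread fans each batch out to a fixed pool of workers. */
final class BatchProcessor implements AutoCloseable {
    private final int threads;
    private final ExecutorService workers;

    BatchProcessor(int threads) {
        if (threads <= 0) throw new IllegalArgumentException("threads must be positive");
        this.threads = threads;
        this.workers = Executors.newFixedThreadPool(threads);
    }

    /** Splits the batch into up to `threads` chunks, processes them in parallel, and waits for all. */
    int process(List<String> batch, Consumer<String> handle) {
        int chunkSize = Math.max(1, (batch.size() + threads - 1) / threads);
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int start = 0; start < batch.size(); start += chunkSize) {
            List<String> chunk = batch.subList(start, Math.min(start + chunkSize, batch.size()));
            tasks.add(() -> {
                for (String record : chunk) {
                    handle.accept(record);
                }
                return chunk.size();
            });
        }
        int processed = 0;
        try {
            // invokeAll blocks until every chunk has completed (normally or with an exception)
            for (Future<Integer> f : workers.invokeAll(tasks)) {
                processed += f.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while processing a batch", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a worker failed", e.getCause());
        }
        return processed;
    }

    @Override
    public void close() {
        workers.shutdown();
    }
}
```

In a real consumer loop, process(...) would be called once per poll() with that batch's record values, and offsets committed only after it returns.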

Copyright notice
This article was written by [osc_ kzisjxiy]. Please include a link to the original when reposting. Thank you.
