
Integrating Spring Boot with Kafka

One. Background

This is a brief record of how to integrate Spring Boot with Kafka.

Two. Implementation steps

1. Add the jar dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. Write the producer and consumer configuration

2.1 Producer configuration

spring.application.name=kafka-springboot
# Kafka broker addresses; separate multiple addresses with commas
spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094
# Producer configuration
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks=1
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
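
For orientation, the spring.kafka.producer.* entries above correspond to the native Kafka producer settings with dotted names (for example batch-size becomes batch.size). The sketch below builds an equivalent java.util.Properties object using only the JDK. The class name ProducerSettingsSketch and its method are made up for illustration; in a Spring Boot application the auto-configuration does this mapping for you.

```java
import java.util.Properties;

// Illustrative only: shows the native Kafka producer keys that the
// spring.kafka.producer.* properties above map to. Class and method
// names are hypothetical; Spring Boot builds this configuration itself.
class ProducerSettingsSketch {

    static Properties producerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("acks", "1");                 // wait for the partition leader only
        props.setProperty("retries", "0");              // do not retry failed sends
        props.setProperty("batch.size", "16384");       // bytes per partition batch (16 KiB)
        props.setProperty("buffer.memory", "33554432"); // total send buffer in bytes (32 MiB)
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProperties();
        // How many full batches fit into the send buffer with these values.
        long batches = Long.parseLong(props.getProperty("buffer.memory"))
                / Long.parseLong(props.getProperty("batch.size"));
        System.out.println("Full batches that fit in the buffer: " + batches);
    }
}
```

With the values from this article the buffer holds 2048 full batches, which gives a rough feel for how much data can queue up before send() starts to block.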

2.2 Consumer configuration

# Consumer configuration
# Disable automatic offset commits
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.max-poll-records=500
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Acknowledge manually in the listener; the offset is committed immediately after each record is acknowledged
spring.kafka.listener.ack-mode=manual_immediate
# In testing, manual mode also commits in batches: the commit happens once the records returned by one poll (at most spring.kafka.consumer.max-poll-records) have been processed
#spring.kafka.listener.ack-mode=manual
# Poll timeout; the S suffix is read as seconds (write 500ms for milliseconds)
spring.kafka.listener.poll-timeout=500S
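
To make auto-offset-reset more concrete, here is a simplified model of how a consumer chooses where to start reading a partition. It uses only the JDK and every name in it is hypothetical. It also ignores the case where a committed offset is out of range, which triggers the reset policy as well.

```java
import java.util.OptionalLong;

// Simplified model of how a consumer picks its starting offset for one
// partition. Names are hypothetical; this is not the Kafka client code.
class OffsetResetSketch {

    static long startingOffset(OptionalLong committed, String autoOffsetReset,
                               long logStartOffset, long logEndOffset) {
        // A committed offset for the group always wins.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset; // read everything still retained
            case "latest":
                return logEndOffset;   // read only records produced from now on
            default:
                // Models the "none" policy: fail when there is nothing to resume from.
                throw new IllegalStateException(
                        "No committed offset and reset policy is " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(OptionalLong.empty(), "earliest", 0L, 120L));
        System.out.println(startingOffset(OptionalLong.of(42L), "earliest", 0L, 120L));
    }
}
```

With earliest, as configured above, a brand-new consumer group reads the topic from the oldest retained record, while a group that has already committed offsets simply resumes.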

2.3 Manual offset commits in the consumer

1. Set spring.kafka.consumer.enable-auto-commit to false.
2. Set spring.kafka.listener.ack-mode to one of:
            |- manual: manual acknowledgment; testing showed that the offsets are still committed in batches.
            |- manual_immediate: manual acknowledgment; the offset is committed immediately when Acknowledgment#acknowledge is called.
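
The difference between the two modes can be pictured with a toy model that only records when a commit would happen for one polled batch. This is a sketch under simplifying assumptions (a single partition, every record acknowledged in order) with hypothetical names. It is not how the Spring Kafka listener container is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the two manual ack modes; it only records when a commit
// would happen. Names are hypothetical and no Kafka classes are involved.
class AckModeSketch {

    enum AckMode { MANUAL, MANUAL_IMMEDIATE }

    // Processes one polled batch of offsets and returns the committed positions in order.
    static List<Long> commitsForBatch(AckMode mode, long[] polledOffsets) {
        List<Long> commits = new ArrayList<>();
        long pending = -1L; // highest acknowledged offset not yet committed
        for (long offset : polledOffsets) {
            // The listener handles the record, then calls acknowledge().
            if (mode == AckMode.MANUAL_IMMEDIATE) {
                commits.add(offset + 1); // commit right away
            } else {
                pending = offset;        // remember it, commit later
            }
        }
        if (mode == AckMode.MANUAL && pending >= 0) {
            commits.add(pending + 1);    // single commit once the whole poll is processed
        }
        return commits;
    }

    public static void main(String[] args) {
        long[] batch = {10L, 11L, 12L};
        System.out.println("MANUAL: " + commitsForBatch(AckMode.MANUAL, batch));
        System.out.println("MANUAL_IMMEDIATE: " + commitsForBatch(AckMode.MANUAL_IMMEDIATE, batch));
    }
}
```

For a batch with offsets 10 to 12, the model commits once (position 13) in manual mode and three times (11, 12, 13) in manual_immediate mode. The committed value is the next offset to read, which is why each is one higher than the record's offset.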

3. Write the producer code

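import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.SuccessCallback;

@Component
public class KafkaProducer implements CommandLineRunner {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public void run(String... args) {
        // Send the current timestamp to the topic once per second.
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() ->
                {
                    // addCallback is available on the ListenableFuture returned by send() in spring-kafka 2.x.
                    kafkaTemplate.send(KafkaConstant.TOPIC, String.valueOf(System.currentTimeMillis()))
                            .addCallback(new SuccessCallback<SendResult<String, String>>() {
                                @Override
                                public void onSuccess(SendResult<String, String> result) {
                                    if (null != result.getRecordMetadata()) {
                                        System.out.println("Message sent successfully, offset: " + result.getRecordMetadata().offset());
                                        return;
                                    }
                                    System.out.println("Message sent successfully");
                                }
                            }, new FailureCallback() {
                                @Override
                                public void onFailure(Throwable throwable) {
                                    System.out.println("Failed to send message: " + throwable.getMessage());
                                }
                            });
                },
                0, 1, TimeUnit.SECONDS);
    }
}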

1. Messages are sent with KafkaTemplate.
2. The success and failure callbacks report whether each message was sent successfully.
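
The success/failure callback pattern can be tried without a broker. The sketch below stands in for an asynchronous send using only the JDK's CompletableFuture; fakeSend and describe are hypothetical helpers, and the "offset" is just the payload length. It may also be a useful reference because, as far as I know, spring-kafka 3.x returns a CompletableFuture from KafkaTemplate.send instead of the ListenableFuture used in this article.

```java
import java.util.concurrent.CompletableFuture;

// Stand-in for an asynchronous send, using only the JDK. fakeSend and
// describe are hypothetical helpers, not part of spring-kafka.
class SendCallbackSketch {

    // Completes with a pretend offset, or fails when the payload is empty.
    static CompletableFuture<Long> fakeSend(String payload) {
        if (payload == null || payload.isEmpty()) {
            CompletableFuture<Long> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("empty payload"));
            return failed;
        }
        return CompletableFuture.completedFuture((long) payload.length());
    }

    // Turns the outcome into a log line, mirroring the success and failure callbacks.
    static String describe(CompletableFuture<Long> future) {
        return future
                .handle((offset, error) -> error == null
                        ? "Message sent successfully, offset: " + offset
                        : "Failed to send message: " + error.getMessage())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(fakeSend(String.valueOf(System.currentTimeMillis()))));
        System.out.println(describe(fakeSend("")));
    }
}
```

Handling both outcomes in one place keeps the logging symmetrical, which is the same idea as passing a SuccessCallback and a FailureCallback together.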

4. Write the consumer code

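import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    @KafkaListener(topics = KafkaConstant.TOPIC, groupId = "kafka-springboot-001")
    public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) throws InterruptedException {
        System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + " Received Kafka message, partition:" + record.partition() + ", offset:" + record.offset() + ", value:" + record.value());
        TimeUnit.SECONDS.sleep(1);
        // Acknowledge manually so that the offset is committed.
        ack.acknowledge();
    }
}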

KafkaListener:
      topics: the name(s) of the topic(s) to listen to
      groupId: the consumer group id
Three. Running results

[Image: running results]

Four. References

1. https://docs.spring.io/spring-boot/docs/2.4.2/reference/htmlsingle/#boot-features-kafka

Five. Code path

https://gitee.com/huan1993/rabbitmq/tree/master/kafka-springboot/src/main/java/com/huan/study/kafka

Copyright notice
This article was written by [Yu Yan]. Please include a link to the original when reposting. Thank you.
https://cdmana.com/2021/01/20210131193720640p.html
