编程知识 cdmana.com

Read Kafka data by Flink DDL

Steps :

service firewalld stop( Turn off firewall )

Start hadoop

Leave security mode

Start zookeeper And kafka Jiji

operation command remarks
see topic $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181

If you want to delete topic, It can be :

 


 

Go to kafka_ddl This topic send out json news $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic kafka_ddl

You may come across it here [2] Wrong report in , Check the port and configuration file in the command server.properties Medium listeners The port of is strictly consistent

[2] The error reported in may also be from a node kafka It's caused by hanging up .

 

May encounter [3]

Pay attention to turn off the firewall

 

 

Use kafka Bring your own consumer to test consumption $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic kafka_ddl

If kafka There's something wrong with your own consumer test , So you don't have to go down there ,

If you use Flink SQL Client There are bound to be problems with consumption

eliminate topic All data in [6]( because , What if you make a mistake ? Right ) $KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic kafka_ddl

need $KAFKA/config/server.properties Set up

delete.topic.enable=true

kafka Data input from the production side :

{"name": "apple1","age": "18","city":  "NingBo","address": "100.00","ts": "1556420980000"}
{"name": "apple2","age": "20","city": "JiaXing","address": "130.00","ts": "1556421040000"}
{"name": "apple3","age": "18","city": "JiangXi","address": "120.00","ts": "1556421100000"}
{"name": "apple4","age": "19","city": "JiangXi","address": "100.00","ts": "1556421120000"}
{"name": "apple5","age": "18","city":  "NingBo","address": "150.00","ts": "1556421480000"}
{"name": "apple6","age": "18","city":  "NingBo","address": "110.00","ts": "1556421510000"}
{"name": "apple7","age": "19","city": "JiaXing","address": "110.00","ts": "1556421570000"}
{"name": "apple8","age": "20","city":  "NingBo","address": "100.00","ts": "1556421630000"}
{"name": "apple9","age": "17","city": "JiangXi","address": "110.00","ts": "1556421655000"}

 

The complete project is as follows :

 

 

Reference:

[1]FlinkSQL Use DDL Sentence creation kafka Source table

[2]Kafka The connection server appears :Connection to node 1 (localhost/127.0.0.1:9092) could not be established.

[6]Is there a way to delete all the data from a topic or delete the topic before every run?

 

 

版权声明
本文为[Applied Sciences]所创,转载请带上原文链接,感谢
https://cdmana.com/2020/12/20201224152600378p.html

Scroll to Top