
Reading Kafka data with Flink DDL

Steps:

Stop the firewall: service firewalld stop

Start Hadoop

Take HDFS out of safe mode

Start the ZooKeeper and Kafka clusters

List the existing topics:

$KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181

If you want to delete a topic, use the --delete command shown further below (it requires delete.topic.enable=true in server.properties).

Send JSON messages to the kafka_ddl topic:

$KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic kafka_ddl

Here you may hit the error described in [2]; check that the port in the command matches the listeners port in server.properties exactly.

The error in [2] can also be caused by one of the Kafka nodes being down.
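For reference, the matching listener line in the broker configuration would look like the following sketch (Desktop:9091 is this article's host and port; adjust to your own broker):

```properties
# $KAFKA/config/server.properties
# The port here must match the one used on the producer/consumer command line.
listeners=PLAINTEXT://Desktop:9091
```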

 

You may also hit the error in [3]; make sure the firewall is stopped.

 

 

Test consumption with Kafka's built-in console consumer:

$KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic kafka_ddl

If this built-in consumer test fails, there is no point going further:

consuming from the Flink SQL Client will inevitably fail as well.

Clear all data in the topic [6] (because, what if you mistyped something, right?):

$KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic kafka_ddl

This requires the following setting in $KAFKA/config/server.properties:

delete.topic.enable=true

Data entered at the Kafka producer:

{"name": "apple1","age": "18","city":  "NingBo","address": "100.00","ts": "1556420980000"}
{"name": "apple2","age": "20","city": "JiaXing","address": "130.00","ts": "1556421040000"}
{"name": "apple3","age": "18","city": "JiangXi","address": "120.00","ts": "1556421100000"}
{"name": "apple4","age": "19","city": "JiangXi","address": "100.00","ts": "1556421120000"}
{"name": "apple5","age": "18","city":  "NingBo","address": "150.00","ts": "1556421480000"}
{"name": "apple6","age": "18","city":  "NingBo","address": "110.00","ts": "1556421510000"}
{"name": "apple7","age": "19","city": "JiaXing","address": "110.00","ts": "1556421570000"}
{"name": "apple8","age": "20","city":  "NingBo","address": "100.00","ts": "1556421630000"}
{"name": "apple9","age": "17","city": "JiangXi","address": "110.00","ts": "1556421655000"}

 

The complete project is as follows:
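The core of such a project is a CREATE TABLE statement for the Kafka source. A hedged sketch (assuming Flink 1.11+ with the universal Kafka connector; the table name, group.id, and startup mode are illustrative choices, not taken from the original project):

```sql
-- Hypothetical Flink SQL DDL for the kafka_ddl topic; all fields are
-- declared STRING because the producer sends every value as a JSON string.
CREATE TABLE kafka_source (
    name STRING,
    age STRING,
    city STRING,
    address STRING,
    ts STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'kafka_ddl',
    'properties.bootstrap.servers' = 'Desktop:9091',
    'properties.group.id' = 'flink-ddl-demo',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
```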

 

 

Reference:

[1] Creating a Kafka source table with Flink SQL DDL statements

[2] Kafka connection error: Connection to node 1 (localhost/127.0.0.1:9092) could not be established.

[6] Is there a way to delete all the data from a topic or delete the topic before every run?

 

 

Copyright notice
This article was created by [Applied Sciences]; please include a link to the original when reposting. Thanks.
https://yuchi.blog.csdn.net/article/details/111620302
