编程知识 cdmana.com

Flink SQL Client读Kafka+流计算(DDL方式+代碼嵌入DDL/SQL方式)

###################################################################################################################

目的

本文是对参考文献[1]在高版本上的的复现

###################################################################################################################

环境与配置

组件 版本
Flink 1.12
Hive 3.1.2
mysql 8.0.22-0ubuntu0.20.04.2
Zookeeper 3.6.0
Hadoop 3.1.2
Ubuntu 20.04

 

###################################################################################################################

步驟

service firewalld stop(关闭防火墙)

啓動hadoop

離開安全模式

啓動zookeeper與kafka集羣

启动flink集群

 该实验不需要额外的.yaml文件的配置,采用的是DDL方式

操作 命令 备注
查看topic $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181

如果想删除topic,可以是:

 


 

my_topic 这个 topic发送 json 消息 $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic my_topic

这里可能碰到[2]中的报错,注意检查命令中端口与配置文件server.properties中的listeners的端口严格保持一致

[2]中的报错还可能是某个节点的kafka挂掉导致的.

 

可能碰到[3]

注意关闭防火墙

 

 

使用kafka自带消费端测试下消费 $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic my_topic

如果kafka自带消费者测试有问题,那么就不用继续往下面做了,

此时如果使用Flink SQL Client来消费也必然会出现问题

清除topic中所有数据[6]

(因为,万一你输错了呢?对吧)

$KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic my_topic

需要$KAFKA/config/server.properties设置

delete.topic.enable=true

kafka生產端輸入的數據:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662868", "item_id":"1784", "category_id": "54123654", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662854", "item_id":"1456", "category_id": "12345678", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662858", "item_id":"1457", "category_id": "12345679", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
 

###################################################################################################################################################

SQL Client+DDL方式-实验结果

  DDL/SQL 实验效果
建立表(对接kafka) CREATE TABLE user_log1 (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts VARCHAR
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'my_topic',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.group.id' = 'testGroup',
    'connector.properties.zookeeper.connect' = 'Desktop:2181,Laptop:2181,Laptop:2183',
    'connector.properties.bootstrap.servers' = 'Desktop:9091',
    'format.type' = 'json'
);
流计算 select item_id,count(*) from user_log1 group by item_id;

###################################################################################################################################################

Maven工程+DDL方式

代碼方式參考[2],自己運行通過的代碼如下:

https://gitee.com/appleyuchi/Flink_Code/blob/master/FLINK读写各种数据源/Java/src/main/java/KafkaFlinkDDL.java

 

 

 

 

Reference:

[1]Flink通过SQLClinet创建kafka源表并进行实时计算

[2]Flink通过SQLClinet/Java代码创建kafka源表,指定Offset消费,并进行实时计算,最后sink到mysql表中

版权声明
本文为[Applied Sciences]所创,转载请带上原文链接,感谢
https://yuchi.blog.csdn.net/article/details/111651273

Scroll to Top