
Flink SQL Client reading from Kafka + stream computation (DDL via the SQL Client, and DDL/SQL embedded in code)

###################################################################################################################

Purpose

This article reproduces reference [1] on a newer version of the software stack.

###################################################################################################################

Environment and configuration

Component    Version
Flink        1.12
Hive         3.1.2
MySQL        8.0.22-0ubuntu0.20.04.2
Zookeeper    3.6.0
Hadoop       3.1.2
Ubuntu       20.04

 

###################################################################################################################

Steps

service firewalld stop (turn off the firewall)

Start Hadoop

Leave HDFS safe mode

Start the ZooKeeper and Kafka clusters

Start the Flink cluster

No extra .yaml configuration file is needed, because we use the DDL approach. (A sketch of the startup commands follows.)
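For reference, a minimal sketch of the whole startup sequence on this stack; the script locations ($HADOOP_HOME, $ZOOKEEPER, $KAFKA, $FLINK) are assumptions, so adjust them to your installation:

    # Turn off the firewall (the article's command; on Ubuntu 20.04 you may have ufw instead)
    sudo systemctl stop firewalld    # or: sudo ufw disable

    # Start HDFS
    $HADOOP_HOME/sbin/start-dfs.sh

    # Explicitly leave HDFS safe mode
    hdfs dfsadmin -safemode leave

    # Start ZooKeeper and Kafka on each node of the cluster
    $ZOOKEEPER/bin/zkServer.sh start
    $KAFKA/bin/kafka-server-start.sh -daemon $KAFKA/config/server.properties

    # Start the standalone Flink cluster, then the SQL Client
    $FLINK/bin/start-cluster.sh
    $FLINK/bin/sql-client.sh embedded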

Operations, commands, and remarks:
List the existing topics: $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181
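The article assumes my_topic already exists; if it does not, it can be created like this (the replication factor and partition count here are assumptions, not taken from the article):

    $KAFKA/bin/kafka-topics.sh --create --zookeeper Desktop:2181 --replication-factor 1 --partitions 1 --topic my_topic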

If you want to delete a topic, use the command shown in the cleanup step further down.

Send JSON messages to the topic my_topic: $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic my_topic

Here you may run into the error described in [2]: check that the port in the command matches exactly the listeners port in server.properties.

The error in [2] can also be caused by one of the Kafka nodes being down.
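Concretely, for the producer command above to work, the broker's $KAFKA/config/server.properties needs a matching listener entry along these lines (hostname and port taken from this article's setup):

    listeners=PLAINTEXT://Desktop:9091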

 

You may also encounter [3].

Make sure the firewall is turned off.

 

 

Test consumption with Kafka's built-in console consumer: $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic my_topic

If Kafka's own console consumer already fails here, there is no point in going further:

consuming from the Flink SQL Client is then bound to fail as well.
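When the consumer works, it simply echoes every record the producer sent, one JSON object per line, e.g.:

    {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}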

Clear all the data in the topic [6]

(because, what if you made a mistake somewhere, right?)

$KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic my_topic

This requires the following setting in $KAFKA/config/server.properties:

delete.topic.enable=true

Data entered on the Kafka producer side:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662868", "item_id":"1784", "category_id": "54123654", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662854", "item_id":"1456", "category_id": "12345678", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662858", "item_id":"1457", "category_id": "12345679", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
 

###################################################################################################################################################

SQL Client + DDL approach: experimental results

Create a table (connected to Kafka):

CREATE TABLE user_log1 (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts VARCHAR
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'my_topic',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.group.id' = 'testGroup',
    'connector.properties.zookeeper.connect' = 'Desktop:2181,Laptop:2181,Laptop:2183',
    'connector.properties.bootstrap.servers' = 'Desktop:9091',
    'format.type' = 'json'
);
Stream computation:

SELECT item_id, COUNT(*) FROM user_log1 GROUP BY item_id;
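Note that the WITH options above use the legacy connector.* keys, which Flink 1.12 still accepts. The same table can also be declared with the newer option style of the Flink 1.12 Kafka SQL connector; a hedged equivalent using the same topic, brokers, and group as above:

CREATE TABLE user_log1 (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'my_topic',
    'properties.bootstrap.servers' = 'Desktop:9091',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);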

###################################################################################################################################################

Maven project + DDL approach

The code approach follows reference [2]; the version of the code that I ran through successfully is here:

https://gitee.com/appleyuchi/Flink_Code/blob/master/FLINK Read and write all kinds of data sources /Java/src/main/java/KafkaFlinkDDL.java
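The linked KafkaFlinkDDL.java is the authoritative version; below is only a minimal self-contained sketch of the same idea (embedding the DDL and the query in a Flink 1.12 Java job), where the class name and anything not shown in the article are assumptions:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaFlinkDDLSketch {
    public static void main(String[] args) throws Exception {
        // Standard streaming + Table API setup (the Blink planner is the default in Flink 1.12).
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Register the Kafka-backed table with the same DDL used in the SQL Client.
        tEnv.executeSql(
            "CREATE TABLE user_log1 (" +
            "  user_id VARCHAR," +
            "  item_id VARCHAR," +
            "  category_id VARCHAR," +
            "  behavior VARCHAR," +
            "  ts VARCHAR" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'my_topic'," +
            "  'properties.bootstrap.servers' = 'Desktop:9091'," +
            "  'properties.group.id' = 'testGroup'," +
            "  'scan.startup.mode' = 'earliest-offset'," +
            "  'format' = 'json'" +
            ")");

        // Run the same aggregation and print the continuously updating result.
        tEnv.executeSql("SELECT item_id, COUNT(*) FROM user_log1 GROUP BY item_id").print();
    }
}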


Reference:

[1] Creating a Kafka source table with Flink via the SQL Client and doing real-time computation

[2] Creating a Kafka source table with Flink via the SQL Client / Java code, consuming from a specified offset, doing real-time computation, and finally sinking the result to a MySQL table
