
Flink sink in practice, part 4: custom sink

Welcome to my GitHub

https://github.com/zq2599/blog_demos

Content: a categorized summary of all my original articles with supporting source code, covering Java, Docker, Kubernetes, DevOps, and more.

Overview of this article

Flink's official sinks may not meet every need. In that case you can develop a custom sink, and this article walks through building one.

Full series links

  1. 《Flink sink in practice, part 1: a first look》
  2. 《Flink sink in practice, part 2: kafka》
  3. 《Flink sink in practice, part 3: cassandra3》
  4. 《Flink sink in practice, part 4: custom sink》

Inheritance relationships

  1. Before coding, first understand how sink capability is implemented. Earlier parts of this series used sinks such as print, kafka, and cassandra. The inheritance relationships of their core classes are shown in the figure below: [Figure: inheritance relationships of the core sink classes]
  2. As the figure shows, the key to implementing a sink is to implement the RichFunction and SinkFunction interfaces. The former handles resource control (operations such as open and close), and the latter carries out the sink's actual work. Here is how the simplest case, the PrintSinkFunction class, implements the invoke method of the SinkFunction interface:
@Override
public void invoke(IN record) {
  writer.write(record);
}
  3. With the basic logic of a sink now clear, we can start coding.
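
The split between resource control and per-record work described above can be sketched in plain Java. This is only an illustration: `Lifecycle`, `Sink`, `RichSink`, `RecordingSink`, and `run` are hypothetical stand-ins written for this example, not Flink's actual classes, and the driver loop is an assumption about how a runtime might call them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SinkLifecycleSketch {

    /** Hypothetical stand-in for the resource-control role (open/close). */
    public interface Lifecycle {
        void open() throws Exception;
        void close() throws Exception;
    }

    /** Hypothetical stand-in for the per-record role (invoke). */
    public interface Sink<T> {
        void invoke(T value) throws Exception;
    }

    /** Combines both roles and provides no-op lifecycle defaults. */
    public abstract static class RichSink<T> implements Lifecycle, Sink<T> {
        @Override
        public void open() throws Exception { }

        @Override
        public void close() throws Exception { }
    }

    /** A sink that records every call so the ordering can be inspected. */
    public static class RecordingSink extends RichSink<String> {
        public final List<String> calls = new ArrayList<>();

        @Override
        public void open() { calls.add("open"); }

        @Override
        public void invoke(String value) { calls.add("invoke:" + value); }

        @Override
        public void close() { calls.add("close"); }
    }

    /** Opens the sink once, invokes it per record, and always closes it. */
    public static <T> void run(RichSink<T> sink, List<T> records) throws Exception {
        sink.open();
        try {
            for (T record : records) {
                sink.invoke(record);
            }
        } finally {
            sink.close();
        }
    }

    public static void main(String[] args) throws Exception {
        RecordingSink sink = new RecordingSink();
        run(sink, Arrays.asList("a", "b"));
        System.out.println(sink.calls); // prints [open, invoke:a, invoke:b, close]
    }
}
```

The point of the sketch is the ordering: resources are acquired once in open, every record passes through invoke, and close releases the resources at the end.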

Content and version

This exercise is simple: a custom sink that writes data to MySQL. The versions involved are as follows:

  1. JDK: 1.8.0_191
  2. Flink: 1.9.2
  3. Maven: 3.6.0
  4. Operating system running Flink: CentOS Linux release 7.7.1908
  5. MySQL: 5.7.29
  6. IDEA: 2018.3.5 (Ultimate Edition)

Source download

If you would rather not write the code yourself, the source for the whole series can be downloaded from GitHub. The addresses and links are listed in the table below (https://github.com/zq2599/blog_demos):

Name | Link | Remarks
Project home page | https://github.com/zq2599/blog_demos | The project's home page on GitHub
Git repository address (https) | https://github.com/zq2599/blog_demos.git | Repository address of the project source, https protocol
Git repository address (ssh) | git@github.com:zq2599/blog_demos.git | Repository address of the project source, ssh protocol

This git project contains multiple folders. The application for this article is in the flinksinkdemo folder, as shown in the red box below: [Figure: the flinksinkdemo folder highlighted in the repository]

Database preparation

Get MySQL ready and run the following SQL to create the database flinkdemo and the table student:

create database if not exists flinkdemo;
USE flinkdemo;
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `age` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

Coding

  1. Use the flinksinkdemo project created in 《Flink sink in practice, part 2: kafka》;
  2. Add the MySQL dependency to pom.xml:
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>8.0.11</version>
</dependency>
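
The 8.0.11 connector added above uses a different driver class name from the 5.x line, and the sink class later in this article relies on that name and on extra URL parameters such as serverTimezone. The following is a minimal sketch of keeping those settings in one place. `MySqlJdbcSettings`, `driverClassName`, and `jdbcUrl` are hypothetical names made up for this example, and the helper does not URL-encode parameter values.

```java
import java.util.Map;

public class MySqlJdbcSettings {

    /** Returns the Connector/J driver class name for major version 5 or 8. */
    public static String driverClassName(int connectorMajorVersion) {
        switch (connectorMajorVersion) {
            case 5:
                return "com.mysql.jdbc.Driver";
            case 8:
                // Connector/J 8.x moved the driver into the "cj" package.
                return "com.mysql.cj.jdbc.Driver";
            default:
                throw new IllegalArgumentException(
                        "Only major versions 5 and 8 are covered by this sketch: " + connectorMajorVersion);
        }
    }

    /** Assembles a JDBC URL; parameters are appended in the map's iteration order. */
    public static String jdbcUrl(String host, int port, String database, Map<String, String> params) {
        StringBuilder url = new StringBuilder("jdbc:mysql://")
                .append(host).append(':').append(port).append('/').append(database);
        char separator = '?';
        for (Map.Entry<String, String> entry : params.entrySet()) {
            // Values are appended as-is (no URL encoding) to keep the sketch short.
            url.append(separator).append(entry.getKey()).append('=').append(entry.getValue());
            separator = '&';
        }
        return url.toString();
    }
}
```

With a LinkedHashMap holding useSSL=false and serverTimezone=UTC, `jdbcUrl("localhost", 3306, "flinkdemo", params)` yields `jdbc:mysql://localhost:3306/flinkdemo?useSSL=false&serverTimezone=UTC`. The host here is a placeholder.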
  3. Create the entity class Student.java, which corresponds to the student table in the database:
package com.bolingcavalry.customize;

public class Student {
    private int id;
    private String name;
    private int age;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public Student(String name, int age) {
        this.name = name;
        this.age = age;
    }
}
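
One optional refinement: Flink only treats a class as a POJO type if, among other rules, it has a public no-argument constructor. The Student class above has none, so Flink handles it as a generic type instead; the job in this article still runs either way. The sketch below shows a variant that adds the constructor. `StudentPojo` is a hypothetical name used here to avoid confusion with the article's class, and the toString method is added only for easier debugging.

```java
public class StudentPojo {
    private int id;
    private String name;
    private int age;

    /** Public no-argument constructor, one of Flink's POJO requirements. */
    public StudentPojo() {
    }

    public StudentPojo(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "StudentPojo{id=" + id + ", name='" + name + "', age=" + age + "}";
    }
}
```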
  4. Create the custom sink class MySQLSinkFunction.java. This is the core of this article: connecting to the database, disconnecting, and writing data all happen here:
package com.bolingcavalry.customize;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class MySQLSinkFunction extends RichSinkFunction<Student> {

    PreparedStatement preparedStatement;

    private Connection connection;

    private ReentrantLock reentrantLock = new ReentrantLock();

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // Prepare the database-related instances
        buildPreparedStatement();
    }

    @Override
    public void close() throws Exception {
        super.close();

        try{
            if(null!=preparedStatement) {
                preparedStatement.close();
                preparedStatement = null;
            }
        } catch(Exception e) {
            e.printStackTrace();
        }

        try{
            if(null!=connection) {
                connection.close();
                connection = null;
            }
        } catch(Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void invoke(Student value, Context context) throws Exception {
        preparedStatement.setString(1, value.getName());
        preparedStatement.setInt(2, value.getAge());
        preparedStatement.executeUpdate();
    }

    /**
     * Prepares the connection and the preparedStatement.
     * Obtaining the MySQL connection instance has to account for multi-thread synchronization;
     * synchronized is not used because obtaining a database connection is a remote
     * operation whose duration is uncertain.
     */
    private void buildPreparedStatement() {
        if(null==connection) {
            boolean hasLock = false;
            try {
                hasLock = reentrantLock.tryLock(10, TimeUnit.SECONDS);

                if(hasLock) {
                    Class.forName("com.mysql.cj.jdbc.Driver");
                    connection = DriverManager.getConnection("jdbc:mysql://192.168.50.43:3306/flinkdemo?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC", "root", "123456");
                }

                if(null!=connection) {
                    preparedStatement = connection.prepareStatement("insert into student (name, age) values (?, ?)");
                }
            } catch (Exception e) {
                // Use with caution in production: the exception is only printed, not rethrown
                e.printStackTrace();
            } finally {
                if(hasLock) {
                    reentrantLock.unlock();
                }
            }
        }
    }
}
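
In buildPreparedStatement, if tryLock times out or the connection attempt throws, the method returns with connection and preparedStatement still null, and the first call to invoke then fails with a NullPointerException. The following sketch shows the same timed-lock initialization in plain Java, failing fast instead. `TimedLockInit` and its `get` method are hypothetical names written for this example, and a `Supplier` stands in for the code that opens the real connection.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

public class TimedLockInit<T> {

    private final ReentrantLock lock = new ReentrantLock();
    private final Supplier<T> factory;
    private volatile T resource;

    public TimedLockInit(Supplier<T> factory) {
        this.factory = factory;
    }

    /**
     * Returns the resource, creating it on first use.
     * Throws IllegalStateException if the lock is not acquired within the timeout.
     */
    public T get(long timeout, TimeUnit unit) throws InterruptedException {
        T current = resource;
        if (current != null) {
            return current;
        }
        // A timed tryLock bounds the wait, unlike a synchronized block.
        if (!lock.tryLock(timeout, unit)) {
            throw new IllegalStateException("Timed out waiting for the initialization lock");
        }
        try {
            // Re-check under the lock: another thread may have initialized it meanwhile.
            if (resource == null) {
                resource = factory.get();
            }
            return resource;
        } finally {
            lock.unlock();
        }
    }
}
```

A caller could wrap the connection setup as `new TimedLockInit<>(() -> openConnection())`, where `openConnection` is whatever code creates the resource. A timeout then surfaces as an exception in open rather than as a later failure in invoke.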
  5. The MySQLSinkFunction code above is simple. Just note that a lock is used to control multi-thread synchronization when the connection is created, and that with the newer MySQL driver version the driver class name and URI format differ from those of the earlier 5.x versions;
  6. Create the job class StudentSink.java, which creates a Flink job. It builds a data set from an ArrayList and calls addSink on it directly. So that the DAG is easier to see, disableChaining is called to disable the operator chain:
package com.bolingcavalry.customize;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;

public class StudentSink {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism to 1
        env.setParallelism(1);

        List<Student> list = new ArrayList<>();
        list.add(new Student("aaa", 11));
        list.add(new Student("bbb", 12));
        list.add(new Student("ccc", 13));
        list.add(new Student("ddd", 14));
        list.add(new Student("eee", 15));
        list.add(new Student("fff", 16));

        env.fromCollection(list)
            .addSink(new MySQLSinkFunction())
            .disableChaining();

        env.execute("sink demo : customize mysql obj");
    }
}
  7. Submit the job on the Flink web page and set the job class: [Figure: job submission page]
  8. After the job completes, the DAG shows that both the tasks and the record counts are as expected: [Figure: DAG of the completed job]
  9. Check the database and confirm that the data has been written: [Figure: rows in the student table]

That completes the custom sink exercise. I hope this article gives you something useful to refer to.

Welcome to follow my WeChat official account: Programmer Xinchen

Search WeChat for 「Programmer Xinchen」. I'm Xinchen, and I look forward to exploring the Java world with you... https://github.com/zq2599/blog_demos

Copyright notice
This article was created by [Programmer Xinchen]. Please include a link to the original when reposting. Thank you.
