编程知识 cdmana.com

17-Flink消费Kafka写入Mysql

戳更多文章:

在这里插入图片描述

你可能感兴趣的文章:
1-Flink入门
2-本地环境搭建&构建第一个Flink应用
3-DataSet API
4-DataSteam API
5-集群部署
6-分布式缓存
7-重启策略
8-Flink中的窗口
9-Flink中的Time
Flink时间戳和水印
Broadcast广播变量
FlinkTable&SQL
Flink实战项目实时热销排行
Flink写入RedisSink
Flink消费Kafka写入Mysql
Flink组件和逻辑计划
Flink执行计划生成
JobManager中的基本组件(1)
JobManager中的基本组件(2)
JobManager中的基本组件(3)
TaskManager
算子
网络
水印WaterMark
CheckPoint
任务调度与负载均衡
异常处理
Alibaba Blink新特性
Java高级特性增强-集合
Java高级特性增强-多线程
Java高级特性增强-Synchronized
Java高级特性增强-volatile
Java高级特性增强-并发集合框架
Java高级特性增强-分布式
Java高级特性增强-Zookeeper
Java高级特性增强-JVM
Java高级特性增强-NIO
Java高级特性增强-Netty

本文介绍消费Kafka的消息实时写入Mysql。

  1. maven新增依赖:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.39</version>
</dependency>

2.重写RichSinkFunction,实现一个Mysql Sink

public class MysqlSink extends
    RichSinkFunction<Tuple3<Integer, String, Integer>> {
private Connection connection;
private PreparedStatement preparedStatement;
String username = "";
String password = "";
String drivername = "";   //配置改成自己的配置
String dburl = "";

@Override
public void invoke(Tuple3<Integer, String, Integer> value) throws Exception {
    Class.forName(drivername);
    connection = DriverManager.getConnection(dburl, username, password);
    String sql = "replace into table(id,num,price) values(?,?,?)"; //假设mysql 有3列 id,num,price
    preparedStatement = connection.prepareStatement(sql);
    preparedStatement.setInt(1, value.f0);
    preparedStatement.setString(2, value.f1);
    preparedStatement.setInt(3, value.f2);
    preparedStatement.executeUpdate();
    if (preparedStatement != null) {
        preparedStatement.close();
    }
    if (connection != null) {
        connection.close();
    }
}
}
  1. Flink主类
public class MysqlSinkTest {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

// 1,abc,100  类似这样的数据,当然也可以是很复杂的json数据,去做解析
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
env.getConfig().disableSysoutLogging();  //设置此可以屏蔽掉日记打印情况
env.getConfig().setRestartStrategy(
        RestartStrategies.fixedDelayRestart(5, 5000));
env.enableCheckpointing(2000);
DataStream<String> stream = env
        .addSource(consumer);

DataStream<Tuple3<Integer, String, Integer>> sourceStream = stream.filter((FilterFunction<String>) value -> StringUtils.isNotBlank(value))
                                                                        .map((MapFunction<String, Tuple3<Integer, String, Integer>>) value -> {
    String[] args1 = value.split(",");
    return new Tuple3<Integer, String, Integer>(Integer
            .valueOf(args1[0]), args1[1],Integer
            .valueOf(args1[2]));
});

sourceStream.addSink(new MysqlSink());
env.execute("data to mysql start");
}
}

所有代码,我放在了我的公众号,回复Flink可以下载

在这里插入图片描述

版权声明
本文为[蜡笔小新v]所创,转载请带上原文链接,感谢
https://blog.51cto.com/u_9928699/2892999

Scroll to Top