编程知识 cdmana.com

Flink处理函数实战之二:ProcessFunction类,java线程面试题目

Flink处理函数实战之二:ProcessFunction类,java线程面试题目_Java

 创建工程

执行以下命令创建一个flink-1.9.2的应用工程:

mvn \

archetype:generate \

-DarchetypeGroupId=org.apache.flink \

-DarchetypeArtifactId=flink-quickstart-java \

-DarchetypeVersion=

《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》

【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享

1.9.2

按提示输入groupId:com.bolingcavalry,architectid:flinkdemo

 第一个demo

第一个demo用来体验以下两个特性:

  1. 处理单个元素;

  2. 访问时间戳;

创建Simple.java,内容如下:

package com.bolingcavalry.processfunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.TimeCharacteristic;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.ProcessFunction;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import org.apache.flink.util.Collector;

public class Simple {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// 并行度为1

env.setParallelism(1);

// 设置数据源,一共三个元素

DataStream<Tuple2<String,Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {

@Override

public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {

for(int i=1; i<4; i++) {

String name = “name” + i;

Integer value = i;

long timeStamp = System.currentTimeMillis();

// 将将数据和时间戳打印出来,用来验证数据

System.out.println(String.format(“source,%s, %d, %d\n”,

name,

value,

timeStamp));

// 发射一个元素,并且戴上了时间戳

ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, value), timeStamp);

// 为了让每个元素的时间戳不一样,每发射一次就延时10毫秒

Thread.sleep(10);

}

}

@Override

public void cancel() {

}

});

// 过滤值为奇数的元素

SingleOutputStreamOperator<String> mainDataStream = dataStream

.process(new ProcessFunction<Tuple2<String, Integer>, String>() {

@Override

public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {

// f1字段为奇数的元素不会进入下一个算子

if(0 == value.f1 % 2) {

out.collect(String.format(“processElement,%s, %d, %d\n”,

value.f0,

value.f1,

ctx.timestamp()));

}

}

});

// 打印结果,证明每个元素的timestamp确实可以在ProcessFunction中取得

mainDataStream.print();

env.execute(“processfunction demo : simple”);

}

}

这里对上述代码做个介绍:

  1. 创建一个数据源,每个10毫秒发出一个元素,一共三个,类型是Tuple2,f0是个字符串,f1是整形,每个元素都带时间戳;

  2. 数据源发出元素时,提前把元素的f0、f1、时间戳打印出来,和后面的数据核对是否一致;

  3. 在后面的处理中,创建了ProcessFunction的匿名子类,里面可以处理上游发来的每个元素,并且还能取得每个元素的时间戳(这个能力很重要),然后将f1字段为奇数的元素过滤掉;

  4. 最后将ProcessFunction处理过的数据打印出来,验证处理结果是否符合预期;

直接执行Simple类,结果如下,可见过滤和提取时间戳都成功了:

Flink处理函数实战之二:ProcessFunction类,java线程面试题目_Java_02

 第二个demo

第二个demo是实现旁路输出(Side Outputs),对于一个DataStream来说,可以通过旁路输出将数据输出到其他算子中去,而不影响原有的算子的处理,下面来演示旁路输出:

创建SideOutput类:

package com.bolingcavalry.processfunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.ProcessFunction;

import org.apache.flink.util.Collector;

import org.apache.flink.util.OutputTag;

import java.util.ArrayList;

import java.util.List;

public class SideOutput {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 并行度为1

env.setParallelism(1);

// 定义OutputTag

final OutputTag<String> outputTag = new OutputTag<String>(“side-output”){};

// 创建一个List,里面有两个Tuple2元素

List<Tuple2<String, Integer>> list = new ArrayList<>();

list.add(new Tuple2(“aaa”, 1));

list.add(new Tuple2(“bbb”, 2));

list.add(new Tuple2(“ccc”, 3));

//通过List创建DataStream

DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);

//所有元素都进入mainDataStream,f1字段为奇数的元素进入SideOutput

SingleOutputStreamOperator<String> mainDataStream = fromCollectionDataStream

.process(new ProcessFunction<Tuple2<String, Integer>, String>() {

@Override

public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {

//进入主流程的下一个算子

out.collect("main, name : " + value.f0 + ", value : " + value.f1);

//f1字段为奇数的元素进入SideOutput

if(1 == value.f1 % 2) {

ctx.output(outputTag, "side, name : " + value.f0 + ", value : " + value.f1);

}

}

});

// 禁止chanin,这样可以在页面上看清楚原始的DAG

mainDataStream.disableChaining();

// 取得旁路数据

最后

如果觉得本文对你有帮助的话,不妨给我点个赞,关注一下吧!

Flink处理函数实战之二:ProcessFunction类,java线程面试题目_Java_03

Flink处理函数实战之二:ProcessFunction类,java线程面试题目_面试_04

本文已被 CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录

版权声明
本文为[HarmonyOS学习]所创,转载请带上原文链接,感谢
https://blog.51cto.com/u_15438507/4690206

Scroll to Top