编程知识 cdmana.com

Flink Handling Function Real War II: processfunction class, Java thread interview subject

FlinkFonction de traitement combat réel 2:ProcessFunctionCatégorie,javaQuestions d'entrevue par fil_Java

 Créer un projet

Exécutez la commande suivante pour créer unflink-1.9.2Ingénierie des applications:

mvn \

archetype:generate \

-DarchetypeGroupId=org.apache.flink \

-DarchetypeArtifactId=flink-quickstart-java \

-DarchetypeVersion=

《Grandes usines de première ligneJavaAnalyse des questions d'entrevue+Notes d'apprentissage pour le développement de l'arrière - plan+La dernière vidéo d'architecture+Document d'information sur le code source du projet en direct》

【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 Partage open source du contenu complet

1.9.2

Saisissez à l'invitegroupId:com.bolingcavalry,architectid:flinkdemo

 Le premierdemo

Le premierdemoPour expérimenter les deux caractéristiques suivantes:

  1. Travailler avec des éléments individuels;

  2. TIMESTAMP d'accès;

CréationSimple.java,Il se lit comme suit::

package com.bolingcavalry.processfunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.TimeCharacteristic;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.ProcessFunction;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import org.apache.flink.util.Collector;

public class Simple {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Parallélisme1

env.setParallelism(1);

// Définir la source de données, Trois éléments

DataStream<Tuple2<String,Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {

@Override

public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {

for(int i=1; i<4; i++) {

String name = “name” + i;

Integer value = i;

long timeStamp = System.currentTimeMillis();

// Les données et l'horodatage seront imprimés , Utilisé pour valider les données

System.out.println(String.format(“source,%s, %d, %d\n”,

name,

value,

timeStamp));

// Lancer un élément , Et l'horodatage

ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, value), timeStamp);

// Pour que chaque élément ait un horodatage différent , Chaque lancement est retardé 10MS

Thread.sleep(10);

}

}

@Override

public void cancel() {

}

});

// Filtrer les éléments avec des valeurs impaires

SingleOutputStreamOperator<String> mainDataStream = dataStream

.process(new ProcessFunction<Tuple2<String, Integer>, String>() {

@Override

public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {

// f1 Un élément avec un champ impair n'entre pas dans l'opérateur suivant

if(0 == value.f1 % 2) {

out.collect(String.format(“processElement,%s, %d, %d\n”,

value.f0,

value.f1,

ctx.timestamp()));

}

}

});

// Imprimer les résultats, Preuve de chaque élément timestamp En effet. ProcessFunctionAcquis

mainDataStream.print();

env.execute(“processfunction demo : simple”);

}

}

Voici une introduction au code ci - dessus :

  1. Créer une source de données,Chaque10 Milliseconde pour envoyer un élément ,Trois.,Le type estTuple2,f0C'est une chaîne,f1C'est un lifting., Chaque élément est horodaté ;

  2. Lorsque la source de données émet un élément , Mettre l'élément à l'avance f0、f1、 L'horodatage est imprimé , Vérifier la cohérence avec les données suivantes ;

  3. Dans le traitement ultérieur ,CrééProcessFunction Sous - classe anonyme de , Il peut traiter chaque élément envoyé en amont , Et vous pouvez obtenir l'horodatage de chaque élément ( Cette capacité est importante ),Et ensuite,f1 Le champ est un nombre impair d'éléments filtrés ;

  4. Enfin,ProcessFunction Les données traitées sont imprimées , Vérifier que les résultats du traitement sont conformes aux attentes ;

Exécution directeSimpleCatégorie,Les résultats sont les suivants:, Le filtrage visible et l'extraction de l'horodatage ont tous deux réussi :

FlinkFonction de traitement combat réel 2:ProcessFunctionCatégorie,javaQuestions d'entrevue par fil_Java_02

 Deuxièmedemo

Deuxièmedemo Est d'obtenir une sortie de dérivation (Side Outputs),Pour unDataStreamDis, Les données peuvent être exportées par contournement vers d'autres opérateurs , Sans affecter le traitement de l'opérateur original , Voici une démonstration de la sortie de dérivation :

CréationSideOutputCatégorie:

package com.bolingcavalry.processfunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.ProcessFunction;

import org.apache.flink.util.Collector;

import org.apache.flink.util.OutputTag;

import java.util.ArrayList;

import java.util.List;

public class SideOutput {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Parallélisme1

env.setParallelism(1);

// DéfinitionOutputTag

final OutputTag<String> outputTag = new OutputTag<String>(“side-output”){};

// Créer unList,Il y en a deux.Tuple2Élément

List<Tuple2<String, Integer>> list = new ArrayList<>();

list.add(new Tuple2(“aaa”, 1));

list.add(new Tuple2(“bbb”, 2));

list.add(new Tuple2(“ccc”, 3));

//AdoptionListCréationDataStream

DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);

// Tous les éléments entrent dans mainDataStream,f1 Les éléments dont le champ est impair entrent dans SideOutput

SingleOutputStreamOperator<String> mainDataStream = fromCollectionDataStream

.process(new ProcessFunction<Tuple2<String, Integer>, String>() {

@Override

public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {

// L'opérateur suivant dans le courant principal

out.collect("main, name : " + value.f0 + ", value : " + value.f1);

//f1 Les éléments dont le champ est impair entrent dans SideOutput

if(1 == value.f1 % 2) {

ctx.output(outputTag, "side, name : " + value.f0 + ", value : " + value.f1);

}

}

});

// Interdictionchanin, De cette façon, vous pouvez voir clairement l'original sur la page DAG

mainDataStream.disableChaining();

// Obtenir des données de contournement

Enfin

Si vous trouvez cet article utile,Autant me faire un compliment,Fais attention!

FlinkFonction de traitement combat réel 2:ProcessFunctionCatégorie,javaQuestions d'entrevue par fil_Java_03

FlinkFonction de traitement combat réel 2:ProcessFunctionCatégorie,javaQuestions d'entrevue par fil_Interview_04

Cet article a été publié par CODINGProjet Open Source:【Grandes usines de première ligneJavaAnalyse des questions d'entrevue+Résumé de base notes d'étude+Dernière vidéo d'explication+Code source du projet opérationnel】Inclus

版权声明
本文为[Harmonios Learning]所创,转载请带上原文链接,感谢
https://cdmana.com/2021/11/20211125174940970h.html

Scroll to Top