编程知识 cdmana.com

Flink's KeyedProcessFunction

KeyedProcessFunction: a processing function that operates on a KeyedStream. It lets you apply custom logic to every element of the keyed stream, and it supports timers, i.e. callbacks that fire at a given time.

Sample environment

java.version: 1.8.x
flink.version: 1.11.1

Sample data source (the project code can be downloaded from Code Cloud, i.e. Gitee)

Flink example series: setting up the development environment and the sample data

KeyedProcess.java

import com.flink.examples.DataSource;
import com.google.gson.Gson;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Date;
import java.util.List;

/**
 * Demonstrates {@link KeyedProcessFunction}: a per-element processing function that runs on a
 * keyed stream and can additionally use keyed state and timers.
 *
 * <p>The job keys the sample tuples by their second field ({@code f1}), emits every tuple as a
 * JSON string and, for one specific record, registers a processing-time timer and then blocks
 * longer than the timer delay so that the "timed out" callback is emitted.
 */
public class KeyedProcess {

    /**
     * Value of {@code f0} that triggers the simulated slow processing. The surrounding spaces are
     * part of the literal and are compared as-is against the sample data.
     */
    private static final String SLOW_USER = " Wang Wu ";

    /** Delay, in milliseconds, after which the registered timer is due. */
    private static final long TIMEOUT_MILLIS = 3 * 1000L;

    /** Duration, in milliseconds, of the artificial pause that outlasts {@link #TIMEOUT_MILLIS}. */
    private static final long SIMULATED_WORK_MILLIS = 5 * 1000L;

    /** Pattern used to render the timer timestamp in the timeout message. */
    private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /**
     * Builds and runs the job: reads the sample collection, keys it by {@code f1}, applies the
     * timeout-aware process function and prints every emitted string.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or fails during execution
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // NOTE(review): the job is switched to event time, yet no timestamps/watermarks are assigned
        // anywhere in this class and only a processing-time timer is registered below. The setting is
        // kept unchanged; confirm whether event time is really needed for this example.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
        DataStream<String> dataStream = env.fromCollection(tuple3List)
                .keyBy((KeySelector<Tuple3<String, String, Integer>, String>) k -> k.f1)
                // process(...) is applied per key; state and timers inside the function are scoped to that key.
                .process(new TimeoutAwareFunction());
        dataStream.print();
        env.execute("flink Filter job");
    }

    /**
     * Emits each incoming tuple as JSON and reports a timeout for the record whose {@code f0}
     * equals {@link #SLOW_USER}.
     */
    private static final class TimeoutAwareFunction
            extends KeyedProcessFunction<String, Tuple3<String, String, Integer>, String> {

        private static final long serialVersionUID = 1L;

        /** Keyed state remembering the user whose processing is being timed. Assigned in {@link #open}. */
        private transient ValueState<String> name;

        /** JSON serializer, created once in {@link #open} instead of once per element. */
        private transient Gson gson;

        /**
         * Obtains the keyed state handle and creates the JSON serializer.
         *
         * @param parameters configuration passed in by the runtime
         * @throws Exception if the base class initialisation fails
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            name = getRuntimeContext().getState(new ValueStateDescriptor<String>("name", String.class));
            gson = new Gson();
        }

        /**
         * Handles one element of the keyed stream and emits it as a JSON string.
         *
         * <p>For the record matching {@link #SLOW_USER} the user name is stored in keyed state, a
         * processing-time timer is registered {@link #TIMEOUT_MILLIS} ahead, and the method then
         * sleeps for {@link #SIMULATED_WORK_MILLIS} to simulate processing that overruns the timer.
         *
         * @param value the incoming tuple
         * @param ctx   context giving access to the timer service
         * @param out   collector receiving the JSON representation of {@code value}
         * @throws Exception if the state update fails or the sleep is interrupted
         */
        @Override
        public void processElement(Tuple3<String, String, Integer> value, Context ctx, Collector<String> out) throws Exception {
            // Constant-first comparison so that a null f0 is simply treated as "no match".
            if (SLOW_USER.equals(value.f0)) {
                name.update(value.f0);
                // Derive the deadline from the timer service's own processing-time clock so that it
                // is measured on the same clock that later fires the timer.
                long deadline = ctx.timerService().currentProcessingTime() + TIMEOUT_MILLIS;
                ctx.timerService().registerProcessingTimeTimer(deadline);
                // Deliberately block longer than the timeout to provoke the onTimer callback.
                Thread.sleep(SIMULATED_WORK_MILLIS);
            }
            out.collect(gson.toJson(value));
        }

        /**
         * Invoked when the registered timer fires; emits a timeout message containing the current
         * key, the stored user name and the formatted timer timestamp, then clears the state.
         *
         * @param timestamp the time the firing timer was registered for, in epoch milliseconds
         * @param ctx       context giving access to the current key and the timer service
         * @param out       collector receiving the timeout message
         * @throws Exception if reading or clearing the state fails
         */
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            String timeStr = DateFormatUtils.format(new Date(timestamp), TIME_PATTERN);
            out.collect(ctx.getCurrentKey() + ", user :" + name.value() + ", Current task processing timed out :" + timeStr);
            // The timer that just fired no longer needs to be deleted explicitly; only the keyed
            // state has to be reset for this key.
            name.clear();
        }
    }
}

Printed output

{"f0":" Zhang San ","f1":"man","f2":20}
{"f0":" Li Si ","f1":"girl","f2":24}
{"f0":" Wang Wu ","f1":"man","f2":29}
man, user : Wang Wu , Current task processing timed out :2020-09-16 11:17:15
{"f0":" Liu Liu ","f1":"girl","f2":32}
{"f0":" Wu Qi ","f1":"girl","f2":18}
{"f0":" Wu ba ","f1":"man","f2":30}

 

版权声明
本文为[The little dragon who can't fly]所创,转载请带上原文链接,感谢
https://cdmana.com/2020/12/20201225110425815s.html

Scroll to Top