
Spring Boot integrates Apache Spark 2.4.4 and Scala 2.12

Catalog

Maven dependency configuration

Project yml configuration

Spark Java configuration

Main function example

Spark Service

Spark Streaming Receiver

Controller layer interface

Submitting Spark tasks to the cluster

Running the program jar package


Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools, including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.

Note that before Spark 2.0, the main programming interface of Spark was the Resilient Distributed Dataset (RDD). After Spark 2.0, RDDs were replaced by Datasets, which are strongly typed like RDDs but come with richer optimizations under the hood. The RDD interface is still supported, and the RDD programming guide gives a more detailed reference. However, we strongly recommend switching to Datasets, which perform better than RDDs. See the SQL programming guide for more information about Datasets.
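
For a concrete feel for the difference, here is a minimal, self-contained Java sketch (not from the original article; the local master, the README.md path, and the class name are assumptions) that counts matching lines with both the RDD API and the Dataset API:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;

public class RddVsDataset {

    public static void main(String[] args) {
        // Assumptions: a local master and a README.md file in the working directory.
        SparkSession spark = SparkSession.builder()
                .appName("rddVsDataset")
                .master("local[2]")
                .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // RDD API: the main interface before Spark 2.0
        JavaRDD<String> rddLines = jsc.textFile("README.md");
        long rddCount = rddLines.filter(s -> s.contains("spark")).count();

        // Dataset API: recommended since Spark 2.0
        Dataset<String> dsLines = spark.read().textFile("README.md");
        long dsCount = dsLines.filter((FilterFunction<String>) s -> s.contains("spark")).count();

        System.out.println("RDD count: " + rddCount + ", Dataset count: " + dsCount);
        spark.stop();
    }
}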

For Apache Spark 3.0, please refer to: Quick Start

Maven dependency configuration

pom.xml


    <properties>
        <maven.test.skip>true</maven.test.skip>
        <java.version>1.8</java.version>
        <scala.version>2.12</scala.version>
        <spark.version>2.4.4</spark.version>
    </properties>


 <!-- Spark dependency  start -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark dependency  end -->

Project yml configuration

# ==== Server settings configuration ====
server:
  port: 8090

# ==== Spark data analysis stream engine configuration: https://my.oschina.net/woter/blog/1843755 ====
# ==== Spark SQL via HiveSql: https://www.cnblogs.com/cord/p/9530404.html ====
spark:
  spark-home: .
  # appName is the application name shown on the cluster UI
  app-name: sparkTest
  # master is a Spark, Mesos or YARN cluster URL, or the string "local[...]" to run in local mode
  master: local[4]

Spark Java configuration

package com.patrol.spark.config;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

/**
 * @Copyright: 2019-2021
 * @FileName: SparkConfiguration.java
 * @Author: PJL
 * @Date: 2020/9/1 16:56
 * @Description: Spark configuration
 */
@Component
@Configuration
@ConfigurationProperties(prefix = "spark")
@EnableConfigurationProperties
public class SparkConfiguration {

    private String sparkHome = ".";

    /**
     * appName is the application name shown on the cluster UI
     */
    private String appName = "sparkPatrol";

    /**
     * master is a Spark, Mesos or YARN cluster URL, or the string "local" to run in local mode
     */
    private String master = "local";

    @Bean
    @ConditionalOnMissingBean(SparkConf.class)
    public SparkConf sparkConf()  {
        SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
        return conf;
    }

    @Bean
    @ConditionalOnMissingBean(JavaSparkContext.class)
    public JavaSparkContext javaSparkContext()  {
        return new JavaSparkContext(sparkConf());
    }

    public String getSparkHome() {
        return sparkHome;
    }

    public void setSparkHome(String sparkHome) {
        this.sparkHome = sparkHome;
    }

    public String getAppName() {
        return appName;
    }

    public void setAppName(String appName) {
        this.appName = appName;
    }

    public String getMaster() {
        return master;
    }

    public void setMaster(String master) {
        this.master = master;
    }
}
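
The configuration above only exposes a JavaSparkContext. If Dataset-based code (such as the WordCount example below) should be injected rather than building its own session, a SparkSession bean can be added as well. A minimal sketch, reusing the same SparkConf (this method is not part of the original class and additionally requires import org.apache.spark.sql.SparkSession):

    // Hypothetical addition to SparkConfiguration: build a SparkSession
    // from the same SparkConf so that Dataset-based code can be injected.
    @Bean
    @ConditionalOnMissingBean(SparkSession.class)
    public SparkSession sparkSession() {
        return SparkSession.builder()
                .config(sparkConf())
                .getOrCreate();
    }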

Main function example

package com.patrol.spark.examples;

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;

/**
 * @Copyright: 2019-2021
 * @FileName: WordCount.java
 * @Author: PJL
 * @Date: 2020/9/1 16:39
 * @Description: Word count statistics
 */
public class WordCount {

    /**
     * Program entry point
     *
     * @param args
     */
    public static void main(String[] args) {
        String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
        SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
        Dataset<String> logData = spark.read().textFile(logFile).cache();
        // Aggregate computation: count lines containing "a" and lines containing "b"
        long numAs = logData.filter((FilterFunction<String>) s -> s.contains("a")).count();
        long numBs = logData.filter((FilterFunction<String>) s -> s.contains("b")).count();

        System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
        // Stop the Spark session
        spark.stop();
    }
}
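
When this main() is launched through spark-submit (see the end of this article), the master URL comes from the command line. When running it directly from an IDE, the master has to be set on the builder. A minimal sketch of how the builder line in WordCount could be adjusted (the local[*] master is an assumption):

        // IDE-only variant: set a local master explicitly, since there is
        // no spark-submit around to provide one.
        SparkSession spark = SparkSession.builder()
                .appName("Simple Application")
                .master("local[*]")
                .getOrCreate();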

Spark Service

package com.patrol.spark.service;


import com.patrol.spark.receiver.CustomStreamingReceiver;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.spark_project.guava.base.Joiner;
import org.spark_project.guava.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import scala.Tuple2;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * @Copyright: 2019-2021
 * @FileName: SparkService.java
 * @Author: PJL
 * @Date: 2020/9/1 17:02
 * @Description: Spark service 
 */
@Slf4j
@Service
public class SparkService {

    private static final Pattern SPACE = Pattern.compile(" ");

    @Autowired
    private JavaSparkContext javaSparkContext;


    public Map<String, Object> calculateTopTen() {

        Map<String, Object> result = new HashMap<>();
        JavaRDD<String> lines = javaSparkContext.textFile("src/test/java/test.txt").cache();

        System.out.println();
        System.out.println("-------------------------------------------------------");
        System.out.println(lines.count());

        JavaRDD<String> words = lines.flatMap(str -> Arrays.asList(SPACE.split(str)).iterator());

        JavaPairRDD<String, Integer> ones = words.mapToPair(str -> new Tuple2<String, Integer>(str, 1));

        JavaPairRDD<String, Integer> counts = ones.reduceByKey((Integer i1, Integer i2) -> (i1 + i2));

        JavaPairRDD<Integer, String> temp = counts.mapToPair(tuple -> new Tuple2<Integer, String>(tuple._2, tuple._1));

        JavaPairRDD<String, Integer> sorted = temp.sortByKey(false).mapToPair(tuple -> new Tuple2<String, Integer>(tuple._2, tuple._1));

        System.out.println();
        System.out.println("-------------------------------------------------------");
        System.out.println(sorted.count());

        //List<Tuple2<String, Integer>> output = sorted.collect();

        // The RDD is already sorted in descending order, so take(10) returns the top ten
        // (top(10) would rely on a natural ordering that Tuple2 does not provide).
        List<Tuple2<String, Integer>> output = sorted.take(10);

        for (Tuple2<String, Integer> tuple : output) {
            result.put(tuple._1(), tuple._2());
        }

        return result;
    }

    /**
     * Practice demo to get familiar with the API
     */
    public void sparkExerciseDemo() {
        List<Integer> data = Lists.newArrayList(1, 2, 3, 4, 5, 6);
        JavaRDD<Integer> rdd01 = javaSparkContext.parallelize(data);
        rdd01 = rdd01.map(num -> {
            return num * num;
        });
        //data map :1,4,9,16,25,36
        log.info("data map :{}", Joiner.on(",").skipNulls().join(rdd01.collect()).toString());

        rdd01 = rdd01.filter(x -> x < 6);

        //data filter :1,4
        log.info("data filter :{}", Joiner.on(",").skipNulls().join(rdd01.collect()).toString());

        rdd01 = rdd01.flatMap(x -> {
            Integer[] test = {x, x + 1, x + 2};
            return Arrays.asList(test).iterator();
        });

        //flatMap :1,2,3,4,5,6
        log.info("flatMap :{}", Joiner.on(",").skipNulls().join(rdd01.collect()).toString());

        JavaRDD<Integer> unionRdd = javaSparkContext.parallelize(data);

        rdd01 = rdd01.union(unionRdd);

        //union :1,2,3,4,5,6,1,2,3,4,5,6
        log.info("union :{}", Joiner.on(",").skipNulls().join(rdd01.collect()).toString());

        List<Integer> result = Lists.newArrayList();
        result.add(rdd01.reduce((Integer v1, Integer v2) -> {
            return v1 + v2;
        }));

        //reduce :42
        log.info("reduce :{}", Joiner.on(",").skipNulls().join(result).toString());
        result.forEach(System.out::print);

        JavaPairRDD<Integer, Iterable<Integer>> groupRdd = rdd01.groupBy(x -> {
            log.info("======grouby========:{}", x);
            if (x > 10) return 0;
            else return 1;
        });

        List<Tuple2<Integer, Iterable<Integer>>> resultGroup = groupRdd.collect();

        //group by  key:1 value:1,2,3,4,5,6,1,2,3,4,5,6
        resultGroup.forEach(x -> {
            log.info("group by  key:{} value:{}", x._1, Joiner.on(",").skipNulls().join(x._2).toString());
        });

    }

    /**
     * Spark Streaming practice
     */
    public void sparkStreaming() throws InterruptedException {
        JavaStreamingContext jsc = new JavaStreamingContext(javaSparkContext, Durations.seconds(10)); // batch interval of 10 seconds
        JavaReceiverInputDStream<String> lines = jsc.receiverStream(new CustomStreamingReceiver(StorageLevel.MEMORY_AND_DISK_2()));
        JavaDStream<Long> count = lines.count();
        count = count.map(x -> {
            log.info(" How many pieces of data in this batch :{}", x);
            return x;
        });
        count.print();
        jsc.start();
        jsc.awaitTermination();
        jsc.stop();
    }
}

Spark Streaming Receiver

package com.patrol.spark.receiver;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

/**
 * @Copyright: 2019-2021
 * @FileName: CustomStreamingReceiver.java
 * @Author: PJL
 * @Date: 2020/9/1 17:07
 * @Description: Custom streaming receiver
 */
@Slf4j
public class CustomStreamingReceiver extends Receiver<String> {

    /**
     *
     * @author  hz15041240
     * @date    2018-01-18 16:37:22
     * @version
     */
    private static final long serialVersionUID = 5817531198342629801L;

    public CustomStreamingReceiver(StorageLevel storageLevel) {
        super(storageLevel);
    }

    @Override
    public void onStart() {
        new Thread(this::doStart).start();
        log.info(" To start Receiver...");
        //doStart();
    }

    public void doStart() {
        // Keep generating random values until the receiver is stopped.
        while (!isStopped()) {
            int value = RandomUtils.nextInt(100);
            if (value < 20) {
                try {
                    // Occasionally pause to simulate an uneven data source.
                    Thread.sleep(1000);
                } catch (Exception e) {
                    log.error("sleep exception", e);
                    restart("sleep exception", e);
                }
            }
            store(String.valueOf(value));
        }
    }


    @Override
    public void onStop() {
        log.info(" About to stop Receiver...");
    }
}

Controller layer interface

package com.patrol.spark.controller;

import com.patrol.spark.service.SparkService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;

/**
 * @Copyright: 2019-2021
 * @FileName: SparkController.java
 * @Author: PJL
 * @Date: 2020/9/1 17:11
 * @Description: Spark controller 
 */
@RestController
public class SparkController {

    @Autowired
    private SparkService sparkService;

    @RequestMapping("/spark/top10")
    public Map<String, Object> calculateTopTen() {
        return sparkService.calculateTopTen();
    }

    @RequestMapping("/spark/exercise")
    public void exercise() {
        sparkService.sparkExerciseDemo();
    }

    @RequestMapping("/spark/stream")
    public void streamingDemo() throws InterruptedException {
        sparkService.sparkStreaming();
    }
}
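
Once the application is running on port 8090 (see the yml configuration above), these endpoints can be exercised from any HTTP client. A minimal sketch using Spring's RestTemplate (a hypothetical helper class, not part of the original project):

import java.util.Map;

import org.springframework.web.client.RestTemplate;

public class SparkControllerClient {

    public static void main(String[] args) {
        // Assumes the Spring Boot application is running locally on port 8090.
        RestTemplate rest = new RestTemplate();
        Map<?, ?> topTen = rest.getForObject("http://localhost:8090/spark/top10", Map.class);
        System.out.println("Top ten words: " + topTen);
    }
}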

Submitting Spark tasks to the cluster

Interface call

package com.patrol.spark.controller;

import com.patrol.spark.threads.InputStreamReaderRunnable;
import org.apache.spark.launcher.SparkLauncher;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;

/**
 * @Copyright: 2019-2021
 * @FileName: SparkLauncherController.java
 * @Author: PJL
 * @Date: 2020/9/1 17:59
 * @Description: Spark Submit task to cluster [https://blog.csdn.net/lyd882/article/details/103806085]
 */
@RestController
public class SparkLauncherController {

    /**
     * Use the official SparkLauncher Java API to submit a Spark task to the Spark cluster from Java code
     *
     * @return
     */
    @GetMapping(value = "spark/kpi")
    public String submitTask() {
        HashMap<String, String> env = new HashMap<>();
        // read the Hadoop and Spark environment variables
        env.put("HADOOP_CONF_DIR" ,  System.getenv().getOrDefault("HADOOP_CONF_DIR","/usr/local/hadoop/etc/overriterHaoopConf"));
        env.put("JAVA_HOME", System.getenv().getOrDefault("JAVA_HOME","/usr/local/java/jdk1.8.0_151"));
        // create the SparkLauncher
        SparkLauncher handle = new SparkLauncher(env)
                .setSparkHome("/home/spark/spark-2.4.4-bin-hadoop2.7")
                .setAppResource("/home/sino/spark-model-1.0/lib/spark-model-1.0.jar")
                .setMainClass("com.sinovatio.spark.JobStarter")
                .setMaster("yarn")
                .setDeployMode("client")
                .setConf("spark.yarn.queue","sino")
                .setConf("spark.app.id", "luncher-test")
                .setConf("spark.driver.memory", "1g")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.instances", "2")
                .setConf("spark.executor.cores", "2")
                .setConf("spark.default.parallelism", "12")
                .setConf("spark.driver.allowMultipleContexts","true")
                .setVerbose(true);

        try {
            // submit the task
            Process process = handle.launch();
            // child threads that follow the process logs
            InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input");
            Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
            inputThread.start();

            InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error");
            Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
            errorThread.start();

            System.out.println("Waiting for finish...");
            int exitCode = process.waitFor();
            System.out.println("Finished! Exit code:" + exitCode);
            return "status: "+exitCode;

        }catch (Exception e){
            e.printStackTrace();
            return "status: "+1;
        }

    }
}
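
As an alternative to launch() plus the manual stream-reader threads shown below, SparkLauncher also provides startApplication(), which reports lifecycle events through a SparkAppHandle listener. A minimal sketch of how the try block above could look with this approach (an assumption, not the original code; it additionally requires import org.apache.spark.launcher.SparkAppHandle):

            // Hypothetical variant of the try block: monitor the job through
            // a SparkAppHandle instead of reading the child process streams.
            SparkAppHandle appHandle = handle.startApplication(new SparkAppHandle.Listener() {
                @Override
                public void stateChanged(SparkAppHandle h) {
                    System.out.println("State changed: " + h.getState());
                }

                @Override
                public void infoChanged(SparkAppHandle h) {
                    System.out.println("Application id: " + h.getAppId());
                }
            });
            // Poll until the application reaches a final state.
            while (!appHandle.getState().isFinal()) {
                Thread.sleep(1000);
            }
            return "status: " + appHandle.getState();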

Thread implementation using Runnable

package com.patrol.spark.threads;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

/**
 * @Copyright: 2019-2021
 * @FileName: InputStreamReaderRunnable.java
 * @Author: PJL
 * @Date: 2020/9/1 18:01
 * @Description:  Input stream read thread implementation 
 */
public class InputStreamReaderRunnable implements Runnable {

    private BufferedReader reader;

    private String name;

    public InputStreamReaderRunnable(InputStream is, String name) {
        this.reader = new BufferedReader(new InputStreamReader(is));
        this.name = name;
    }

    public void run() {
        System.out.println("InputStream " + name + ":");
        try {
            String line = reader.readLine();
            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Running the program jar package

Run it with the spark-submit command (located in the bin directory of the Spark installation):

spark-submit \
    --class com.patrol.spark.PatrolSparkServiceApplication \
    --executor-memory 4G \
    --num-executors 8 \
    --master yarn \
    --deploy-mode client \
    /data/test/patrol-spark-service-0.0.1-SNAPSHOT.jar

Most environments today run on Linux servers, so please test this on Linux.

Project download address: https://download.csdn.net/download/boonya/12800097

Copyright notice
This article was written by [boonya]. Please include a link to the original when reposting. Thank you.
