编程知识 cdmana.com

Le chemin avancé du Big Data - - Spark SQL Logging Analysis

Programme de base

Journal du comportement de l'utilisateur:Toutes les données de comportement de l'utilisateur chaque fois qu'il visite le site(Accès à、Parcourir、Recherche、Cliquez sur…)
Piste de comportement de l'utilisateur、Journal de trafic

Contenu des données du Journal:

  • 1)Propriétés du système accessibles: Système d'exploitation、Navigateur, etc
  • 2)Caractéristiques d'accès:Cliquéurl、De qui?urlQui a sauté(referer)、Temps de séjour sur la page, etc
  • 3)Accès aux informations:session_id、Accès àip(Visite de la ville)Attendez.
2013-05-19 13:00:00     http://www.taobao.com/17/?tracker_u=1624169&type=1      B58W48U4WKZCJ5D1T3Z9ZY88RU7QA7B1        http://hao.360.cn/      1.196.34.243 

Processus de traitement des données

  • 1) Acquisition de données
    Flume: webJournal écrit àHDFS

  • 2)Nettoyage des données
    Données sales
    Spark、Hive、MapReduce Ou d'autres cadres informatiques distribués
    Les données après nettoyage peuvent être stockées dansHDFS(Hive/Spark SQL)

  • 3)Traitement des données
    Effectuer des statistiques et des analyses sur les activités en fonction de nos besoins
    Spark、Hive、MapReduce Ou d'autres cadres informatiques distribués

  • 4) Réception des résultats du traitement
    Les résultats peuvent être stockés dans RDBMS、NoSQL

  • 5) Visualisation des données
    Par une présentation graphique :Diagramme circulaire、Histogramme、La carte、Diagramme linéaire
    ECharts、HUE、Zeppelin

Nettoyage des données

D'abord pardebug Trouver les champs correspondants après la Division

  • Erreur signalée
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

Lors de la première étape du nettoyage des données , Les données peuvent être imprimées , Mais impossible d'écrire dans le fichier local , C'est parce qu'il n'y en a pas ici. hadoop Système pseudo - distribué

Installez un plug - in.

https://hiszm.lanzous.com/iWyqmhrgk0f

Télécharger le plug - in ci - dessus ,Et puis, Créer un nouveau répertoire et le mettre dans le Répertoire
C:\Data\hadoop\bin

Puis ajouter une variable d'environnement système
HADOOP_HOME
C:\Data\hadoop

package org.sparksql

import org.apache.spark.sql.SparkSession

object SparkFormatApp {
    

  def main(args: Array[String]): Unit = {
    

    //SparkSession- Oui.sparkClasse d'entrée pour
    val spark = SparkSession.builder().appName("SparkFormatApp")
                .master("local[2]").getOrCreate()
    val access = spark.sparkContext.textFile("10000_access.log")

    //access.take(10).foreach(println)

    access.map(line=>{
    
      val splits = line.split(" ")
      val ip = splits(0)
      val time = splits(3) + " " + splits(4)
      val traffic = splits(9)
      val url =  splits(11).replace("\"","")
     //(ip,DateUtils.parse(time),traffic,traffic,url)
      DateUtils.parse(time) + "\t" + url + "\t" + traffic + "\t" + ip
    }).saveAsTextFile("output")

    //.take(10).foreach(println)
    //.saveAsTextFile("output")

    spark.stop()

  }
}

Traitement général des journaux , Nous avons besoin de partitions ,
Partitionner en fonction du temps d'accès dans le journal ,Par exemple,:d,h,m5(Chaque5 Une partition par minute )

Nettoyage secondaire

  • Entrée:Heure de la visite、Accès àURL、 Trafic consommé 、Accès àIPAdresse
  • Produits:URL、cmsType(video/article)、cmsId(No.)、Flux、ip、 Informations sur la ville 、Heure de la visite、Oh, mon Dieu.
package org.sparksql

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{
    LongType, StringType, StructField, StructType}

// Classe de conversion de l'outil de journalisation d'accès 
object AccessConvertUtils {
    

  val struct=StructType(
    Array(
      StructField("url",StringType),
      StructField("cmsType",StringType),
      StructField("cmsId",LongType),
      StructField("traffic",LongType),
      StructField("ip",StringType),
      StructField("city",StringType),
      StructField("time",StringType),
      StructField("day",StringType)
    )
  )

// Convertir le style de sortie en fonction de chaque ligne d'information entrée 
  def parseLog(log:String)={
    
    try{
    
      val splits=log.split("\t")
      val url =splits(1)
      val traffic = splits(2).toLong
      val ip = splits(3)

      val domain="http://www.imooc.com/"
      val cms=url.substring(url.indexOf(domain) + domain.length)
      val cmsTypeId = cms.split("/")
      var cmsType = ""
      var cmsId = 0l
      if(cmsTypeId.length > 1){
    
        cmsType = cmsTypeId(0)
        cmsId = cmsTypeId(1).toLong
      }

      val city = IpUtils.getCity(ip)
      val time = splits(0)
      val day =  time.substring(0,10).replaceAll("-","")
      Row(url,cmsType,cmsId,traffic,ip,city,time,day)
    }catch {
    
      case e : Exception => Row(0)
    }
  }
}


  • IP=>Province

Utilisergithub Projets open source existants sur
1)git clone https://github.com/wzhe06/ipdatabase.git

2) Compiler les éléments téléchargés :mvn clean package -DskipTests

3)Installationjar Emballez - le pour vous - même. mavenEntrepôt

mvn install:install-file -Dfile=C:\Data\ipdatabase\target\ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar

  1. Copiez les fichiers pertinents ou une erreur sera signalée.

java.io.FileNotFoundException: file:/Users/rocky/maven_repos/com/ggstar/ipdatabase/1.0/ipdatabase-1.0.jar!/ipRegion.xlsx (No such file or directory)
  1. Tests

package org.sparksql

import org.apache.spark.sql.SparkSession

object SparkCleanApp {
    

  def main(args: Array[String]): Unit = {
    
    //SparkSession- Oui.sparkClasse d'entrée pour
    val spark = SparkSession.builder().appName("SparkFormatApp")
      .master("local[2]").getOrCreate()
    val accessRDD = spark.sparkContext.textFile("access.log")

    //accessRDD.take(10).foreach(println)

    val accessDF = spark.createDataFrame(accessRDD.map(x=>AccessConvertUtils.parseLog(x)),AccessConvertUtils.struct)

    accessDF.printSchema()
    accessDF.show()

    spark.stop
  }


}

root
 |-- url: string (nullable = true)
 |-- cmsType: string (nullable = true)
 |-- cmsId: long (nullable = true)
 |-- traffic: long (nullable = true)
 |-- ip: string (nullable = true)
 |-- city: string (nullable = true)
 |-- time: string (nullable = true)
 |-- day: string (nullable = true)




+--------------------+-------+-----+-------+---------------+----+-------------------+--------+
|                 url|cmsType|cmsId|traffic|             ip|city|               time|     day|
+--------------------+-------+-----+-------+---------------+----+-------------------+--------+
|http://www.imooc....|  video| 4500|    304|  218.75.35.226| Province du Zhejiang|2017-05-11 14:09:14|20170511|
|http://www.imooc....|  video|14623|     69| 202.96.134.133| Province de Guangdong|2017-05-11 15:25:05|20170511|
|http://www.imooc....|article|17894|    115| 202.96.134.133| Province de Guangdong|2017-05-11 07:50:01|20170511|
|http://www.imooc....|article|17896|    804|  218.75.35.226| Province du Zhejiang|2017-05-11 02:46:43|20170511|
|http://www.imooc....|article|17893|    893|222.129.235.182| Pékin|2017-05-11 09:30:25|20170511|
|http://www.imooc....|article|17891|    407|  218.75.35.226| Province du Zhejiang|2017-05-11 08:07:35|20170511|
|http://www.imooc....|article|17897|     78| 202.96.134.133| Province de Guangdong|2017-05-11 19:08:13|20170511|
|http://www.imooc....|article|17894|    658|222.129.235.182| Pékin|2017-05-11 04:18:47|20170511|
|http://www.imooc....|article|17893|    161|   58.32.19.255| Shanghai|2017-05-11 01:25:21|20170511|
|http://www.imooc....|article|17895|    701|    218.22.9.56| Province d'Anhui|2017-05-11 13:37:22|20170511|
|http://www.imooc....|article|17892|    986|  218.75.35.226| Province du Zhejiang|2017-05-11 05:53:47|20170511|
|http://www.imooc....|  video|14540|    987|   58.32.19.255| Shanghai|2017-05-11 18:44:56|20170511|
|http://www.imooc....|article|17892|    610|  218.75.35.226| Province du Zhejiang|2017-05-11 17:48:51|20170511|
|http://www.imooc....|article|17893|      0|    218.22.9.56| Province d'Anhui|2017-05-11 16:20:03|20170511|
|http://www.imooc....|article|17891|    262|   58.32.19.255| Shanghai|2017-05-11 00:38:01|20170511|
|http://www.imooc....|  video| 4600|    465|  218.75.35.226| Province du Zhejiang|2017-05-11 17:38:16|20170511|
|http://www.imooc....|  video| 4600|    833|222.129.235.182| Pékin|2017-05-11 07:11:36|20170511|
|http://www.imooc....|article|17895|    320|222.129.235.182| Pékin|2017-05-11 19:25:04|20170511|
|http://www.imooc....|article|17898|    460| 202.96.134.133| Province de Guangdong|2017-05-11 15:14:28|20170511|
|http://www.imooc....|article|17899|    389|222.129.235.182| Pékin|2017-05-11 02:43:15|20170511|
+--------------------+-------+-----+-------+---------------+----+-------------------+--------+

Avantages de la modulation:

  1. Contrôle la taille de la sortie du fichier : coalesce
  2. Ajustement du type de données pour les champs de partition :spark.sql.sources.partitionColumnTypeInference.enabled
  3. Insérer des données de base de données par lots, Soumettre pour utilisation batchFonctionnement
package org.sparksql

import org.apache.spark.sql.{
    DataFrame, SparkSession}
import org.apache.spark.sql.functions._
object TopNApp {
    
  //Les plus populaires
  def videoAccessTopN(spark: SparkSession, accessDF: DataFrame) = {
    
    import spark.implicits._
    val videoTopNDF = accessDF.filter($"day"==="20170511"&& $"cmsType" === "video")
      .groupBy("day","cmsId").agg(count("cmsId")
      .as("times")).orderBy($"times".desc)
    videoTopNDF.show()

    accessDF.createOrReplaceTempView("access_log")
    val videoTopNDF1 = spark.sql("select day,cmsId,count(1) as times from access_log where day='20170511' and cmsType = 'video' group by day,cmsId order by times desc")

    videoTopNDF1.show()


  }

  def main(args: Array[String]): Unit = {
    
    //SparkSession- Oui.sparkClasse d'entrée pour
    val spark = SparkSession.builder().appName("SparkFormatApp")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled","false")
      .master("local[2]").getOrCreate()

    val accessDF= spark.read.format("parquet").load("output2/")
    accessDF.printSchema()
    accessDF.show(false)

    videoAccessTopN(spark,accessDF)
    spark.stop()
  }




}








+--------+-----+------+
|     day|cmsId| times|
+--------+-----+------+
|20170511|14540|111027|
|20170511| 4000| 55734|
|20170511|14704| 55701|
|20170511|14390| 55683|
|20170511|14623| 55621|
|20170511| 4600| 55501|
|20170511| 4500| 55366|
|20170511|14322| 55102|
+--------+-----+------+

Accès vidéo

package org.sparksql

import java.sql.{
    Connection, DriverManager, PreparedStatement}

object MySqlUtils {
    

  def getConnection() ={
    

// if (!conn.isClosed) System.out.println(" Connecté à la base de données !")
// else System.out.println(" Pas de connexion à la base de données !")
    DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_user?user=root&password=root")
  }
//Libérer les ressources de connexion à la base de données
  def release(connection:Connection,pstmt:PreparedStatement): Unit ={
    
    try{
    
      if(pstmt != null){
    
        pstmt.close()
      }

    }catch{
    
      case e:Exception => e.printStackTrace()

    }finally {
    
      if(connection!=null){
    
        connection.close()
      }
    }
  }

  def main(args: Array[String]): Unit = {
    
    println(getConnection())

  }

}



create table day_video_access_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
times bigint(10) not null,
primary key (day, cms_id)
);

package org.sparksql

import java.sql.{
    Connection, PreparedStatement}

import scala.collection.mutable.ListBuffer

object StatisticsDAO {
    
  def insertDayVideoAccessTopN(list:ListBuffer[DayVideoAccessStatistics]): Unit ={
    

    var connection:Connection = null
    var pstmt:PreparedStatement = null

    try{
    

      connection= MySqlUtils.getConnection()
      // Annuler la soumission automatique 

      connection.setAutoCommit(false)

      val sql = "insert into day_video_access_topn_stat(day,cms_id,times) value (? ,? ,? )"
      pstmt = connection.prepareStatement(sql)

      for(i<-list){
    
        pstmt.setString(1,i.day)
        pstmt.setLong(2,i.cmsId)
        pstmt.setLong(3,i.times)

        pstmt.addBatch()
      }

        pstmt.executeBatch()//Traitement par lots

      //Soumission manuelle
      connection.commit()

    }catch {
    
      case e:Exception=>e.printStackTrace()
    }finally {
    
      MySqlUtils.release(connection,pstmt)
    }



  }

}


package org.sparksql

import org.apache.spark.sql.{
    DataFrame, SparkSession}
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer
object TopNApp {
    
  //Les plus populaires
  def videoAccessTopN(spark: SparkSession, accessDF: DataFrame) = {
    
    import spark.implicits._
    val videoTopNDF = accessDF.filter($"day"==="20170511"&& $"cmsType" === "video")
      .groupBy("day","cmsId").agg(count("cmsId")
      .as("times")).orderBy($"times".desc)


    videoTopNDF.show()

      try{
    
        videoTopNDF.foreachPartition(partitionOfRecords =>{
    
          val list = new ListBuffer[DayVideoAccessStatistics]

          partitionOfRecords.foreach(info =>{
    
            val day = info.getAs[String]("day")
            val cmsId = info.getAs[Long]("cmsId")
            val times = info.getAs[Long]("times")

            list.append(DayVideoAccessStatistics(day,cmsId,times))

          })
          StatisticsDAO.insertDayVideoAccessTopN(list)
        })


      }catch {
    
        case e:Exception =>e.printStackTrace()
      }

  }




java.sql.SQLException: No value specified for parameter 2

Vérifier si les paramètres et les types d'insertion sont toujours

Par province

create table day_video_city_access_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
city varchar(20) not null,
times bigint(10) not null,
times_rank int not null,
primary key (day, cms_id, city)
);
  def cityAccessTopN(spark: SparkSession, accessDF: DataFrame) = {
    
    import spark.implicits._
    val cityTopNDF = accessDF.filter($"day"==="20170511"&& $"cmsType" === "video")
      .groupBy("day","city","cmsId").agg(count("cmsId")
      .as("times")).orderBy($"times".desc)
    cityTopNDF.show()

   val top3DF =  cityTopNDF.select(
      cityTopNDF("day"),
      cityTopNDF("city"),
      cityTopNDF("cmsId"),
      cityTopNDF("times"),
      row_number().over(Window.partitionBy(cityTopNDF("city"))
        .orderBy(cityTopNDF("times").desc)).as("times_rank")
    ).filter("times_rank <=3")//.show()


    try{
    
      top3DF.foreachPartition(partitionOfRecords =>{
    
        val list = new ListBuffer[DayCityAccessStatistics]
        partitionOfRecords.foreach(info =>{
    
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val city = info.getAs[String]("city")
          val times = info.getAs[Long]("times")
          val timesRank = info.getAs[Int]("times_rank")
          list.append(DayCityAccessStatistics(day,cmsId,city,times,timesRank))
        })
        StatisticsDAO.insertCityVideoAccessTopN(list)
      })
    }catch {
    
      case e:Exception =>e.printStackTrace()
    }
  }


Selon le débit

create table day_video_traffics_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
traffics bigint(20) not null,
primary key (day, cms_id)
);

  def trafficAccessTopN(spark: SparkSession, accessDF: DataFrame) = {
    import spark.implicits._
    val trafficTopNDF = accessDF.filter($"day"==="20170511"&& $"cmsType" === "video")
      .groupBy("day","cmsId").agg(sum("traffic").as("traffics"))
     .orderBy($"traffics".desc)


    trafficTopNDF.show()

    try{
      trafficTopNDF.foreachPartition(partitionOfRecords =>{
        val list = new ListBuffer[DayTrafficAccessStatistics]

        partitionOfRecords.foreach(info =>{
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val traffics = info.getAs[Long]("traffics")

          list.append(DayTrafficAccessStatistics(day,cmsId,traffics))

        })
        StatisticsDAO.insertTrafficVideoAccessTopN(list)
      })
    }catch {
      case e:Exception =>e.printStackTrace()
    }

  }


Optimisation

  • Supprimer les données précédentes à chaque mise à jour
  def deleteData(day:String)={
    
    val tables= Array("day_video_traffics_topn_stat","day_video_city_access_topn_stat","day_video_access_topn_stat")
    var connection:Connection = null
    var pstmt:PreparedStatement = null
    try{
    

      connection = MySqlUtils.getConnection()
      for(table<- tables){
    
        val deleteSQL = s"delete from $table where day = ?"
        pstmt = connection.prepareStatement(deleteSQL)
        pstmt.setString(1,day)
        pstmt.executeUpdate()
      }

    }catch {
    
      case e:Exception => e.printStackTrace()
    }finally {
    
      MySqlUtils.release(connection, pstmt)
    }
  }


Visualisation des données

Visualisation des données: La plus grande valeur d'une image est qu'elle peut rendre ce que nous voyons réellement plus riche que nous ne nous y attendions.

Cadres de visualisation communs
1)echarts
2)highcharts
3)D3.js
4)HUE
5)Zeppelin

echarts

package org.sparkSQL.Utils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class MySqlUtils {
    

    private  static final String USERNAME = "root";
    private  static final String PASSWORD = "root";
    private  static final String DRIVERCLASS = "com.mysql.jdbc.Driver";
    private  static final String URL = "jdbc:mysql://localhost:3306/imooc_user";

    public static Connection getConnection(){
    

        Connection connection =  null;
        try {
    
            Class.forName(DRIVERCLASS);
            connection  = DriverManager.getConnection(URL,USERNAME,PASSWORD);
        }catch (Exception e){
    
            e.printStackTrace();
        }

        return connection;


    }



    public static void  release(Connection connection, PreparedStatement pstmt , ResultSet rs){
    
        if(rs != null){
    
            try{
    
                rs.close();
            }catch (Exception e){
    
                e.printStackTrace();
            }

        }
        if(connection != null){
    
            try{
    
                connection.close();
            }catch (Exception e){
    
                e.printStackTrace();
            }

        }

        if(pstmt != null){
    
            try{
    
                pstmt.close();
            }catch (Exception e){
    
                e.printStackTrace();
            }

        }

    }

    public static void main(String[] args) {
    
        System.out.println(getConnection());
    }


}


<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>ECharts</title>
    <!-- Introduction echarts.js -->
    <script src="https://cdn.bootcss.com/echarts/3.7.1/echarts.min.js"></script>
    <script src="https://s3.pstatp.com/cdn/expire-1-M/jquery/3.1.1/jquery.min.js"></script>
</head>
<body>
<!-- PourEChartsPréparer une taille(Largeur et hauteur)DeDom -->
<div id="main" style="width: 600px;height:400px;"></div>
<script type="text/javascript">


    // Sur la base d'une préparationdom,InitialisationechartsExemple
    var myChart = echarts.init(document.getElementById('main'));

    // Spécifier les éléments de configuration et les données du diagramme
    var option = {
    
        title: {
    
            text: 'Les plus populairesTOPN',
            subtext: 'Tests',
            left: 'center'
        },
        tooltip: {
    
            trigger: 'item',
            formatter: '{a} <br/>{b} : {c} ({d}%)'
        },
        legend: {
    
            orient: 'vertical',
            left: 'left',
            data: ['Accès direct', 'Marketing par courrier', 'Publicité de l'Alliance', 'Publicité vidéo', 'Moteur de recherche']
        },
        series: [
            {
    
                name: 'Nombre de visite',
                type: 'pie',
                radius: '55%',
                center: ['50%', '60%'],
                data: (function(){
    
                    var courses= [];
                    $.ajax({
    
                        type:"GET",
                        url:"stat?day=20170511",
                        dataType:'json',
                        async:false,
                        success:function (result){
    
                            for(var i=0;i<result.length;i++){
    
                                courses.push({
    "value":result[i].value,"name":result[i].name})
                            }

                        }
                    })
                    return courses;

                })(),
                emphasis: {
    
                    itemStyle: {
    
                        shadowBlur: 10,
                        shadowOffsetX: 0,
                        shadowColor: 'rgba(0, 0, 0, 0.5)'
                    }
                }
            }
        ]
    };


    // Afficher le diagramme en utilisant les éléments de configuration et les données que vous venez de spécifier.
    myChart.setOption(option);
</script>
</body>
</html>


[hadoop@hadoop001 software]$ tar -zxvf zeppelin-0.7.1-bin-all.tgz -C ~/app/

[hadoop@hadoop001 bin]$ ./zeppelin-daemon.sh start
Log dir doesn't exist, create /home/hadoop/app/zeppelin-0.7.1-bin-all/logs
Pid dir doesn't exist, create /home/hadoop/app/zeppelin-0.7.1-bin-all/run
Zeppelin start       

版权声明
本文为[Sun zhongming]所创,转载请带上原文链接,感谢
https://cdmana.com/2021/10/20211013211945035x.html

Scroll to Top