
Big Data Development: Spark RDD Case Studies for HTTP Log Analysis

1. Handling configuration files and configuration tables in a production environment

A configuration file or configuration table usually lives in an online database such as MySQL, or a backend RD simply hands you a file. Its data volume is tiny compared with the big tables of the offline data warehouse, so the usual practice is to broadcast the small table or small file. The example below uses a broadcast table to solve an IP-to-address mapping problem.
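For reference, here is a minimal, self-contained sketch of the broadcast pattern (the lookup map, object name, and sample lines are made up for illustration; the real case below broadcasts the IP range table instead):

import org.apache.spark.sql.SparkSession

// Minimal sketch: broadcast a small lookup table and resolve it map-side (hypothetical data)
object BroadcastSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("BroadcastSketch").getOrCreate()
    val sc = spark.sparkContext

    // a tiny "configuration table", e.g. pulled from MySQL or handed over as a small file
    val cityByCode = Map("0571" -> "Hangzhou", "0577" -> "Wenzhou")
    val brCityByCode = sc.broadcast(cityByCode)             // shipped once to every executor

    val logs = sc.parallelize(Seq("0577|...", "0571|...", "0577|..."))
    logs.map { line =>
        val code = line.split("\\|")(0)
        (brCityByCode.value.getOrElse(code, "UNKNOWN"), 1)  // look up against the broadcast copy
      }
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)

    spark.stop()
  }
}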

Data download: https://pan.baidu.com/s/1FmFxSrPIynO3udernLU0yQ (extraction code: hell)

2. Log analysis case 1

2.1 Data description

http.log

Logs generated by users visiting the website. Each line contains: timestamp, IP address, visited URL, accessed data, browser information, and so on. A sample:

(sample log screenshot omitted)

ip.dat: IP-segment data. Each record holds the start and end of an IP range together with its location; there are about 110,000 records in total, so this data set is also small. A sample:

(sample data screenshot omitted)

File locations: data/http.log, data/ip.dat

Download link: https://pan.baidu.com/s/1FmFxSrPIynO3udernLU0yQ (extraction code: hell)

Requirement: translate the IPs in http.log into addresses (for example, 122.228.96.111 maps to Wenzhou) and count the total number of visits for each city.

2.2 Implementation idea and code

There are three key points: http.log only needs its IP field, so following the data-reduction principle we read just the IP; the ip.dat mapping file is sorted by range; and to make the lookup fast we convert each IP address to a Long and use binary search to find the range it falls into, which gives us the corresponding address.

package com.hoult.work

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 *  Data sources: 1. an access log containing IP addresses  2. an IP-to-address mapping file
 *  The mapping table is broadcast, and IP addresses are converted to Long for comparison.
 */
object FindIp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName(this.getClass.getCanonicalName)
      .getOrCreate()
    val sc = spark.sparkContext

    import spark.implicits._
    val ipLogsRDD = sc.textFile("data/http.log")
      .map(_.split("\\|")(1))


    val ipInfoRDD = sc.textFile("data/ip.dat").map {
      case line: String => {
        val strSplit: Array[String] = line.split("\\|")
        Ip(strSplit(0), strSplit(1), strSplit(7))
      }
    }


    val brIPInfo = sc.broadcast(ipInfoRDD.map(x => (ip2Long(x.startIp), ip2Long(x.endIp), x.address)).collect())

    // map each log IP against the broadcast ranges
    ipLogsRDD
      .map(x => {
        val index  = binarySearch(brIPInfo.value, ip2Long(x))
        if (index != -1 )
          brIPInfo.value(index)._3
        else
          "NULL"
      }).map(x => (x, 1))
      .reduceByKey(_ + _)
      .map(x => s" City :${x._1},  Traffic volume :${x._2}")
      .saveAsTextFile("data/work/output_ips")

  }

  // convert an IP string to a Long
  def ip2Long(ip: String): Long = {
    val fragments = ip.split("[.]")
    var ipNum = 0L
    for (i <- 0 until fragments.length) {
      ipNum = fragments(i).toLong | ipNum << 8L
    }
    ipNum
  }

  // binary search for the range that contains the given IP
  def binarySearch(lines: Array[(Long, Long, String)], ip: Long): Int = {
    var low = 0
    var high = lines.length - 1
    while (low <= high) {
      val middle = (low + high) / 2
      if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))
        return middle
      if (ip < lines(middle)._1)
        high = middle - 1
      else {
        low = middle + 1
      }
    }
    -1
  }

}

case class Ip(startIp: String, endIp: String, address: String)
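As a quick sanity check on ip2Long (the arithmetic below is mine, not from the original post), the sample IP from the requirement converts like this:

// 122.228.96.111  ->  (122 << 24) | (228 << 16) | (96 << 8) | 111
//                  =  2046820352 + 14942208 + 24576 + 111
//                  =  2061787247
println(FindIp.ip2Long("122.228.96.111"))   // expected output: 2061787247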

A screenshot of the result:

(result screenshot omitted)

3. Log analysis case 2

3.1 Data description

Log format: IP, hit/miss flag (Hit/Miss), response time, request time, request method, request URL, request protocol, status code, response size, referer, user agent

Log file location: data/cdn.txt

Sample data:

(sample data screenshot omitted)

Tasks:

1. Count the number of unique IPs

2. Count the number of unique IPs per video (a video is identified by a *.mp4 file name appearing in the log line)

3. Compute the traffic for each hour of the day

Analysis: at first I spent a while trying to parse the GMT-style timestamp with joda-time and could not find a clean way to write it; then I realized only the hour is needed, so a regular expression can extract it directly. When counting unique IPs per video, aggregateByKey can be used to improve performance, since it combines values on the map side; a small sketch of that idea follows, and the full code is in 3.2.
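To isolate the aggregateByKey idea before the full code (the input pairs below are made up and sc is the SparkContext of the job; the implementation in 3.2 collects IPs into a List and calls distinct, while this variant keeps a Set directly, which is equivalent):

// sketch: unique-IP count per video with aggregateByKey (hypothetical (video, ip) pairs)
val pairs = sc.parallelize(Seq(
  ("141081.mp4", "1.1.1.1"),
  ("141081.mp4", "1.1.1.1"),
  ("141081.mp4", "2.2.2.2")
))

val uniqueIpPerVideo = pairs
  .aggregateByKey(Set.empty[String])(
    (set, ip) => set + ip,   // map side: add the IP to the partition-local set
    (s1, s2)  => s1 ++ s2    // reduce side: merge the sets from different partitions
  )
  .mapValues(_.size)         // => ("141081.mp4", 2)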

3.2 Implementation code

package com.hoult.work

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 *  Read the log into an RDD and extract the needed fields: ip, access hour, and video name video_name (the xx.mp4 in the url).
 *  Tasks:
 * 1. Count the number of unique IPs
 * 2. Count the number of unique IPs per video (a video is identified by a *.mp4 file name in the log line)
 * 3. Compute the traffic for each hour of the day
 */
object LogAnaylse {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName(this.getClass.getCanonicalName)
      .getOrCreate()
    val sc = spark.sparkContext


    val cdnRDD = sc.textFile("data/cdn.txt")

    // unique IP count
//    aloneIPs(cdnRDD.repartition(1))

    // unique IP count per video
//    videoIPs(cdnRDD.repartition(1))

    // hourly traffic
    hourPoor(cdnRDD.repartition(1))
  }



  /**
   *  Count unique IPs
   */
  def aloneIPs(cdnRDD: RDD[String]) = {
    // regex that matches an IPv4 address
    val IPPattern = "((?:(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d)))\\.){3}(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d))))".r

    val ipnums = cdnRDD
      .flatMap(x => (IPPattern findFirstIn x))
      .map(y => (y,1))
      .reduceByKey(_+_)
      .sortBy(_._2,false)

    ipnums.saveAsTextFile("data/cdn/aloneIPs")
  }

  /**
   *  Count unique IPs per video
   */
  def videoIPs(cdnRDD: RDD[String]) = {
    // match the HTTP status code and the response size
    val httpSizePattern = ".*\\s(200|206|304)\\s([0-9]+)\\s.*".r


    // [15/Feb/2017:11:17:13 +0800] -> capture the year and hour, used to aggregate per hour
    val timePattern = ".*(2017):([0-9]{2}):[0-9]{2}:[0-9]{2}.*".r

    import scala.util.matching.Regex


    def isMatch(pattern: Regex, str: String) = {
      str match {
        case pattern(_*) => true
        case _ => false
      }
    }

    def getTimeAndSize(line: String) = {
      var res = ("", 0L)
      try {
        val httpSizePattern(code, size) = line
        val timePattern(year, hour) = line
        res = (hour, size.toLong)
      } catch {
        case ex: Exception => ex.printStackTrace()
      }
      res
    }

    val IPPattern = "((?:(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d)))\\.){3}(?:25[0-5]|2[0-4]\\d|((1\\d{2})|([1-9]?\\d))))".r

    val videoPattern = "([0-9]+).mp4".r

    val res = cdnRDD
      .filter(x => x.matches(".*([0-9]+)\\.mp4.*"))
      .map(x => (videoPattern.findFirstIn(x).getOrElse(""), IPPattern.findFirstIn(x).getOrElse("")))
      .aggregateByKey(List[String]())(
        (lst, str) => (lst :+ str),
        (lst1, lst2) => (lst1 ++ lst2)
      )
      .mapValues(_.distinct)
      .sortBy(_._2.size,false)

      res.saveAsTextFile("data/cdn/videoIPs")
  }

  /**
   *  Traffic for each hour of the day
   *
   */
  def hourPoor(cdnRDD: RDD[String]) = {
    val httpSizePattern = ".*\\s(200|206|304)\\s([0-9]+)\\s.*".r
    val timePattern = ".*(2017):([0-9]{2}):[0-9]{2}:[0-9]{2}.*".r
    import scala.util.matching.Regex

    def isMatch(pattern: Regex, str: String) = {
      str match {
        case pattern(_*) => true
        case _ => false
      }
    }

    def getTimeAndSize(line: String) = {
      var res = ("", 0L)
      try {
        val httpSizePattern(code, size) = line
        val timePattern(year, hour) = line
        res = (hour, size.toLong)
      } catch {
        case ex: Exception => ex.printStackTrace()
      }
      res
    }

    cdnRDD
      .filter(x=>isMatch(httpSizePattern,x))
      .filter(x=>isMatch(timePattern,x))
      .map(x=>getTimeAndSize(x))
      .groupByKey()
      .map(x=>(x._1,x._2.sum))
      .sortByKey()
      .map(x => x._1 + ":00 CDN traffic = " + x._2 / (1024L * 1024 * 1024) + "G")
      .saveAsTextFile("data/cdn/hourPoor")
  }
}

Screenshots of the run results:

(result screenshots omitted)

4. Advertising exposure analysis case

Suppose we have a click log (click.log) and an exposure log (imp.log), where each line has the following format:

// Click log 
INFO 2019-09-01 00:29:53 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:30:31 requestURI:/click?app=2&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:31:03 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=32
INFO 2019-09-01 00:31:51 requestURI:/click?app=1&p=1&adid=18005472&industry=469&adid=33

// Exposure log 
INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=31
INFO 2019-09-01 00:29:53 requestURI:/imp?app=1&p=1&adid=18005472&industry=469&adid=34 

Use Spark Core to count the number of exposures and clicks for each adid. The idea is straightforward, so let's go straight to the code.

Code :

package com.hoult.work

import org.apache.spark.sql.SparkSession

object AddLog {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName(this.getClass.getCanonicalName)
      .getOrCreate()
    val sc = spark.sparkContext

    val clickRDD = sc.textFile("data/click.log")
    val impRDD = sc.textFile("data/imp.log")

    val clickRes = clickRDD.map{line => {
      val arr = line.split("\\s+")
      val adid = arr(3).substring(arr(3).lastIndexOf("=") + 1)
      (adid, 1)
    }}.reduceByKey(_ + _)

    val impRes = impRDD.map { line =>
      val arr = line.split("\\s+")
      val adid = arr(3).substring(arr(3).lastIndexOf("=") + 1)
      (adid, 1)
    }.reduceByKey(_ + _)

    // save the result (HDFS path commented out; write to a local path here)
    clickRes.fullOuterJoin(impRes)
      .map(x => x._1 + "," + x._2._1.getOrElse(0) + "," + x._2._2.getOrElse(0))
      .repartition(1)
//      .saveAsTextFile("hdfs://linux121:9000/data/")
      .saveAsTextFile("data/add_log")

    sc.stop()
  }
}

Analysis: this version needs two shuffles (one reduceByKey per log); replacing fullOuterJoin with union + reduceByKey brings it down to a single shuffle. A sketch of that variant follows.
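A sketch of that rewrite, reusing the clickRDD/impRDD and the parsing logic from the code above (the helper name and output path are mine; the output layout stays adid, clicks, exposures):

// one-shuffle variant: tag every record as (clicks, exposures), union, then a single reduceByKey
def adidOf(line: String): String = {
  val arr = line.split("\\s+")
  arr(3).substring(arr(3).lastIndexOf("=") + 1)
}

val clickPairs = clickRDD.map(line => (adidOf(line), (1, 0)))
val impPairs   = impRDD.map(line => (adidOf(line), (0, 1)))

clickPairs.union(impPairs)
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))   // the only shuffle in the job
  .map { case (adid, (clicks, imps)) => s"$adid,$clicks,$imps" }
  .repartition(1)
  .saveAsTextFile("data/add_log_union")                // hypothetical output path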

5. Use Spark SQL to complete the following transformation

Table A has three fields: ID, startdate, enddate, and contains 3 rows of data:

1 2019-03-04 2020-02-03

2 2020-04-05 2020-08-04

3 2019-10-09 2020-06-11

Write a query (both SQL and DSL are required) that turns the data above into:

2019-03-04 2019-10-09

2019-10-09 2020-02-03

2020-02-03 2020-04-05

2020-04-05 2020-06-11

2020-06-11 2020-08-04

2020-08-04 2020-08-04

Analysis: looking at the output, the first column is simply the startdate and enddate columns stacked together (their union, sorted), and the second column is the next value in that sequence, which can be produced with the lead window function.

The code is as follows

package com.hoult.work

import org.apache.spark.sql.{DataFrame, SparkSession}

object DataExchange {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("DateSort")
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("warn")

    // original data
    val tab = List((1, "2019-03-04", "2020-02-03"),(2, "2020-04-05", "2020-08-04"),(3, "2019-10-09", "2020-06-11"))
    val df: DataFrame = spark.createDataFrame(tab).toDF("ID", "startdate", "enddate")

    val dateset: DataFrame = df.select("startdate").union(df.select("enddate"))
    dateset.createOrReplaceTempView("t")

    val result: DataFrame = spark.sql(
      """
        |select tmp.startdate, nvl(lead(tmp.startdate) over(partition by col order by tmp.startdate), startdate) enddate from
        |(select "1" col, startdate from t) tmp
        |""".stripMargin)

    result.show()
  }

}
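The task asks for both SQL and DSL, and the code above only shows the SQL version. Below is a sketch of an equivalent DataFrame (DSL) query; it is my own translation of that SQL against the same dateset DataFrame, with coalesce standing in for nvl:

// sketch: DSL equivalent of the SQL above, run against the same `dateset`
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, lead, lit}

val w = Window.partitionBy(lit(1)).orderBy(col("startdate"))
val resultDsl = dateset
  .withColumn("enddate", coalesce(lead(col("startdate"), 1).over(w), col("startdate")))
  .orderBy(col("startdate"))

resultDsl.show()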

Running results

(result screenshot omitted)

Copyright notice
This article was written by [Hoult_ Wu Xie]. Please include a link to the original when reposting. Thanks.
https://cdmana.com/2021/01/20210131170357646h.html
