
Data Warehouse Hive Quick Start: Offline & Real-Time Data Warehouse Architecture

Data warehouse VS database

Definition of data warehouse:

  • A data warehouse is a structured, subject-oriented data environment built by running data from multiple sources through ETL (Extract, Transform, Load); it provides decision support and serves online analytical applications

Data warehouse VS database:

  • A database is designed for transactions, while a data warehouse is designed around subjects
  • A database generally stores online transactional data, while a data warehouse usually stores historical data
  • Database design avoids redundancy and follows the three normal forms; data warehouse design deliberately introduces redundancy and is denormalized

OLTP VS OLAP:

  • Online transaction processing (OLTP) is the main workload of traditional relational databases: basic, routine business operations such as bank transactions
  • Online analytical processing (OLAP) is the main workload of data warehouse systems: it supports complex analytical operations, focuses on decision support, and provides intuitive, easy-to-understand query results

Conventional data warehouse architecture:
[Figure: conventional data warehouse architecture]

Why build a data warehouse:

  • Data from different business systems is inconsistent and data relationships are chaotic
  • Business systems are generally built for OLTP, while a data warehouse enables OLAP analysis
  • A data warehouse is a multi-source environment, so data from multiple business lines can be analyzed in a unified way

Goals of data warehouse construction:

  • Integrate data from multiple sources so that data origins and destinations are traceable and lineage can be sorted out
  • Reduce repeated development by saving generic intermediate data and avoiding duplicate computation
  • Shield the underlying business logic and provide consistent, structured data to the outside

How to achieve this:

  • Implement general-purpose data ETL tools
  • Build a reasonable layered data model according to the business

Layered construction of data warehouse

Background of data warehouse construction:

  • When data construction has just started, most data is hooked up directly to the business after only rough ingestion
  • Once data construction has reached a certain stage, data usage turns out to be messy, and all kinds of business calculations are done directly on the raw data
  • Duplicate computation of every kind wastes computing resources, and performance needs to be optimized

Why layer the data warehouse:

  • Clear data structure: each data layer has a well-defined scope
  • Data lineage: track table transformations between layers and establish lineage
  • Less repeated development: standardized layering yields reusable middle-layer data
  • Shield anomalies in the raw data: control data quality through the data layers
  • Shield the impact of business changes: a business change does not force re-ingesting the data
  • Simplify complex problems: decompose a complex data warehouse architecture into multiple data layers

Meaning of the common layers:
[Figure: common data warehouse layers]

STG layer

  • Raw data layer: stores raw data with the same structure as the collected data
  • Retention period: keep all data
  • Table naming convention: stg_<subject>_<table content>_<table rule>

ODS layer

  • Operational data store: performs preliminary processing on the STG-layer data, such as removing dirty data and useless fields
  • Retention period: by default keep the last 30 days of data
  • Table naming convention: ods_<subject>_<table content>_<table rule>

DWD layer

  • Data warehouse detail layer: wide tables built from processed data, aiming to cover 80% of business needs
  • Retention period: keep all data from history to the present
  • Table naming convention: dwd_<business description>_<time granularity>

DWS layer

  • Data warehouse summary layer: aggregated data, solving data aggregation and data completeness problems
  • Retention period: keep all data from history to the present
  • Table naming convention: dws_<business description>_<time granularity>_sum

DIM layer

  • Public dimension layer: stores shared dimension data used to join with DWD and DWS data
  • Retention period: store on demand; in general, keep all data from history to the present
  • Table naming convention: dim_<dimension description>

DM layer

  • Data mart layer: used for BI, multidimensional analysis, tagging, data mining, etc.
  • Retention period: store on demand; in general, keep all data from history to the present
  • Table naming convention: dm_<subject>_<table content>_<table rule>

Data flow between layers:
[Figure: data flow between the data warehouse layers]
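
To make the flow concrete, here is a minimal HiveQL sketch of moving data down the layers; all table and column names (ods_order_raw_di, dwd_order_detail_di, dws_order_paid_1d_sum, and so on) are hypothetical examples of the naming conventions above, not tables from this article:

-- ODS -> DWD: clean the raw detail data into a wide detail table
INSERT OVERWRITE TABLE dwd_order_detail_di PARTITION (dt='2020-11-09')
SELECT order_id, user_id, amount
FROM ods_order_raw_di
WHERE dt = '2020-11-09' AND order_id IS NOT NULL;

-- DWD -> DWS: aggregate the detail data into a daily summary
INSERT OVERWRITE TABLE dws_order_paid_1d_sum PARTITION (dt='2020-11-09')
SELECT user_id, SUM(amount) AS total_amount, COUNT(1) AS order_cnt
FROM dwd_order_detail_di
WHERE dt = '2020-11-09'
GROUP BY user_id;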


What is Hive

Hive in brief:

  • Hive is a data warehouse tool built on top of Hadoop that provides a SQL-like syntax (HiveQL)
  • By default it uses MapReduce as the compute engine (other engines such as Tez are also supported) and HDFS as the storage system, providing compute and scaling capability for very large datasets
  • Hive maps data to databases and tables; the metadata of those databases and tables is usually stored in a relational database

A simplified Hive architecture:
[Figure: simplified Hive architecture]

Hive VS Hadoop:

  • Hive data storage: Hive data is stored on HDFS; Hive databases and tables are mappings over the data on HDFS
  • Hive metadata storage: metadata is usually stored in an external relational database (e.g. MySQL) and can be shared with Presto and Impala
  • Hive statement execution: HQL is converted into MapReduce jobs and run

Differences between Hive and a relational database such as MySQL

Product positioning

Hive is a data warehouse designed for offline analysis of massive data. It does not support OLTP (online transaction processing, which requires key capabilities such as ACID) and is closer to OLAP (online analytical processing), so it suits offline processing of large datasets. MySQL, by contrast, is a relational database designed for real-time business.

Scalability

Hive stores its data on HDFS (the Hadoop distributed file system) and its metastore metadata is generally kept in a separate relational database, while MySQL stores data on the server's local file system. Hive therefore scales out well, whereas a database, strictly bound by ACID semantics, has very limited scalability.

Read/write schema

Hive uses schema-on-read: data is validated at query time. This is good for importing large datasets and makes loading very fast, since loading is just copying or moving files. MySQL uses schema-on-write: data is checked against the schema as it is written to the database, which helps query performance because the database can index columns.

Data updates

Hive is designed for data warehouse applications, whose content is read often and written rarely. Hive does not support rewriting data; all data is fixed at load time. Data in a database, on the other hand, usually needs to be modified frequently.

Indexes

Hive supports indexes, but they are not the same as indexes in a relational database; for example, Hive has no primary or foreign keys. Hive provides only limited indexing capability: certain fields can be indexed, and the index data of one table is stored in another table. Because of its high data-access latency, Hive is not suitable for online data queries. A database, for small amounts of data accessed under specific conditions, can use indexes to deliver much lower latency.

Computation model

Hive uses MapReduce as its default computation model (Hive on Spark and Hive on Tez are also possible), while MySQL uses its own executor as its computation model.
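
For reference, the execution engine can be switched per session through a standard Hive property; this is only a sketch and assumes the chosen engine (Tez or Spark) is actually deployed on the cluster:

-- the default engine
SET hive.execution.engine=mr;
-- switch to Tez or Spark if they are installed
SET hive.execution.engine=tez;
-- SET hive.execution.engine=spark;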

[Figure: Hive vs MySQL comparison]


Hive Installation and deployment

Reference resources :


Basic Hive usage (part 1): data types / partitions / basic syntax

Hive data types:

  • Primitive types: int, float, double, string, boolean, bigint, etc.
  • Complex types: array, map, struct

Hive partitions:

  • Hive partitions massive data by one or more fields, so a query does not have to load all of the data
  • A partition corresponds to a directory on HDFS
  • Partitions can be static or dynamic (a short sketch of both follows this list)
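
As a minimal sketch of the two styles (the logs and src_logs tables and their columns are hypothetical, used only for illustration):

-- Static partition: the partition value is spelled out in the statement
INSERT OVERWRITE TABLE logs PARTITION (dt='2020-11-09')
SELECT user_id, action FROM src_logs WHERE dt = '2020-11-09';

-- Dynamic partition: the partition value is taken from the last column of the SELECT
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE logs PARTITION (dt)
SELECT user_id, action, dt FROM src_logs;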

Common basic Hive syntax:

  • USE DATABASE_NAME
  • CREATE DATABASE IF NOT EXISTS DB_NAME
  • DESC DATABASE DB_NAME
  • CREATE TABLE TABLE_NAME (...) ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t" STORED AS TEXTFILE
  • SELECT * FROM TABLE_NAME
  • ALTER TABLE TABLE_NAME RENAME TO NEW_TABLE_NAME

Write a Python script to generate some test data:

import random
import uuid

name = ('Tom', 'Jerry', 'Jim', 'Angela', 'Ann', 'Bella', 'Bonnie', 'Caroline')
hobby = ('reading', 'play', 'dancing', 'sing')
subject = ('math', 'chinese', 'english', 'computer')

data = []
for item in name:
    # a random score between 60 and 100 for each subject
    scores = {key: random.randint(60, 100) for key in subject}
    # one pipe-delimited line: id|name|hobby1,hobby2|subject1:score1,subject2:score2,...
    data.append("|".join([
        uuid.uuid4().hex,
        item,
        ','.join(random.sample(hobby, 2)),
        ','.join("{0}:{1}".format(k, v) for k, v in scores.items())
    ]))

with open('test.csv', 'w') as f:
    f.write('\n'.join(data))

Run the script to generate the test data file:

[root@hadoop01 ~/py-script]# python3 gen_data.py
[root@hadoop01 ~/py-script]# ll -h
...
-rw-r--r--. 1 root root  745 Nov  9 11:09 test.csv
[root@hadoop01 ~/py-script]# 

Let's take a look at the generated data:

[root@hadoop01 ~/py-script]# cat test.csv 
f4914b91c5284b01832149776ca53c8d|Tom|reading,dancing|math:91,chinese:86,english:67,computer:77

...
  • Fields are delimited by |; the first two fields are strings, the third field is an array, and the fourth field is a map

Create a database for testing :

0: jdbc:hive2://localhost:10000> create database hive_test;
No rows affected (0.051 seconds)
0: jdbc:hive2://localhost:10000> use hive_test;
No rows affected (0.06 seconds)
0: jdbc:hive2://localhost:10000> 

Create test table :

CREATE TABLE test(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

Load the local data into Hive:

0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table test;
No rows affected (0.785 seconds)
0: jdbc:hive2://localhost:10000> 

Query the data:
[Figure: results of select * from test]
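
Individual elements of the complex-typed columns can also be accessed directly; a small sketch against the test table above (the returned rows depend on the randomly generated data):

select user_name,
       hobby[0]       as first_hobby,
       scores['math'] as math_score
from test
where scores['math'] >= 80;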

How Hive converts HQL into MapReduce jobs

Now that we understand basic SQL operations in Hive, let's look at how Hive converts SQL into MapReduce jobs. The whole conversion process is divided into six stages:

  1. Antlr defines the SQL grammar rules and completes lexical and syntactic parsing, translating the SQL into an abstract syntax tree (AST)
  2. Traverse the AST and abstract out the basic query unit, the QueryBlock
  3. Traverse the QueryBlocks and translate them into an OperatorTree
  4. The logical optimizer transforms the OperatorTree, merging unnecessary ReduceSinkOperators to reduce the amount of shuffled data
  5. Traverse the OperatorTree and translate it into MapReduce tasks
  6. The physical optimizer transforms the MapReduce tasks and generates the final execution plan

[Figure: HQL-to-MapReduce conversion process]

As with ordinary SQL, we can prefix an HQL statement with the explain keyword to view its execution plan:

explain select * from test where id > 10 limit 1000

Hive parses the statement into Operators. An Operator is the smallest unit produced by Hive's parser, and each Operator in fact corresponds to a MapReduce task. For example, after Hive parses the statement above, it is made up of the following Operators:
[Figure: Operators generated for the explain example]

At the same time, Hive's optimizers reorder and optimize these Operators to help improve query efficiency. There are three main kinds of optimizers in Hive (a sketch of enabling the cost-based optimizer follows this list):

  • RBO (Rule-Based Optimizer): rule-based optimizer
  • CBO (Cost-Based Optimizer): cost-based optimizer; this is the default
  • Dynamic CBO: optimizes the execution plan dynamically while it is being generated
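
As a hedged sketch of working with the cost-based optimizer (these are the standard Hive property names; whether CBO actually helps depends on the query and on column statistics being available):

-- enable CBO for the current session and let it use column statistics
SET hive.cbo.enable=true;
SET hive.stats.fetch.column.stats=true;

-- collect column statistics so the optimizer has costs to work with
ANALYZE TABLE test COMPUTE STATISTICS FOR COLUMNS;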

Basic Hive usage (part 2): internal tables / external tables / partitioned tables / bucketed tables

Internal table:

Similar in concept to a Table in a traditional database, an internal table corresponds to a storage directory on HDFS; dropping the table deletes both the metadata and the table data. The data of an internal table is stored under a specific HDFS location, which can be set in the configuration. When the table is dropped, its data files are deleted as well. Internal tables are suitable for temporarily created intermediate tables.

External table:

An external table points to data that already exists on HDFS; dropping it only removes the metadata. It is meant for table data that you also want to use outside of Hive: when you drop an external table, only its metadata is deleted and the data itself remains, which makes it suitable for sharing data across departments. Specify the external keyword when creating the table, i.e. create external table.

Partitioned table:

A partition is comparable to a dense index on the partition column in an ordinary database: data is stored in different directories according to the partition column, which makes parallel analysis convenient and reduces the amount of data read. The partition fields must be specified when the partitioned table is created.

The difference between a partition field and an ordinary field: a partition field produces a directory named after it under the table directory on HDFS, while an ordinary field does not; in queries it can be used like an ordinary field, and it is generally not directly tied to the business data.

Bucketed table:

Data is hashed and placed into different files, which is convenient for sampling and for join queries. Internal, external, and partitioned tables can all be further organized into bucketed tables: a column of the table is run through a hash algorithm and the rows are further split across different files.

The concepts and use cases of internal and external tables are easy to understand; partitioned tables and bucketed tables deserve closer attention. Why create them at all? HQL restricts which data is read through the where clause, so rather than scanning one huge table, it is better to split that table into many small ones and scan only a small part of it with a suitable index; both partitioning and bucketing follow this idea.

Partitions create physical directories, which can be nested (usually split by time or region). A partition directory is named in the form <partition name>=<value>, for example create_time=202011. The partition name acts as a pseudo-column of the table, so adding a partition condition to the where clause scans only the data in the corresponding subdirectory. Partition columns are created with partitioned by (field1 type, ...).

Bucketing can further split small tables on top of partitioning. A bucket determines where a row goes from the hash value of a column (the same idea as a partition in MapReduce); for example, the data under one partition can be further split into multiple buckets, so that a query only needs to compute the hash of the bucketing column and scan the data in the corresponding bucket. Buckets are created with clustered by (field) into n buckets.
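
Partitioning and bucketing can also be combined in a single table. A minimal sketch (the user_action table and its columns are hypothetical):

CREATE TABLE user_action(
    user_id string,
    action string
)
PARTITIONED BY (dt string)
CLUSTERED BY (user_id) INTO 4 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|';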

Next we briefly demonstrate operations on these tables. First, upload the test data file generated in the previous section to HDFS:

[root@hadoop01 ~]# hdfs dfs -mkdir /test
[root@hadoop01 ~]# hdfs dfs -put py-script/test.csv /test
[root@hadoop01 ~]# hdfs dfs -ls /test
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 11:34 /test/test.csv
[root@hadoop01 ~]# 

Internal table

Table creation SQL:

CREATE TABLE test_table(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

Load the data from HDFS into Hive:

0: jdbc:hive2://localhost:10000> load data inpath '/test/test.csv' overwrite into table test_table;
No rows affected (0.169 seconds)
0: jdbc:hive2://localhost:10000> 

Check which HDFS directory the created table is stored in:

0: jdbc:hive2://localhost:10000> show create table test_table;
+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE TABLE `test_table`(                         |
|   `user_id` string,                                |
|   `user_name` string,                              |
|   `hobby` array<string>,                           |
|   `scores` map<string,int>)                        |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'  |
| WITH SERDEPROPERTIES (                             |
|   'collection.delim'=',',                          |
|   'field.delim'='|',                               |
|   'line.delim'='\n',                               |
|   'mapkey.delim'=':',                              |
|   'serialization.format'='|')                      |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hadoop.mapred.TextInputFormat'       |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' |
| LOCATION                                           |
|   'hdfs://hadoop01:8020/user/hive/warehouse/hive_test.db/test_table' |
| TBLPROPERTIES (                                    |
|   'bucketing_version'='2',                         |
|   'transient_lastDdlTime'='1604893190')            |
+----------------------------------------------------+
22 rows selected (0.115 seconds)
0: jdbc:hive2://localhost:10000> 

The data file can be seen on HDFS:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/test_table
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 11:34 /user/hive/warehouse/hive_test.db/test_table/test.csv
[root@hadoop01 ~]# 

Drop the table:

0: jdbc:hive2://localhost:10000> drop table test_table;
No rows affected (0.107 seconds)
0: jdbc:hive2://localhost:10000> 

Looking at HDFS, you will find that the table's storage directory has been deleted as well:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/
Found 2 items
drwxr-xr-x   - root supergroup          0 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table
drwxr-xr-x   - root supergroup          0 2020-11-09 11:23 /user/hive/warehouse/hive_test.db/test
[root@hadoop01 ~]# 

External table

Table creation SQL; the only difference from the internal table is the external keyword:

CREATE external TABLE external_table(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

Load the data file into Hive:

0: jdbc:hive2://localhost:10000> load data inpath '/test/test.csv' overwrite into table external_table;
No rows affected (0.182 seconds)
0: jdbc:hive2://localhost:10000> 

You will find that the data file on HDFS has been moved under Hive's directory:

[root@hadoop01 ~]# hdfs dfs -ls /test
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_table
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv
[root@hadoop01 ~]# 

Drop the table:

0: jdbc:hive2://localhost:10000> drop table external_table;
No rows affected (0.112 seconds)
0: jdbc:hive2://localhost:10000> 

Looking at HDFS, you will find that the table's storage directory still exists:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_table
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv
[root@hadoop01 ~]# 

Partitioned table

Create table statement :

CREATE TABLE partition_table(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
PARTITIONED BY (create_time string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

Load the data file into Hive, specifying the partition:

0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table partition_table partition (create_time='202011');
No rows affected (0.747 seconds)
0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table partition_table partition (create_time='202012');
No rows affected (0.347 seconds)
0: jdbc:hive2://localhost:10000> 

Running the following SQL, you can count rows from different partitions:

0: jdbc:hive2://localhost:10000> select count(*) from partition_table;
+------+
| _c0  |
+------+
| 16   |
+------+
1 row selected (15.881 seconds)
0: jdbc:hive2://localhost:10000> select count(*) from partition_table where create_time='202011';
+------+
| _c0  |
+------+
| 8    |
+------+
1 row selected (14.639 seconds)
0: jdbc:hive2://localhost:10000> select count(*) from partition_table where create_time='202012';
+------+
| _c0  |
+------+
| 8    |
+------+
1 row selected (15.555 seconds)
0: jdbc:hive2://localhost:10000> 

Storage structure of the partitioned table on HDFS:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_table
Found 2 items
drwxr-xr-x   - root supergroup          0 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time=202011
drwxr-xr-x   - root supergroup          0 2020-11-09 12:09 /user/hive/warehouse/hive_test.db/partition_table/create_time=202012
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_table/create_time=202011
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time=202011/test.csv
[root@hadoop01 ~]# 

Bucketed table

Create table statement :

CREATE TABLE bucket_table(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
clustered by (user_name) sorted by (user_name) into 2 buckets
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

Insert the data from the test table into bucket_table:

0: jdbc:hive2://localhost:10000> insert into bucket_table select * from test;
No rows affected (17.393 seconds)
0: jdbc:hive2://localhost:10000> 

Sampling query:
[Figure: results of a sampling query against bucket_table]
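
Such a sampling query can be written with TABLESAMPLE, which is standard HiveQL; this sketch simply reads one of the two buckets defined above:

select user_id, user_name
from bucket_table tablesample(bucket 1 out of 2 on user_name);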

The bucketed table's storage directory on HDFS looks like this:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/bucket_table
Found 2 items
-rw-r--r--   1 root supergroup        465 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000000_0
-rw-r--r--   1 root supergroup        281 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000001_0
[root@hadoop01 ~]# 

Basic Hive usage (part 3): built-in functions / custom functions / implementing a UDF

Common Hive built-in functions (a small sketch of using a few of them follows this list):

  • String functions: concat, substr, upper, lower
  • Date functions: year, month, day
  • Complex-type functions: size, get_json_object
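
For example, against the test table created earlier (a sketch; the exact output depends on the generated data):

select concat(user_name, '_', user_id) as name_id,
       substr(user_id, 1, 8)           as short_id,
       size(hobby)                     as hobby_count
from test;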

The query engine ships with functions that help with complex calculations or data conversions during a query, but sometimes the built-in functions cannot meet the needs of the business. In that case we develop our own functions, known as user-defined functions (UDFs). Hive supports three kinds of custom functions (a sketch using their built-in counterparts follows this list):

  • UDF: ordinary user-defined function. Processes one row of input and outputs one row, similar to a Map operation, e.g. converting string case or getting a string's length
  • UDAF: user-defined aggregate function. Processes multiple rows of input and outputs one row, similar to a Reduce operation, e.g. the MAX and COUNT functions
  • UDTF: user-defined table-generating function. Processes one row of input and outputs multiple rows (a table); used less often
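
Each kind has built-in counterparts, which makes the distinction easy to see in a quick sketch against the test table (explode must be combined with lateral view to join the generated rows back to their source row):

-- UDF: one row in, one row out
select upper(user_name) from test;

-- UDAF: many rows in, one row out
select count(*), max(user_name) from test;

-- UDTF: one row in, many rows out
select user_name, h
from test lateral view explode(hobby) t as h;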

A UDF is really just a program that follows a certain interface specification. During execution Hive converts the SQL into a MapReduce program, and our UDF is executed inside that program.

This section briefly demonstrates how to write a custom UDF. First create an empty Maven project and add the hive-exec dependency; its version should match the Hive version you installed. The complete pom file looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>hive-udf-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

First create a class that extends UDF. The custom function we implement simply returns the length of a field:

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class StrLen extends UDF {

    public int evaluate(final Text col) {
        // Text.getLength() returns the length in bytes of the UTF-8 encoded value
        return col.getLength();
    }
}

The custom function above can only handle primitive types. To process complex types you need to extend GenericUDF and implement its abstract methods. For example, here is a function that averages the values of the scores map field in the test data:

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.text.DecimalFormat;

public class AvgScore extends GenericUDF {

    /**
     * Name of the function
     */
    private static final String FUNC_NAME = "AVG_SCORE";

    /**
     * Inspector for the argument type the function operates on; here it is a map
     */
    private transient MapObjectInspector mapOi;

    /**
     * Format the result to at most two decimal places
     */
    DecimalFormat df = new DecimalFormat("#.##");

    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        // Pre-checks can be done here, e.g. validating the number and types of the function's arguments
        mapOi = (MapObjectInspector) objectInspectors[0];
        // Specify the return type of the function
        return PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        // Core logic: take the values of the map, average them, and return the result as a Double
        Object o = deferredObjects[0].get();
        double v = mapOi.getMap(o).values().stream()
                .mapToDouble(a -> Double.parseDouble(a.toString()))
                .average()
                .orElse(0.0);

        return Double.parseDouble(df.format(v));
    }

    @Override
    public String getDisplayString(String[] strings) {
        return "func(map)";
    }
}

Package the project and upload it to the server:

[root@hadoop01 ~/jars]# ls
hive-udf-test-1.0-SNAPSHOT.jar
[root@hadoop01 ~/jars]# 

Upload the jar package to HDFS:

[root@hadoop01 ~/jars]# hdfs dfs -mkdir /udfs
[root@hadoop01 ~/jars]# hdfs dfs -put hive-udf-test-1.0-SNAPSHOT.jar /udfs
[root@hadoop01 ~/jars]# hdfs dfs -ls /udfs
Found 1 items
-rw-r--r--   1 root supergroup       4030 2020-11-09 14:25 /udfs/hive-udf-test-1.0-SNAPSHOT.jar
[root@hadoop01 ~/jars]# 

Add the jar package in Hive:

0: jdbc:hive2://localhost:10000> add jar hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar;
No rows affected (0.022 seconds)
0: jdbc:hive2://localhost:10000> 

Then register the temporary functions; a temporary function only takes effect in the current session:

0: jdbc:hive2://localhost:10000> CREATE TEMPORARY FUNCTION strlen as "com.example.hive.udf.StrLen";
No rows affected (0.026 seconds)
0: jdbc:hive2://localhost:10000> CREATE TEMPORARY FUNCTION avg_score as "com.example.hive.udf.AvgScore";
No rows affected (0.008 seconds)
0: jdbc:hive2://localhost:10000> 

Use the custom functions in a query:

0: jdbc:hive2://localhost:10000> select user_name, strlen(user_name) as length, avg_score(scores) as avg_score from test;
+------------+---------+------------+
| user_name  | length  | avg_score  |
+------------+---------+------------+
| Tom        | 3       | 80.25      |
| Jerry      | 5       | 77.5       |
| Jim        | 3       | 83.75      |
| Angela     | 6       | 84.5       |
| Ann        | 3       | 90.0       |
| Bella      | 5       | 69.25      |
| Bonnie     | 6       | 76.5       |
| Caroline   | 8       | 84.5       |
+------------+---------+------------+
8 rows selected (0.083 seconds)
0: jdbc:hive2://localhost:10000> 

Drop the registered temporary functions:

0: jdbc:hive2://localhost:10000> drop temporary function strlen;
No rows affected (0.01 seconds)
0: jdbc:hive2://localhost:10000> drop temporary function avg_score;
No rows affected (0.009 seconds)
0: jdbc:hive2://localhost:10000> 

Temporary functions only take effect in the current session. To register a permanent function, simply leave out the TEMPORARY keyword, as shown below:

0: jdbc:hive2://localhost:10000> create function strlen as 'com.example.hive.udf.StrLen' using jar 'hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar';
No rows affected (0.049 seconds)
0: jdbc:hive2://localhost:10000> create function avg_score as 'com.example.hive.udf.AvgScore' using jar 'hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar';
No rows affected (0.026 seconds)
0: jdbc:hive2://localhost:10000> 

Dropping a permanent function likewise just leaves out the TEMPORARY keyword, as shown below:

0: jdbc:hive2://localhost:10000> drop function strlen;
No rows affected (0.031 seconds)
0: jdbc:hive2://localhost:10000> drop function avg_score;
No rows affected (0.026 seconds)
0: jdbc:hive2://localhost:10000> 

Hive Storage structure - OrcFile

Storage formats supported by Hive:
[Figure: storage formats supported by Hive]

  • TextFile is the default storage format; it can parse csv-like files with simple delimiters. In practice, however, OrcFile is usually used, because ORCFile is a columnar storage format and is better suited to big-data query scenarios.

We all know that relational databases basically use row-oriented storage, while columnar storage is used far more in the big data field, because big data analysis usually reads a large number of rows but needs only a few columns. That is why OrcFile is usually chosen as Hive's storage format, and it also shows that most big data application scenarios are OLAP scenarios.
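
A minimal sketch of creating an ORC-backed copy of the test table from earlier (orc.compress is a standard ORC table property; SNAPPY and ZLIB are the usual values):

CREATE TABLE test_orc
STORED AS ORC
TBLPROPERTIES ('orc.compress'='SNAPPY')
AS SELECT * FROM test;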

Characteristics of OLAP scenarios

Far more reads than writes

Unlike transaction-processing (OLTP) scenarios, such as adding to a cart, placing an order, or paying in e-commerce, which need large numbers of in-place insert, update, and delete operations, data-analysis (OLAP) scenarios usually import data in batches and then do flexible exploration across any dimension, BI-tool insight, report building, and so on.

After the data is written once, analysts need to try to mine and analyze it from every angle until they discover its commercial value, business trends, and other insights. It is a process of trial and error, constant adjustment, and continuous optimization, in which data is read far more often than it is written. This requires the underlying database to be designed specifically for this pattern rather than blindly adopting the architecture of a traditional database.

Wide tables, many rows read but few columns, small result sets

In OLAP scenarios there are usually one or several wide main tables with many columns, sometimes hundreds or even thousands. When analyzing the data, a few of these columns are chosen as dimensions and a few others as metrics, and aggregates are then computed over the whole table or a large range of it. This process scans a huge number of rows but uses only a few of the columns, and the aggregated result set is obviously much smaller than the billions of rows of raw data.

Data is written in bulk and rarely, if ever, updated

OLTP workloads are demanding about latency, to avoid losing business while customers wait. OLAP workloads, because of their huge data volumes, usually care more about write throughput: massive data should be imported as quickly as possible. Once the import is complete, the historical data is usually treated as an archive and is no longer updated or deleted.

No transactions needed, low data-consistency requirements

OLAP workloads need few transactions; they usually import historical log data, or sit alongside a transactional database and synchronize data from it in real time. Most OLAP systems support only eventual consistency.

Flexibility matters; pre-modeling does not fit well

In analysis scenarios, the analysis dimensions and mining methods are adjusted as the business changes, in order to discover the value of the data and update business metrics as quickly as possible. The data warehouse usually stores a large amount of historical data, so the cost of adjustment is very high. Although pre-modeling techniques can speed up computation in specific scenarios, they cannot keep up with flexibly evolving business needs, and the maintenance cost is too high.

Row storage and column storage

Comparison of row storage and column storage:
[Figure: row storage vs columnar storage]

Unlike row storage, which stores each row's data contiguously, column storage stores each column's data contiguously. Compared with row storage, columnar storage has many properties that suit analytical scenarios well:

  1. As mentioned earlier, analysis scenarios often read many rows but few columns. In row-storage mode, data is stored contiguously by row, with all columns of a row in one block, so columns not involved in the computation are read out during IO along with everything else, and reads are badly amplified. In column-storage mode, only the columns involved in the computation are read, which greatly reduces IO cost and speeds up queries.
  2. Data in the same column has the same type, so compression works remarkably well. Columnar stores often reach compression ratios of ten times or higher, saving a lot of storage space and lowering storage cost.
  3. A higher compression ratio means a smaller data size, so reading the corresponding data from disk takes less time.
  4. The compression algorithm can be chosen freely. Different columns have different data types, and the suitable compression algorithm differs as well; the most appropriate algorithm can be picked for each column type.
  5. A higher compression ratio also means the same amount of memory can hold more data, so the system cache works better.

OrcFile

OrcFile storage format:
[Figure: OrcFile storage layout]

Advantages of Orc columnar storage:

  • A query only needs to read the columns it involves, reducing IO; statistics are kept for each column, enabling partial predicate pushdown
  • Each column holds a single data type, so an efficient compression algorithm can be chosen per data type
  • The columnar format assumes the data will not change; it supports splitting and streaming reads, fitting the characteristics of distributed file storage well

Besides Orc, Parquet is another common columnar storage format. Orc VS Parquet:

  • OrcFile and Parquet are both Apache top-level projects
  • Parquet does not support ACID or updates; Orc has limited support for ACID and updates
  • Parquet compresses better, while Orc queries are more efficient

Offline warehouse VS Real time data warehouse

[Figure: offline vs real-time data warehouse]

Offline data warehouse:

  • An offline data warehouse is mainly built with technologies such as Hive to produce T+1 offline data
  • Incremental data is pulled and imported into Hive tables
  • Subject and dimension data is built for each business, and a T+1 data query interface is provided externally

Offline data warehouse architecture:

  • Data sources are imported into the offline data warehouse in batch (offline) mode
  • Layered data architecture: ODS, DWD, DM
  • Downstream applications read the DM layer directly according to business requirements

Real-time data warehouse:

  • A real-time data warehouse relies on data collection tools to write raw data into a data channel such as Kafka
  • Data is eventually written into a storage system that supports fast reads and writes, such as HBase
  • Minute-level or even second-level query capability is provided externally

Real-time data warehouse architecture:

  • Business demand for real-time data keeps rising, and stream processing has gone from a supporting role to a leading one
  • Lambda architecture: a speed (acceleration) layer is added on top of the offline big-data architecture, and stream processing computes the real-time metrics
  • Kappa architecture: real-time event processing is the core, unifying all data processing

The Lambda architecture data flow, illustrated

The Lambda Architecture is a big-data processing architecture proposed by Twitter engineer Nathan Marz, based on his experience building distributed data processing systems at BackType and Twitter.

The Lambda architecture enables developers to build large-scale distributed data processing systems. It has good flexibility and scalability, and it also tolerates hardware failures and human error well.

The Lambda architecture consists of three layers: the batch layer, the speed layer, and the serving layer that responds to queries.
[Figure: the three layers of the Lambda architecture]

In the Lambda architecture, each layer has its own task. The batch layer stores and manages the master data set (an immutable data set) and the pre-computed batch views. It pre-computes results using a distributed processing system that can handle large amounts of data, and it processes all existing historical data to achieve accuracy. Because it recomputes from the complete data set, it can fix any errors and then update the existing data views. Its output is usually stored in a read-only database, and updates completely replace the existing pre-computed views.

The speed layer processes new data in real time. It minimizes latency by providing real-time views over the latest data. The views the speed layer produces may not be as accurate or complete as the views the batch layer eventually produces, but they are available almost immediately after the data arrives; and once the same data has been processed by the batch layer, the corresponding data in the speed layer can be replaced.

Essentially, the speed layer compensates for the lag in the data views caused by the batch layer. For example, if each batch-layer job takes 1 hour to complete, then during that hour we cannot see the data views from the latest batch jobs. The speed layer processes data in real time and produces results right away, making up for that 1-hour lag.

All results from the batch layer and the speed layer are stored in the serving layer, which answers queries either by returning the pre-computed data views or by building views from the speed layer's output.

All new user-behavior data flows into both the batch layer and the speed layer. The batch layer persists the data, pre-processes it, derives the user behavior model we want, and writes it to the serving layer. The speed layer also processes the behavior data of new users and produces a real-time user behavior model.

When a query such as "what kind of advertising should be shown to this user" arrives, we query both the batch-produced model stored in the serving layer and the real-time behavior processed in the speed layer, and thus get a complete picture of the user's behavior history.

As the following figure shows, a query goes through the batch layer to ensure data completeness, while the speed layer compensates for the batch layer's high latency, so the whole query is real-time.
[Figure: a query served by both the batch layer and the speed layer]


Kappa architecture VS Lambda

Shortcomings of the Lambda architecture

Although the Lambda architecture is very flexible and fits many application scenarios, it also has shortcomings in practice: it is very complex to maintain.

With the Lambda architecture, architects have to maintain two complex distributed systems and make sure they logically produce the same output for the serving layer. For example, when deploying a Lambda architecture we might deploy Apache Hadoop for the batch layer and Apache Flink for the speed layer.

As we all know, programming against distributed frameworks is very complex, especially when each framework is optimized in its own way. So almost every architect agrees that the Lambda architecture carries real maintenance complexity in practice.

How do we solve this problem? Let's first think about what the root cause of this maintenance complexity is.

Maintaining the Lambda architecture is complex because we have to maintain two system architectures at the same time: the batch layer and the speed layer. As noted earlier, the batch layer is added because its results are highly accurate, while the speed layer is added because it has low latency when processing large-scale data.

So can we improve one of the layers so that it also has the characteristics of the other? For example, improve the batch layer so that it has lower latency, or improve the speed layer so that the data views it produces are more accurate and closer to the historical data?

The Kappa architecture, another architecture commonly used in large-scale data processing, grew out of exactly this line of thinking.

Kappa architecture

The Kappa architecture is an architectural idea proposed by Jay Kreps, former chief engineer at LinkedIn. Kreps is one of the authors of well-known open source projects, including Apache Kafka and the stream processing system Apache Samza, and is now the CEO of the big-data company Confluent.

Kreps proposed a way of looking at improving the Lambda architecture:

Can we improve the speed layer of the Lambda architecture so that it can also handle data completeness and accuracy? Can we improve the speed layer so that it processes real-time data and can also reprocess previously processed historical data when the business logic is updated?

Based on his years of architecture experience, he found that we can. Stream-processing platforms such as Apache Kafka can retain the data log permanently; using this property of Kafka, the speed layer can reprocess historical data.

Taking Kafka as an example, here is the whole process of the new architecture.

Step one: deploy Kafka and set the retention period of the data log.

The retention period here is the window of historical data you want to be able to reprocess. For example, if you want to be able to reprocess up to one year of historical data, set the retention period in Apache Kafka to 365 days. If you want to be able to reprocess all historical data, set the retention period to "forever".

Step two: if we need to improve the existing logic or algorithm, that means reprocessing the historical data. All we need to do is restart an instance of the job. The new job instance starts from the beginning, recomputes over the retained historical data, and writes the results to a new data view.

Kafka uses the log offset to track which records have been processed, so setting the log offset to 0 makes the new job instance process the historical data from the start.

Step three: when the new data view catches up with the old one, our application can switch to reading from the new data view.

Step four: stop the old version of the job instance and delete the old data view.

The architecture is shown in the figure below .
[Figure: the Kappa architecture]

The difference from the Lambda architecture is that the Kappa architecture removes the batch layer and keeps only the speed layer. Data only needs to be reprocessed when the business logic or the code changes. The Kappa architecture unifies data processing and no longer maintains separate offline and real-time code paths.

Shortcomings of the Kappa architecture

The Kappa architecture also has its own shortcomings. Because it keeps only the speed layer and has no batch layer, processing large-scale data in the speed layer may lead to incorrect data updates, and we then need to spend more time handling these errors and exceptions. If requirements change or historical data must be reprocessed, it has to be done by replaying from upstream, and the throughput of reprocessing history is lower than that of batch processing.

One more point: the Kappa architecture puts both batch and stream processing in the speed layer, so the same code handles both kinds of logic. The Kappa architecture is therefore not suitable for scenarios where batch and streaming logic differ.

Lambda VS Kappa

[Figure: Lambda vs Kappa comparison]


The real-time data warehouse architecture of mainstream large companies

Alibaba Cainiao real-time data warehouse

[Figure: Alibaba Cainiao real-time data warehouse architecture]

Meituan real time data warehouse

[Figure: Meituan real-time data warehouse architecture]

Characteristics of real-time data warehouse construction

  • The overall architecture is designed to share the load of OLAP queries
  • Complex computation is done in the real-time computing layer to avoid putting too much pressure on OLAP queries
  • Aggregate computation is handed to the OLAP data query engine
  • In the overall architecture, real-time computing is usually Spark and Flink working together
  • Kafka dominates as the message queue, working with HBase, ES, and MySQL to persist data
  • In the OLAP space, Presto, Druid, ClickHouse, Greenplum, and others each have their place
