编程知识 cdmana.com

Building a streaming data lake using Flink Hudi

This paper introduces Flink Hudi The original model based on mini-batch The incremental computing model is constantly optimized and evolved . The user can go through Flink SQL take CDC Data is written in real time Hudi Storage , And in the upcoming 0.9 edition Hudi Native support CDC format. The main contents are :

  1. background
  2. The incremental ETL
  3. demonstration

One 、 background

Near real time

from 2016 Year begins ,Apache Hudi The community began to pass Hudi Of UPSERT Ability to explore use cases for near real-time scenarios [1]. adopt MR/Spark Batch model for , Users can achieve hourly data injection HDFS/OSS. In a pure real-time scenario , Users through the stream computing engine Flink + KV/OLAP The storage architecture can achieve end-to-end seconds (5 Minutes of class ) Real time analysis . However, on the second scale (5 Minutes of class ) There are still a large number of use cases in the hourly scenario , We call it NEAR-REAL-TIME ( Near real time ).

img

In practice, a large number of cases belong to the category of near real-time :

  1. Minute level big screen ;
  2. Various BI analysis (OLAP);
  3. Machine learning minute level feature extraction .

Incremental calculation

The near real-time solution is currently relatively open .

  • The delay of stream processing is low , however SQL Of pattern It's quite fixed , Query side capabilities ( Indexes 、ad hoc) lack ;

  • The data warehouse of batch processing has rich capacity, but the data delay is large .

therefore Hudi The community proposed based on mini-batch Incremental computing model :

The incremental Data sets => The incremental The result of the calculation is merge Saved results => External storage

This model is stored through the lake snapshot Pull incremental data sets ( Two commits Previous data sets ), adopt Spark/Hive The results of the incremental calculation of the equal batch framework ( For example, simple count) Again merge To saved results .

Core issues

The core problem of incremental model :

  1. **UPSERT Ability :** similar KUDU and Hive ACID,Hudi It also provides minute level update capability ;
  2. ** The incremental consumption :**Hudi Much stored through the lake snapshots Provide incremental pull .

be based on mini-batch The incremental computing model can improve the delay of some scenes 、 Save computing costs , But there's a big limitation : Yes SQL Of pattern There are requirements . Because the calculation is a batch , The batch calculation itself does not maintain the status , This requires that the calculated indicators can be easily merge, ordinary count、sum You can do it , however avg、count distinct These still need to pull the full amount of data for recalculation .

With the popularity of stream computing and real-time data warehouse ,Hudi The community is also actively embracing change , The original model based on mini-batch The incremental computing model is constantly optimized and evolved : stay 0.7 This version introduces streaming data into the lake , stay 0.9 Version supports native CDC format.

Two 、 The incremental ETL

DB Data into the lake

With CDC Maturity of Technology ,debezium In this way CDC Tools are becoming more and more popular ,Hudi The community has also integrated streaming , Streaming reading ability . The user can go through Flink SQL take CDC Data is written in real time Hudi Storage :

img

  • Users can use Flink CDC connector Direct will DB Data import Hudi;

  • You can also put CDC Data import Kafka, Re pass Kafka connector Import Hudi.

The second scheme has better fault tolerance and scalability .

Data Lake CDC

In the coming 0.9 edition ,Hudi Native support CDC format, One record All change records can be saved , Based on this ,Hudi And stream computing system , Can stream read CDC data [2]:

img

The source of the CDC All message changes of the stream are saved after entering the lake , Used for streaming consumption .Flink Real time accumulation of calculation results (state), By streaming Hudi Synchronize calculated changes to Hudi Lake storage , Then continue docking Flink Flow consumption Hudi Stored changelog, Realize the next level of stateful Computing . Near real-time end-to-end ETL pipeline:

img

This architecture will be end-to-end ETL The delay is reduced to minutes , And the storage format of each layer can be through compaction Compressed into columns (Parquet、ORC) In order to provide OLAP Analytical ability , Due to the openness of the data Lake , The compressed format can be connected to various query engines :Flink、Spark、Presto、Hive etc. .

a sheet Hudi The data Lake table has two forms :

  • ** Surface morphology :** Query the latest snapshot results , At the same time, it provides an efficient column storage format
  • ** Flow pattern :** Streaming consumption changes , You can specify any bit stream after reading changelog

3、 ... and 、 demonstration

We passed a paragraph Demo demonstration Hudi Two forms of tables .

Environmental preparation

  • Flink SQL Client

  • Hudi master pack hudi-flink-bundle jar

  • Flink 1.13.1

Here, prepare a paragraph in advance debezium-json Format CDC data

{"before":null,"after":{"id":101,"ts":1000,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}
{"before":null,"after":{"id":102,"ts":2000,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":103,"ts":3000,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":104,"ts":4000,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":105,"ts":5000,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":108,"ts":8000,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":109,"ts":9000,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"after":{"id":106,"ts":10000,"name":"hammer","description":"18oz carpenter hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589361987000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":362,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589361987936,"transaction":null}
{"before":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"after":{"id":107,"ts":11000,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362099000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362099505,"transaction":null}
{"before":null,"after":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362210230,"transaction":null}
{"before":null,"after":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362243000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1394,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362243428,"transaction":null}
{"before":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"after":{"id":110,"ts":14000,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362293000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1707,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362293539,"transaction":null}
{"before":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"after":{"id":111,"ts":15000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362330000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2090,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362330904,"transaction":null}
{"before":{"id":111,"ts":16000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362344000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"d","ts_ms":1589362344455,"transaction":null}
 Copy code 

adopt Flink SQL Client Create a table to read CDC Data files

Flink SQL> CREATE TABLE debezium_source(
>   id INT NOT NULL,
>   ts BIGINT,
>   name STRING,
>   description STRING,
>   weight DOUBLE
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = '/Users/chenyuzhao/workspace/hudi-demo/source.data',
>   'format' = 'debezium-json'
> );
[INFO] Execute statement succeed.
 Copy code 

perform SELECT Observations , You can see that there are 20 Bar record , There are some in the middle UPDATE s, The last message is DELETE

Flink SQL> select * from debezium_source;
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
| op |          id |                   ts |                           name |                    description |                         weight |
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |         101 |                 1000 |                        scooter |          Small 2-wheel scooter |              3.140000104904175 |
| +I |         102 |                 2000 |                    car battery |                12V car battery |              8.100000381469727 |
| +I |         103 |                 3000 |             12-pack drill bits | 12-pack of drill bits with ... |              0.800000011920929 |
| +I |         104 |                 4000 |                         hammer |        12oz carpenter's hammer | 0.75 | | +I | 105 | 5000 | hammer | 14oz carpenter's hammer |                          0.875 |
| +I |         106 |                 6000 |                         hammer |        16oz carpenter's hammer | 1.0 | | +I | 107 | 7000 | rocks | box of assorted rocks | 5.300000190734863 | | +I | 108 | 8000 | jacket | water resistent black wind ... | 0.10000000149011612 | | +I | 109 | 9000 | spare tire | 24 inch spare tire | 22.200000762939453 | | -U | 106 | 6000 | hammer | 16oz carpenter's hammer |                            1.0 |
| +U |         106 |                10000 |                         hammer |          18oz carpenter hammer |                            1.0 |
| -U |         107 |                 7000 |                          rocks |          box of assorted rocks |              5.300000190734863 |
| +U |         107 |                11000 |                          rocks |          box of assorted rocks |              5.099999904632568 |
| +I |         110 |                12000 |                         jacket | water resistent white wind ... |            0.20000000298023224 |
| +I |         111 |                13000 |                        scooter |           Big 2-wheel scooter  |              5.179999828338623 |
| -U |         110 |                12000 |                         jacket | water resistent white wind ... |            0.20000000298023224 |
| +U |         110 |                14000 |                         jacket | new water resistent white w... |                            0.5 |
| -U |         111 |                13000 |                        scooter |           Big 2-wheel scooter  |              5.179999828338623 |
| +U |         111 |                15000 |                        scooter |           Big 2-wheel scooter  |              5.170000076293945 |
| -D |         111 |                16000 |                        scooter |           Big 2-wheel scooter  |              5.170000076293945 |
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
Received a total of 20 rows
 Copy code 

establish Hudi surface , Here, set the form of the table to MERGE_ON_READ And turn on changelog Mode properties changelog.enabled

Flink SQL> CREATE TABLE hoodie_table(
>   id INT NOT NULL PRIMARY KEY NOT ENFORCED,
>   ts BIGINT,
>   name STRING,
>   description STRING,
>   weight DOUBLE
> ) WITH (
>   'connector' = 'hudi',
>   'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
>   'table.type' = 'MERGE_ON_READ',
>   'changelog.enabled' = 'true',
>   'compaction.async.enabled' = 'false'
> );
[INFO] Execute statement succeed.
 Copy code 

Inquire about

adopt INSERT Statement to import data into Hudi, Turn on streaming mode , And execute the query observation

Flink SQL> select * from hoodie_table/*+ OPTIONS('read.streaming.enabled'='true')*/;
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
| op |          id |                   ts |                           name |                    description |                         weight |
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |         101 |                 1000 |                        scooter |          Small 2-wheel scooter |              3.140000104904175 |
| +I |         102 |                 2000 |                    car battery |                12V car battery |              8.100000381469727 |
| +I |         103 |                 3000 |             12-pack drill bits | 12-pack of drill bits with ... |              0.800000011920929 |
| +I |         104 |                 4000 |                         hammer |        12oz carpenter's hammer | 0.75 | | +I | 105 | 5000 | hammer | 14oz carpenter's hammer |                          0.875 |
| +I |         106 |                 6000 |                         hammer |        16oz carpenter's hammer | 1.0 | | +I | 107 | 7000 | rocks | box of assorted rocks | 5.300000190734863 | | +I | 108 | 8000 | jacket | water resistent black wind ... | 0.10000000149011612 | | +I | 109 | 9000 | spare tire | 24 inch spare tire | 22.200000762939453 | | -U | 106 | 6000 | hammer | 16oz carpenter's hammer |                            1.0 |
| +U |         106 |                10000 |                         hammer |          18oz carpenter hammer |                            1.0 |
| -U |         107 |                 7000 |                          rocks |          box of assorted rocks |              5.300000190734863 |
| +U |         107 |                11000 |                          rocks |          box of assorted rocks |              5.099999904632568 |
| +I |         110 |                12000 |                         jacket | water resistent white wind ... |            0.20000000298023224 |
| +I |         111 |                13000 |                        scooter |           Big 2-wheel scooter  |              5.179999828338623 |
| -U |         110 |                12000 |                         jacket | water resistent white wind ... |            0.20000000298023224 |
| +U |         110 |                14000 |                         jacket | new water resistent white w... |                            0.5 |
| -U |         111 |                13000 |                        scooter |           Big 2-wheel scooter  |              5.179999828338623 |
| +U |         111 |                15000 |                        scooter |           Big 2-wheel scooter  |              5.170000076293945 |
| -D |         111 |                16000 |                        scooter |           Big 2-wheel scooter  |              5.170000076293945 |
 Copy code 

You can see Hudi The change record of each line is kept , Include change log Of operation type , Here we open TABLE HINTS function , It is convenient to dynamically set table parameters .

Continue to use batch Reading mode , Execute the query and observe the output , You can see that the changes in the middle are merged .

Flink SQL> select * from hoodie_table;
2021-08-20 20:51:25,052 INFO  org.apache.hadoop.conf.Configuration.deprecation             [] - mapred.job.map.memory.mb is deprecated. Instead, use mapreduce.map.memory.mb
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
| op |          id |                   ts |                           name |                    description |                         weight |
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
| +U |         110 |                14000 |                         jacket | new water resistent white w... |                            0.5 |
| +I |         101 |                 1000 |                        scooter |          Small 2-wheel scooter |              3.140000104904175 |
| +I |         102 |                 2000 |                    car battery |                12V car battery |              8.100000381469727 |
| +I |         103 |                 3000 |             12-pack drill bits | 12-pack of drill bits with ... |              0.800000011920929 |
| +I |         104 |                 4000 |                         hammer |        12oz carpenter's hammer | 0.75 | | +I | 105 | 5000 | hammer | 14oz carpenter's hammer |                          0.875 |
| +U |         106 |                10000 |                         hammer |          18oz carpenter hammer |                            1.0 |
| +U |         107 |                11000 |                          rocks |          box of assorted rocks |              5.099999904632568 |
| +I |         108 |                 8000 |                         jacket | water resistent black wind ... |            0.10000000149011612 |
| +I |         109 |                 9000 |                     spare tire |             24 inch spare tire |             22.200000762939453 |
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
Received a total of 10 rows
 Copy code 

polymerization

Bounded Source Calculation in read mode count(*)

Flink SQL> select count (*) from hoodie_table;
+----+----------------------+
| op |               EXPR$0 |
+----+----------------------+
| +I |                    1 |
| -U |                    1 |
| +U |                    2 |
| -U |                    2 |
| +U |                    3 |
| -U |                    3 |
| +U |                    4 |
| -U |                    4 |
| +U |                    5 |
| -U |                    5 |
| +U |                    6 |
| -U |                    6 |
| +U |                    7 |
| -U |                    7 |
| +U |                    8 |
| -U |                    8 |
| +U |                    9 |
| -U |                    9 |
| +U |                   10 |
+----+----------------------+
Received a total of 19 rows
 Copy code 

Streaming Calculation in read mode count(*)

Flink SQL> select count (*) from hoodie_table/*+OPTIONS('read.streaming.enabled'='true')*/;
+----+----------------------+
| op |               EXPR$0 |
+----+----------------------+
| +I |                    1 |
| -U |                    1 |
| +U |                    2 |
| -U |                    2 |
| +U |                    3 |
| -U |                    3 |
| +U |                    4 |
| -U |                    4 |
| +U |                    5 |
| -U |                    5 |
| +U |                    6 |
| -U |                    6 |
| +U |                    7 |
| -U |                    7 |
| +U |                    8 |
| -U |                    8 |
| +U |                    9 |
| -U |                    9 |
| +U |                    8 |
| -U |                    8 |
| +U |                    9 |
| -U |                    9 |
| +U |                    8 |
| -U |                    8 |
| +U |                    9 |
| -U |                    9 |
| +U |                   10 |
| -U |                   10 |
| +U |                   11 |
| -U |                   11 |
| +U |                   10 |
| -U |                   10 |
| +U |                   11 |
| -U |                   11 |
| +U |                   10 |
| -U |                   10 |
| +U |                   11 |
| -U |                   11 |
| +U |                   10 |
 Copy code 

You can see batch and streaming The calculation results under the mode are consistent .

Reference

[1] www.oreilly.com/content/ube…

[2] hudi.apache.org/blog/2021/0…

版权声明
本文为[Flink_ China]所创,转载请带上原文链接,感谢
https://cdmana.com/2021/09/20210909124112694y.html

Scroll to Top