A year and a half after the NetEase Cloud Music real-time data warehouse platform went live, the real-time warehouse has begun to take shape: we now have 300+ real-time warehouse tables and 1,200+ running tasks, of which roughly 1,000 are SQL tasks; total Kafka egress traffic has reached 18 GB/s, and the platform serves 200+ users.
The growth in data volume and users has brought mounting challenges to the platform's ease of use and stability, including Kafka stability, cluster stability, operational burden, and a good deal of early technical debt. Business growth exposed the weaknesses of our infrastructure, but it also left us with a lot of experience in building and operating the platform.
For an overview of the platform's functionality, see 《Cloud music real-time data warehouse technology transformation and some future plans》. Here we will mainly introduce some of our latest work:
"My task is delayed, and scaling it out doesn't help. Why is that?"
This is a question we run into constantly in day-to-day operations, and it is often time-consuming to answer because the causes are so varied. To get to the bottom of such problems faster, we have done some work to strengthen our operations capabilities.
Better IO metrics
IO problems are one of the most common causes of the question above, covering message read efficiency, dimension table JOIN efficiency, SINK efficiency, and so on. The performance and stability of third-party storage directly affect the stability of real-time tasks, so to locate such problems quickly we added a large set of IO-related metrics.
1. Kafka consumer-side performance metrics
2. Read-path deserialization metrics
Proportion of deserialization errors
On the Format side we developed a Format proxy that can report the relevant metrics and skip dirty records without modifying the original format code. Adding the property format.proxy to specify a proxy class is enough to support different styles of Format encapsulation.
For example, specifying format.proxy=magina reports the performance metrics above; specifying format.proxy=ds parses our DS-encapsulated log format, using the proxied Format to parse the body part of the DS message, so there is no need to develop a separate Format for the DS log encapsulation. The performance metrics are still reported, and features such as skipping bad messages are still supported.
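As a rough illustration, declaring a flow table with the proxy might look like the sketch below; the table schema and topic are hypothetical, and only the format.proxy property and its values come from the description above:

CREATE TABLE ods_user_action_log (
  userid STRING,
  action STRING,
  `timestamp` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_user_action_log',     -- hypothetical topic name
  'properties.bootstrap.servers' = '...',
  'format' = 'json',                   -- the original format code is untouched
  'format.proxy' = 'magina'            -- proxy reports IO metrics, can skip dirty records
);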
3. Dimension table JOIN metrics
On the dimension table JOIN side, we added:
Response time of dimension lookups
Local cache hit rate
Percentage of queries that were retried
Proportion of records successfully JOINed
4. Data write performance metrics
Data serialization response time
Average response time of writes to the external data source, etc.
We implemented this whole set of IO metrics as common wrappers around Flink's top-level Connector interfaces and refactored the related connector code. Any connector implemented against our interfaces reports all of these metrics automatically, with no extra instrumentation work.
Kafka partition limitations
The Kafka partition count is often the reason a job's performance cannot scale. For the sake of its exactly-once implementation, read performance, and read stability, Flink reads Kafka messages by actively pulling them, which ties the number of subtasks that can read a topic to its partition count and greatly limits how far a task can scale. Take this case as an example:
SET 'table.exec.state.ttl' = '1h';
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '10s';
SET 'table.exec.mini-batch.size' = '100000';
INSERT INTO music_kudu_online.music_kudu_internal.ads_ab_rtrs_user_metric_hour
SELECT
  from_unixtime(`timestamp`, 'yyyy-MM-dd') as dt,
  from_unixtime(`timestamp`, 'HH') as `hour`,
  os, sceneid, parent_exp, `exp`, exp_type, userid,
  ... -- aggregated metric columns elided in the original
FROM ods_rtrs_ab_log
INNER JOIN abtest_online.abtest.abtest_sence_metric_relation
FOR SYSTEM_TIME AS OF ods_rtrs_ab_log.proctime
ON ods_rtrs_ab_log.sceneid = abtest_sence_metric_relation.sceneid
GROUP BY from_unixtime(`timestamp`, 'yyyy-MM-dd'),
  from_unixtime(`timestamp`, 'HH'),
  os, sceneid, parent_exp, `exp`, exp_type, userid
This is a fully aggregated real-time task. With the original Flink SQL, the execution DAG looks roughly like this:
If the flow table ods_rtrs_ab_log has 5 partitions and our SQL task runs with a parallelism of 7, then because of the Kafka partition count plus Flink's own operator chaining, message reading, the dimension table JOIN, and MINI BATCH processing are all constrained by the Kafka partitioning and cannot scale out. For IO-bound operations like the dimension table JOIN in particular, this cap on parallelism seriously limits the performance of the whole program, and the only remedy is to add Kafka partitions.
But that operation is heavyweight and may affect other tasks reading the same flow table. To solve this, we modified our Kafka connector to support adding an extra shuffle step through configuration. For example, in the job above we added the configuration:
'connector.rebalance.keys' = 'sceneid,parent_exp,userid'
Messages are then hash-partitioned by the sceneid, parent_exp, and userid fields right after being read, which greatly improves the performance and scalability of the whole program. Keying by these fields also substantially raises the dimension table JOIN cache hit rate and improves the performance and effectiveness of MINI BATCH.
Besides this configuration, we also support adding a random rebalance, a rescale, and splitting the parsing step out of the source, to further improve overall performance. Note that an extra shuffle brings extra thread and network overhead, so watch machine load when configuring these operations: although an extra shuffle improves the program's scalability, if the machines themselves are underpowered, the extra network and thread cost can backfire and make performance worse with the same resources. Tune this according to your own job and environment.
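Putting the pieces together, the source table declaration carrying the extra shuffle might look like the sketch below; apart from 'connector.rebalance.keys', which is our internal extension described above, the options shown are standard Flink Kafka connector options, and the topic name is hypothetical:

CREATE TABLE ods_rtrs_ab_log (
  `timestamp` BIGINT,
  os STRING,
  sceneid STRING,
  parent_exp STRING,
  `exp` STRING,
  exp_type STRING,
  userid STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_rtrs_ab_log',      -- hypothetical topic name
  'properties.bootstrap.servers' = '...',
  'format' = 'json',
  -- internal extension: hash-shuffle messages right after the source
  'connector.rebalance.keys' = 'sceneid,parent_exp,userid'
);

With the shuffle in place, the operators downstream of the source (dimension table JOIN, MINI BATCH aggregation) are no longer chained to the Kafka source and can run at a parallelism higher than the five partitions.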
Kafka usage optimization
With traffic growing rapidly, Kafka stability also became a major problem for us: cabinet bandwidth limits, cross-machine-room bandwidth, jitter during Kafka scaling, Kafka configuration issues, and more. We have hit essentially every problem there is to hit, and we did the following work to address them:
1. Build a mirror service to resolve bandwidth issues and protect high-priority tasks
We developed a mirror service based on Flink. Kafka clusters are deployed in different machine rooms, and the mirror service keeps the two Kafka clusters in sync. The primary Kafka cluster serves the more important P0-level real-time tasks, while less critical tasks read from the mirrored cluster.
Using YARN label technology, we control which machine room a task runs in through its queue choice, reducing cross-machine-room bandwidth consumption. To make it easy for users to switch between Kafka clusters, we also modified the Flink flow table layer so that one flow table can mount multiple Kafka clusters at the same time and a simple configuration change switches clusters. After a round of task triage and switching, Kafka bandwidth usage improved considerably.
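The switch itself then reduces to a table-property change. The property name below is purely an illustrative assumption about our internal flow table configuration, not a standard Flink option:

-- hypothetical: repoint the flow table at the mirrored cluster
ALTER TABLE music_kafka.ods.ods_rtrs_ab_log SET ('kafka.cluster' = 'mirror');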
2. Better Kafka monitoring
In daily work we found that many developers do not know Kafka well, and because of limited operations experience in the early days our control over Kafka was not strict, which led to many problems in practice. So we integrated the data from Music's internal Kafka monitoring service with our platform's task lineage and built our own Kafka monitoring service.
The system is still fairly basic. Besides correlating Kafka, flow tables, and tasks, we actively monitor the following:
Whether the number of partitions of a Kafka topic is reasonable: mainly watching for partition counts that are too small (or occasionally too large), to prevent downstream tasks from being unable to keep up because there are too few partitions;
Balance of data production across Kafka partitions: to prevent skewed partition data from degrading downstream task performance;
Balance of data consumption across Kafka partitions: to prevent data going unconsumed when partitions change but a downstream task has not enabled partition discovery (see the sketch after this list);
Alerts on traffic surges and drops: traffic alarms on critical queues, to safeguard real-time data quality.
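For the partition-awareness item above, the community Kafka SQL connector does offer a discovery option. A minimal sketch, with a hypothetical schema, topic, and group id, and the option name as documented for the Flink 1.12+ Kafka connector:

CREATE TABLE ods_rtrs_ab_log (
  sceneid STRING,
  userid STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_rtrs_ab_log',
  'properties.bootstrap.servers' = '...',
  'properties.group.id' = 'rtrs_ab_consumer',   -- hypothetical group id
  'format' = 'json',
  -- periodically discover newly created partitions so no data goes unconsumed
  'scan.topic-partition-discovery.interval' = '5 min'
);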
Kafka version upgrade: to address the stability of Kafka's own scaling and resource isolation, our Music shared-technology team did some secondary development on top of Kafka 2.x, turning Kafka into a platform service that supports smooth topic expansion and resource isolation.
Similar to YARN's LABEL technology, it supports assigning different topics to machines in different regions; it rounds out the message mirror service with support for offset replication; and it provides a unified Kafka operations and monitoring platform. This part will be covered in detail in a later article.
3. Partitioned flow tables
After the real-time warehouse went live, we found that the following situations greatly affected program stability and flow table usability:
（1）Much of the time a job needs only 1% of a flow table's data, but since there is no way to read on demand, it has to spend considerable resources parsing and reading the other 99%, wasting large amounts of bandwidth. SQL development itself also offers no way to parse logs on demand, so every message must be fully parsed, consuming even more compute.
（2）When, guided by experience and business needs, we split a big topic into many small topics, one table becomes many small tables. Users then need a lot of background knowledge to understand which messages sharing the same schema live in which small table. Usability suffers, the design no longer fits the overall logic of a data warehouse, and it makes any future unification of batch and stream table metadata all but impossible.
In offline scenarios we have plenty of means to reduce unnecessary IO: bucketing the data, storing it sorted and exploiting Parquet's predicate pushdown, building partitioned tables, and so on. But for real-time tables, no existing open solution handles this case well. So we developed a partitioning scheme for flow tables, following broadly the same idea as HIVE table partitioning:
Using the SupportsFilterPushDown interface provided by Flink's Table Source, we implemented our own partitioning scheme for real-time flow tables: one partition corresponds to one topic, and unneeded partitions are pruned by pushing down the user's query conditions, avoiding unnecessary reads. The first version is live: we did an initial split of Cloud Music's exposure logs and, while we were at it, replaced the previous JSON data format with AVRO. In practice the gains are clear:
（1）The AVRO format alone brings at least 30%+ bandwidth savings, and message parsing is twice as fast as with Music's original log format.
（2）With partitioned flow tables, we have migrated 4 exposure-log consumption tasks so far, already saving 7 physical machines, an average saving of over 75% in compute and bandwidth.
These are admittedly extreme cases, but they suggest that once partitioned flow table technology is fully rolled out, wherever it can be applied, it is an optimization that can bring a qualitative change.
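From the consumer's point of view, reading a partitioned flow table is just an ordinary filter. The sketch below assumes a hypothetical exposure-log flow table partitioned by an action column, where each partition value maps to its own topic and the WHERE condition is pushed down via SupportsFilterPushDown, so only the matching topic is subscribed:

-- hypothetical partitioned flow table: one topic per `action` value
SELECT userid, itemid, `timestamp`
FROM music_streaming.music_ods.ua_impress_log   -- hypothetical table name
WHERE `action` = 'playlist_impress';            -- pushed down: only this partition's topic is read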
Batch-stream integration
A real-time data warehouse has always been a major goal of our Cloud Music data platform team, and behind that goal, batch-stream integration is a "noun", "concept", "technology", or perhaps "product" that we cannot get around. Before formally sharing our work, let me first recount a conversation with an algorithm colleague I ran into by the elevator:
The conversation shows that algorithm engineers do not care about batch-stream technology as such; what they want is real-time, ready-to-use warehouse data that improves their development efficiency. So behind batch-stream integration, what do business stakeholders in different roles actually want?
For operations, product, executives, and analysts:
What they want is accurate, real-time, analyzable report data, with the emphasis on analyzable. When result data fluctuates abnormally, we must have real-time detail data available for analysis and queries to investigate the cause of the fluctuation. When the boss has a new idea and wants a further breakdown of an existing report, we must be able to provide detailed, analyzable data and deliver an answer.
For real-time daily-active statistics, the common approach is to store user IDs in a KV store like Redis for deduplication (exact or approximate) and compute the daily-active numbers in real time. But when there is an abnormal intraday fluctuation, the Redis data cannot be analyzed, so it is hard to explain the cause quickly and impossible to do same-day analysis. That scheme and its results are clearly not good enough.
For data warehouse developers:
Unified real-time/offline warehouse metadata management, a unified model, and unified storage, reducing the cost of building and operating the warehouse and improving its overall usability;
Unified code: one set of SQL for both offline and real-time development, cutting development and maintenance costs and eliminating once and for all the real-time/offline result discrepancies caused by divergent business understanding and divergent logic.
For algorithm engineers:
A unified real-time/offline warehouse table with a single model lowers the threshold of business understanding, improves the warehouse's overall usability, and, together with an easy-to-use warehouse metadata service, makes it convenient for algorithm engineers to build derived features, improving model development efficiency. Accurate, real-time, analyzable model-effect data likewise speeds up model iteration.
In summary, the goals of batch-stream integration cover three aspects:
Unified code: one set of SQL covers both the real-time and the offline version of a business requirement;
Unified warehouse metadata: one table that can be read both offline and in real time, with a single unified model;
Real-time report data: distinct from unified warehouse metadata, product report data must support second-level real-time result queries, whereas the unified warehouse usually needs only near-real-time storage and is far less sensitive to OLAP query efficiency.
1. Unified code
Because real-time SQL itself is not very mature, a lot of logic that is easy to express in offline scenarios is either impossible to implement in real-time scenarios or runs into stability problems.
The industry is still exploring this. Alibaba's main approach is to use the single Flink engine to solve the problem of unified real-time/offline SQL, but that too is still being worked out in practice: at the top ADS layer, gaps in real-time SQL capability are shielded through lower-layer warehouse construction, so that product reports can be developed with one unified set of SQL. That is a direction we may try in the future. Besides trying to unify report-layer SQL, we have done some work and planning on unified code:
（1）Unified UDFs: we consolidated and upgraded the platform framework to the new Flink 1.12 release, unifying offline and real-time behind one set of UDFs;
（2）Unified metadata management: on the Flink SQL side we integrated our metadata center service, providing catalog.db.table-style reads and writes to unify metadata; likewise we built a secondary wrapper around Spark SQL, also integrated with the metadata center, achieving catalog.db.table-style reads and writes across heterogeneous data sources (see the sketch below).
Scenario-based batch-stream-unified implementations: for some simple business logic scenarios we plan to build scenario-specific batch-stream-unified implementations, such as a batch-stream-unified indexing task and a batch-stream-unified ETL cleansing platform. Due to resource constraints, these are still at the planning stage.
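As promised above, here is a sketch of what unified catalog.db.table access enables; the table names are hypothetical, following the naming style of the job earlier in this article:

-- one statement reads a Kafka-backed table and writes a Kudu-backed table,
-- both resolved through the metadata center
INSERT INTO music_kudu_online.music_kudu_internal.dws_user_play_cnt
SELECT userid, COUNT(*) AS play_cnt
FROM music_kafka_online.music_ods.ods_user_play_log   -- hypothetical Kafka-backed table
GROUP BY userid;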
Another big precondition for unifying batch and stream SQL with today's technology is the complexity of the logs themselves, which comes down to the normalization and completeness of event tracking. Real-time computing is not like offline computing: offline, a lot of attribution and association logic can be processed on the data side, and, setting aside questions of reasonableness and cost, much of that work is feasible.
In real-time scenarios, however, performance and stability are very sensitive. Pushing lots of logic onto the data side creates things that cannot be implemented at all, high implementation costs, many stability problems, and data delays. If tracking is not managed well, the entire construction of the real-time warehouse becomes questionable. For this reason Cloud Music launched the Dawn tracking project across several teams to completely rebuild how tracking is implemented across Cloud Music products, improving the standardization and accuracy of instrumentation and reducing the development cost of the real-time warehouse.
2. Unified data warehouse metadata
There are currently two main types of scheme in the industry:
The first builds a batch-stream mapping layer. Alibaba's public scheme is of this kind. It suits established products that already have both a real-time and an offline warehouse: without changing the existing warehouses, a unified mapping-layer view is built on top, and the views provide the integrated experience. The principle is shown in the figure below:
The second builds a new metadata system: one schema mounts multiple storages at the same time, such as HDFS and Kafka. Writes go to all of them; on the read side, the appropriate storage is chosen according to how the data is being read. Arctic, developed by the NetEase Shufan product team, takes this approach:
The overall idea is to encapsulate storages such as Iceberg, Kafka, and HBase and use whichever fits a given scenario. Arctic has also done substantial secondary development on top of Iceberg to solve the DWS data-update problem, providing CopyOnWrite and MergeOnRead capabilities similar to Hudi's, which address the stability problems of full aggregation in Flink. Cloud Music has tried it in some new business scenarios and has dozens of batch-stream-unified tables in production. If you want to know more about Arctic, contact NetEase's real-time computing team; we will not go into more detail here.
3. Real-time report data
Serving real-time report data depends mainly on the OLAP engine and its storage: the storage must support real-time updates while also answering queries in seconds. Often we cannot simply write final results to storage, because reports involve many flexible ad-hoc queries. Writing out precomputed results would require Kylin-style real-time cube capabilities, which puts too much pressure on development and on the Flink computation itself, wastes large amounts of resources and storage, brings plenty of stability problems and development workload, and limits secondary analysis of the data. So at this layer we need an OLAP engine that can answer queries over at least tens of billions of rows with second-level latency. Our main storage options today are Kudu and ClickHouse. Taking our old version of ABTest as an example, the scheme we adopted is as follows:
For real-time results in the latest hour and day dimensions, we read Kudu through Impala to join in the freshest results; for dimension data older than a day or a couple of hours, we precompute with Spark into a result table. The two datasets are UNIONed together for users, ensuring both the timeliness of results and the overall query experience.
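The serving layer then becomes a simple UNION over the two stores. A sketch with hypothetical table and variable names, following the pattern just described:

-- history: precomputed by Spark into an offline result table
SELECT dt, `hour`, sceneid, metric_value
FROM ads_ab_metric_history
WHERE dt < '${today}'              -- '${today}' is a hypothetical date parameter
UNION ALL
-- freshest hour/day: read from Kudu through Impala
SELECT dt, `hour`, sceneid, metric_value
FROM ads_ab_rtrs_user_metric_hour
WHERE dt = '${today}';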
Improving operations tooling
Real-time SQL development has greatly lowered the difficulty and threshold of building real-time statistics. But real-time SQL is still immature and opaque, and many engineers bring offline SQL habits or MySQL-style database SQL experience to real-time tasks. This puts great operational pressure on the platform, so building out operations tooling and improving task-level real-time metrics is one of our main directions going forward.
Maturing partitioned flow tables
Partitioned flow tables can qualitatively change the resource usage of the Cloud Music real-time platform, the pressure on Kafka, and the construction of the real-time warehouse. So far we have only completed a first version; next we will focus on dynamic partition discovery, partition modification, schema evolution, and continued improvement of monitoring, operations, and rollout.
Scenario-based batch-stream-unified construction
For example, batch-stream-unified indexing tasks, batch-stream-unified ETL tooling, and unified log cleansing rules, laying the groundwork for a batch-stream-unified warehouse.
Exploring batch-stream-unified storage
Research the current industry solutions, combine them with Music's business scenarios, and deliver a complete solution that lowers the development threshold and raises the development efficiency of real-time reports;
Build a batch-stream-unified logical layer, and so on.
Finally, here is an architecture diagram of the NetEase Shufan real-time computing solution: built on Apache Flink, it provides a high-performance, one-stop real-time big data processing solution and is widely used in streaming data processing scenarios.