The speaker holds a master's degree from Xidian University (Xi'an) and previously worked in Alibaba Cloud's storage department, mainly on the design and development of storage service features. He joined Qiniu Cloud in 2016, where he is mainly responsible for the architecture and development of the pipeline behind its stream computing and offline computing services. The pipeline currently processes more than 100 billion records and more than 100 TB of data every day.
Today's talk focuses on the big data platform Qiniu has developed over the past year, which now runs the company's core business. I will introduce the product through a concrete scenario, including the challenges we met during design and how we solved them. You are welcome to discuss any of these issues with us.
Scenario and product
For operations staff, much of what happens online day to day is opaque: how traffic fluctuates over the day, how online errors are distributed, and other business metrics recorded in the logs. Visualizing this information, or collecting it for unified processing and analysis, is complex and difficult to implement. This is what we call operations log analysis, and it is the scenario our product addresses. Below we analyze this scenario in detail, using Nginx logs as the running example to describe our Pandora product.
* Data access: configuring and running Pandora logkit
The first step of any data analysis is data access. Pandora provides a data access tool, logkit, that helps users bring their data into the Pandora platform. To start, download logkit, then configure and run it (Figure 1).
logkit supports multiple data sources, such as Nginx logs and Kafka, and feeds the collected data into our data processing platform. Walking through Figure 1: first, we need to know the log format, including the name of the format. In Figure 1 we have specified where the logs are stored and what format they use. Finally, we fill in the configuration file with the required settings, including where the data should be sent. If a message queue is involved, configure the access keys and run logkit; the data is then collected into our platform.
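To make the collection step concrete, here is a minimal Go sketch of what turning one Nginx access-log line into named fields involves. It assumes the standard `combined` format with `$request_time` appended; the regular expression, the `NginxRecord` type and the `parseLine` function are hypothetical illustrations, not logkit's actual parser or configuration.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// NginxRecord is a hypothetical structured form of one access-log line.
type NginxRecord struct {
	RemoteAddr  string
	Method      string
	Path        string
	Status      int
	BodyBytes   int
	RequestTime float64 // seconds, from $request_time
}

// lineRE assumes this (hypothetical) log_format:
// '$remote_addr - $remote_user [$time_local] "$request" $status
//  $body_bytes_sent "$http_referer" "$http_user_agent" $request_time'
var lineRE = regexp.MustCompile(`^(\S+) \S+ \S+ \[[^\]]+\] "(\S+) (\S+) [^"]*" (\d{3}) (\d+) "[^"]*" "[^"]*" ([\d.]+)$`)

// parseLine turns one access-log line into a record, or reports that the
// line does not match the assumed format.
func parseLine(line string) (NginxRecord, error) {
	m := lineRE.FindStringSubmatch(line)
	if m == nil {
		return NginxRecord{}, fmt.Errorf("line does not match the assumed format: %q", line)
	}
	status, err := strconv.Atoi(m[4])
	if err != nil {
		return NginxRecord{}, err
	}
	size, err := strconv.Atoi(m[5])
	if err != nil {
		return NginxRecord{}, err
	}
	rt, err := strconv.ParseFloat(m[6], 64)
	if err != nil {
		return NginxRecord{}, err
	}
	return NginxRecord{RemoteAddr: m[1], Method: m[2], Path: m[3], Status: status, BodyBytes: size, RequestTime: rt}, nil
}

func main() {
	line := `10.0.0.1 - - [12/Mar/2017:10:00:01 +0800] "GET /index.html HTTP/1.1" 200 612 "-" "curl/7.47.0" 0.005`
	rec, err := parseLine(line)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", rec)
}
```

A real collector reads the format definition from the Nginx configuration instead of hard-coding it, which is why the log format name and path matter in the configuration step.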
* Log retrieval
Figure 2 shows an intuitive visual interface that supports drag and drop. On the left side of the page are two entries, “Data source” and “Log retrieval”. Once logkit is configured and running, all the data goes into “Data source”. The right side of the page shows the name, type and other information for each field in the data source.
Figure 3 shows the “Log retrieval” page. Log retrieval makes business behavior clearly visible: enter search criteria in the search box to run a full-text search. If you need to find all requests during some past period whose response time exceeded 3 s, the “Log retrieval” page can query and display them as well. Figure 3 only shows a full-text search; the page can also display a histogram of how the matching data is distributed.
* Log aggregation
As shown in Figure 4, once data is in a data source, a SQL statement can aggregate it at one-minute granularity. Many things can be aggregated, for example the number of requests from each source IP, or other related metrics. After aggregation, the results flow back into our data sources. Simply put, a computation writes its output back to a data source for the next stage of analysis and processing. This compute-and-write-back step can be cascaded repeatedly, which makes fairly complex data processing possible.
* Sending data back to the user's own platform
Writing results back into a data source, as described above, is one way of processing data. Another is for users to build their own HTTP service and have the data flow back into their own systems through its HTTP interface. This lets users keep the analysis results on their own platform. The configuration page is shown in Figure 5.
* Real-time data display and monitoring
Figure 6 shows our monitoring dashboard. To use monitoring, start a Grafana instance and open its configuration page; the basic dashboard configuration is provided in our official documentation, and users can download and import it directly.
Figure 7 shows the charts produced from the analyzed Nginx logs. The orange box in the upper left (visits by 0) shows the total number of visits, the green bar chart in the upper right shows request counts and response times over the past period, and the pie chart in the lower right shows the share of visits by user. The style and position of these charts are configurable.
* Architecture design
Figure 8 shows Pandora's business architecture. Data can be imported into the platform through the Portal, logkit, the SDK, or the API, and enters a message queue. Data in the message queue can pass back and forth between computing tasks and the message queue any number of times, and it can also be exported directly. Exported data is processed by downstream systems (log retrieval, time-series storage, and so on) and can finally be turned into reports. That is the complete data flow.
Every system starts with design goals and the problems it must solve. Our goals were: first, fast data access with high throughput and low latency; second, as a cloud service, support for massive numbers of concurrent users and message queues; third, real-time and offline computing frameworks to cover computing needs; and finally, visual operation to meet users' needs. With the goals set, we chose our technologies. We needed a high-throughput storage system, and Qiniu's own storage system was clearly the best fit. We needed a powerful and flexible big data processing engine. And developers had to be able to develop and iterate on the product quickly. Given these requirements, the choices were straightforward: Kafka for the massive number of message queues, Spark as the computing engine, and Golang, in which we already had deep experience, as the implementation language. With these technologies settled, we started building the system.
Figure 9 shows the overall architecture of our Pipeline, which is responsible for ingesting and processing data in Pandora. Data enters through logkit and other channels via the apiserver. From the apiserver, data goes into the message queue, is read and written back by the computing engine, and is finally exported to downstream systems (LogDB, TSDB, HTTP, Qiniu Cloud storage). Today we focus on the data flow indicated by the green arrows, and I will explain the key points along it in detail. Across the whole flow, several factors determine how well the system works, such as stability and performance. So I will follow the data in order: from the user into the message queue, through computing tasks and back into the message queue, and finally out to export.
* Data access layer
Figure 10 shows the data access layer. Data is imported through the apiserver, and the scheduler manages the metadata of users' message queues, including how data is written into them. logkit appears here not because data flows from the apiserver through logkit into the message queue, but because it can collect many kinds of data. Here we use it to collect system audit logs and monitoring information, which makes them easy to manage and configure.
When we first designed this system, scaling was a persistent problem. Our early users were internal and their traffic grew quickly, so we had to scale out at least once or twice a week, a heavy operational burden. We then turned to containers: since the whole data access layer is stateless, we containerized it and run it on our own container cloud product. As shown in Figure 11, each pod contains both an apiserver and a logkit. Using the monitoring data, the scheduler aggregates information about every container and about the cluster as a whole. Knowing the cluster's load and total resources, it can scale out or in dynamically.
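The scaling decision described above can be reduced to a target-utilization rule. The Go sketch below assumes each pod has a known capacity (for example, records per second) and that the scheduler aims to keep pods at a fixed target utilization; the formula, the min/max bounds and the `desiredReplicas` name are illustrative assumptions, not the production policy.

```go
package main

import (
	"fmt"
	"math"
)

// desiredReplicas returns how many apiserver+logkit pods are needed so that
// each pod runs at roughly targetUtil of its capacity, clamped to [minPods, maxPods].
func desiredReplicas(totalLoad, perPodCapacity, targetUtil float64, minPods, maxPods int) int {
	if perPodCapacity <= 0 || targetUtil <= 0 {
		return minPods // no meaningful capacity model: fall back to the floor
	}
	n := int(math.Ceil(totalLoad / (perPodCapacity * targetUtil)))
	if n < minPods {
		n = minPods
	}
	if n > maxPods {
		n = maxPods
	}
	return n
}

func main() {
	// Hypothetical numbers: 900k records/s cluster-wide, 100k records/s per pod,
	// keep pods at 75% utilization, between 2 and 50 pods.
	fmt.Println(desiredReplicas(900000, 100000, 0.75, 2, 50))
}
```

Comparing the result with the current pod count tells the scheduler whether to scale out or in.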
* Data write optimization
Figure 12 shows how we optimized data writes. The first-generation write path was serial: imported data was parsed line by line, and only after everything had been parsed was it written to the message queue. This was very inefficient. So we used a Go language feature, channels: lines are fed continuously into a line channel, and multiple parsers downstream of the channel parse the data in parallel. In other words, the channel turns processing into a concurrent process, which raised CPU utilization, reduced response latency for users, and improved performance considerably.
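The channel-based design can be sketched as a fan-out/fan-in pipeline: one goroutine feeds raw lines into a channel, several parser goroutines consume it concurrently, and their results are collected before being written to the message queue. This is a minimal illustration; `parse` is a hypothetical stand-in for the real parser and the worker count is arbitrary.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// parse is a hypothetical per-line parser; here it just splits on whitespace.
func parse(line string) []string {
	return strings.Fields(line)
}

// parseConcurrently fans lines out to `workers` parser goroutines through a
// channel and fans the results back in. Output order is not preserved.
func parseConcurrently(lines []string, workers int) [][]string {
	if workers < 1 {
		workers = 1
	}
	lineCh := make(chan string)
	resultCh := make(chan []string)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for l := range lineCh { // ends when lineCh is closed
				resultCh <- parse(l)
			}
		}()
	}

	// Producer: feed every line, then close the channel so workers exit.
	go func() {
		for _, l := range lines {
			lineCh <- l
		}
		close(lineCh)
	}()

	// Close resultCh once all workers are done so the collector loop ends.
	go func() {
		wg.Wait()
		close(resultCh)
	}()

	var out [][]string
	for r := range resultCh {
		out = append(out, r)
	}
	return out
}

func main() {
	lines := []string{"GET /a 200", "GET /b 404", "POST /c 500"}
	for _, fields := range parseConcurrently(lines, 4) {
		fmt.Println(fields)
	}
}
```

If the downstream write needs the original order, each line can carry its index and the collector can reorder; the sketch omits that.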
As shown in Figure 13, our computation is implemented on Spark and exposes a relatively simple SQL interface that hides the underlying details from users.
* Export optimization
Data flows into the system, and whether it is used for computing or storage, processed data is only useful once it reaches downstream systems. The export step is therefore the link between upstream and downstream. Figure 14 shows the original architecture of the export service. Because tasks were not split at a fine granularity and a single server could not handle many user tasks, latency rose at peak times. After a month of development we launched a new version to address this.
Figure 15 shows the improved architecture. At the top is the master, which schedules and manages all tasks. All tasks go to the master, which evaluates the load on each machine and schedules tasks according to each machine's state (CPU usage, network bandwidth, task performance). We also split tasks at a finer granularity.
The scheduling design has three main goals: it should be resource-oriented, make full use of heterogeneous machines, and adjust automatically. Resource orientation is self-explanatory. Heterogeneous machines matter because our machines come in many specifications that can handle workloads of different intensity; we need to use each machine's resources fully and avoid placing tasks so that a machine either runs short of resources or sits partly idle. Automatic adjustment means that when the number of users rises or falls unexpectedly, we can redistribute tasks automatically. The ultimate goal is full utilization of resources.
* Task assignment
Figure 16 illustrates task assignment. Suppose the initial tasks (T1-T7) are evenly distributed over three machines and two new tasks (T8, T9) arrive. We look for relatively idle machines (S1 or S2) and assign the new tasks to them first. This keeps the cluster relatively balanced.
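The placement rule in Figure 16 amounts to "give each new task to the currently least-loaded server". A minimal Go sketch, assuming load is measured only by the number of tasks per server (the real scheduler also weighs CPU, bandwidth and task cost); `Cluster` and `Assign` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// Cluster maps a server name to the tasks currently assigned to it.
// Load is approximated here by task count only (an assumption).
type Cluster map[string][]string

// leastLoaded returns the server with the fewest tasks; ties go to the
// lexicographically smallest name so the result is deterministic.
func (c Cluster) leastLoaded() string {
	best := ""
	for s, tasks := range c {
		if best == "" || len(tasks) < len(c[best]) ||
			(len(tasks) == len(c[best]) && s < best) {
			best = s
		}
	}
	return best
}

// Assign places each new task, one at a time, on the least-loaded server.
func (c Cluster) Assign(tasks ...string) error {
	if len(c) == 0 {
		return errors.New("no servers available")
	}
	for _, t := range tasks {
		s := c.leastLoaded()
		c[s] = append(c[s], t)
	}
	return nil
}

func main() {
	c := Cluster{
		"S1": {"T1", "T2"},
		"S2": {"T3", "T4"},
		"S3": {"T5", "T6", "T7"},
	}
	if err := c.Assign("T8", "T9"); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(c) // T8 lands on S1 and T9 on S2; S3 is untouched
}
```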
* Automatic adjustment
Imbalances do occur, though (Figures 17-18), and then we need automatic adjustment. For example, if a user deletes many of their tasks, S1 and S2 become much less busy than S3. Each server reports a heartbeat to the master containing its resource usage and task distribution, and based on this the master moves tasks onto the idle machines to restore a relatively balanced state.
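One simple way to implement this adjustment is to move tasks from the busiest to the idlest server until their task counts differ by at most one. The Go sketch below assumes the heartbeats have already been folded into a per-server task list and that every task costs the same; the greedy policy and the `rebalance` name are illustrative, not the production algorithm.

```go
package main

import (
	"fmt"
	"sort"
)

// rebalance moves tasks from the most-loaded to the least-loaded server until
// no two servers differ by more than one task. It returns the moves it made.
func rebalance(assign map[string][]string) []string {
	var moves []string
	if len(assign) < 2 {
		return moves
	}
	servers := make([]string, 0, len(assign))
	for s := range assign {
		servers = append(servers, s)
	}
	sort.Strings(servers) // deterministic tie-breaking
	for {
		busiest, idlest := servers[0], servers[0]
		for _, s := range servers {
			if len(assign[s]) > len(assign[busiest]) {
				busiest = s
			}
			if len(assign[s]) < len(assign[idlest]) {
				idlest = s
			}
		}
		if len(assign[busiest])-len(assign[idlest]) <= 1 {
			return moves // balanced enough
		}
		last := len(assign[busiest]) - 1
		task := assign[busiest][last]
		assign[busiest] = assign[busiest][:last]
		assign[idlest] = append(assign[idlest], task)
		moves = append(moves, fmt.Sprintf("%s: %s -> %s", task, busiest, idlest))
	}
}

func main() {
	// S1 and S2 became idle after a user deleted tasks.
	assign := map[string][]string{"S1": {"T1"}, "S2": {"T4"}, "S3": {"T5", "T6", "T7"}}
	fmt.Println(rebalance(assign), assign)
}
```

Each move strictly reduces the imbalance, so the loop always terminates.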
* Horizontal expansion
Figure 19 illustrates the horizontal scaling problem. All machines are currently busy. If a new task (T13) arrives while the previous 12 tasks are already spread across the three machines, no machine has spare capacity for it, so we need to add machines.
As shown in Figure 20, with the first three machines all busy, we add server4. Once S4 starts, it reports a heartbeat to the master. The master then becomes aware of S4, re-evaluates the distribution and utilization of resources across the cluster, and assigns T13 to the relatively idle S4. It can even move tasks waiting on S1, S2 and S3 to S4.
* Resource isolation
Spreading the processing load by adjusting tasks automatically is not the only concern. For certain special tasks we also need to make sure that a sudden surge in one user's traffic does not affect other, smaller users, and that compressing data on export to cloud storage (compression is very CPU-intensive) does not affect other tasks. To address these problems we introduced resource isolation (Figure 21): machines and tasks are isolated through scheduling groups (a scheduling group is a set of similar machines or a class of tasks). This physical isolation keeps groups from affecting one another while still making full use of resources.
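Scheduling groups can be modeled as a label on both machines and tasks: a task may only be placed on machines in its own group, and the usual least-loaded choice is made within that group. A minimal Go sketch under that assumption; the group names, the `Machine`/`Task` types and `place` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Machine is a hypothetical server record with its scheduling group.
type Machine struct {
	Name  string
	Group string // e.g. "compress" for CPU-heavy export tasks, "default" otherwise
	Tasks int
}

// Task is a hypothetical task tagged with the group it must run in.
type Task struct {
	Name  string
	Group string
}

// place picks the least-loaded machine in the task's own scheduling group,
// so tasks in one group never consume machines reserved for another.
func place(machines []*Machine, t Task) (*Machine, error) {
	var best *Machine
	for _, m := range machines {
		if m.Group != t.Group {
			continue // isolation: never cross group boundaries
		}
		if best == nil || m.Tasks < best.Tasks {
			best = m
		}
	}
	if best == nil {
		return nil, errors.New("no machine in scheduling group " + t.Group)
	}
	best.Tasks++
	return best, nil
}

func main() {
	machines := []*Machine{{"S1", "default", 5}, {"S2", "compress", 3}, {"S3", "default", 1}}
	for _, t := range []Task{{"T1", "compress"}, {"T2", "default"}, {"T3", "gpu"}} {
		m, err := place(machines, t)
		if err != nil {
			fmt.Println(t.Name, "error:", err)
			continue
		}
		fmt.Println(t.Name, "->", m.Name)
	}
}
```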
* Master high availability
As described above, the system is one-to-many (one master, many servers), so how do we keep the service available if that single point fails? Figures 22 and 23 show a core part of our design. At the bottom is a ZooKeeper cluster. We implement a lock by creating an ephemeral node, and multiple machines compete for it at the same time. The master that wins becomes the active master; the ones that fail become standbys and normally stay idle. But once master1 loses the lock, master2 grabs it and takes over all scheduling and cluster management tasks. That is our approach to master high availability.
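The election relies on two ZooKeeper properties: creating a node that already exists fails, and an ephemeral node is deleted when the session that created it ends. The Go sketch below models just those two properties with a hypothetical in-memory `lockStore`, so the active/standby logic can run without a ZooKeeper cluster; in production the store would be a real ZooKeeper client, and the lock path and function names here are illustrative.

```go
package main

import (
	"fmt"
	"sync"
)

// lockStore is a hypothetical stand-in for ZooKeeper: a path can be created
// only if absent, and ephemeral nodes vanish when their session expires.
type lockStore struct {
	mu    sync.Mutex
	owner map[string]string // path -> session that created it
}

func newLockStore() *lockStore { return &lockStore{owner: map[string]string{}} }

// tryCreateEphemeral reports whether session now owns path.
func (s *lockStore) tryCreateEphemeral(path, session string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, taken := s.owner[path]; taken {
		return false
	}
	s.owner[path] = session
	return true
}

// expireSession deletes every ephemeral node owned by session, mimicking what
// ZooKeeper does when a client's session times out.
func (s *lockStore) expireSession(session string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for p, o := range s.owner {
		if o == session {
			delete(s.owner, p)
		}
	}
}

const lockPath = "/pipeline/master-lock" // hypothetical path

// campaign is what each master candidate runs: whoever creates the node is active.
func campaign(s *lockStore, name string) string {
	if s.tryCreateEphemeral(lockPath, name) {
		return name + ": active master"
	}
	return name + ": standby"
}

func main() {
	zk := newLockStore()
	fmt.Println(campaign(zk, "master1"))
	fmt.Println(campaign(zk, "master2"))
	zk.expireSession("master1")          // master1 crashes; its session expires
	fmt.Println(campaign(zk, "master2")) // the standby retries and takes over
}
```

With real ZooKeeper, a standby would typically set a watch on the lock node and retry when it is deleted, rather than polling.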
* Server high availability
For server high availability we use a similar approach, with the master acting as the highly available node. Every server reports a heartbeat to the master containing the machine's liveness and the status of its tasks. As shown in Figure 24, once the master detects that S3 is down, it moves the two tasks that were running on S3 (T5-T6) elsewhere; judging S1 and S2 to be suitable, it transfers the tasks to those servers. That is how we achieve server high availability.
* System-level scaling
As mentioned at the beginning, our message queues are implemented on Kafka, and a single Kafka cluster has a practical upper limit. At first we used a single Kafka cluster (Figure 25), and found that once business volume grew and message-queue data reached a certain scale, the system would avalanche. So we moved beyond a single cluster (Figure 26), splitting it into several Kafka clusters so that each stays relatively small, which greatly improved performance. Figure 26 shows the result: information from the three Kafka clusters is aggregated in our scheduler, which uses each cluster's pressure or number of message queues to place users' newly created tasks and data sources on a suitable Kafka cluster.
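Cluster selection can be sketched as a filter plus a minimum. The Go sketch below is one hypothetical way to combine the two signals the talk mentions: clusters that have reached a per-cluster topic cap are skipped (keeping each cluster small), and among the rest the one with the lowest write pressure wins, with fewer topics breaking ties. The `KafkaCluster` fields and the cap are assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// KafkaCluster summarizes what each cluster reports to the scheduler.
type KafkaCluster struct {
	Name      string
	Topics    int     // message queues already hosted
	WriteMBps float64 // current write pressure
	MaxTopics int     // hypothetical cap that keeps each cluster small
}

// pickCluster returns the cluster with the lowest write pressure among those
// still under their topic cap; fewer topics breaks ties.
func pickCluster(cs []KafkaCluster) (string, error) {
	best := -1
	for i, c := range cs {
		if c.Topics >= c.MaxTopics {
			continue // full: new queues go elsewhere
		}
		if best < 0 || c.WriteMBps < cs[best].WriteMBps ||
			(c.WriteMBps == cs[best].WriteMBps && c.Topics < cs[best].Topics) {
			best = i
		}
	}
	if best < 0 {
		return "", errors.New("all kafka clusters are at capacity; add a new cluster")
	}
	return cs[best].Name, nil
}

func main() {
	clusters := []KafkaCluster{
		{"k1", 900, 120, 1000},
		{"k2", 1000, 10, 1000}, // lowest pressure, but already at its topic cap
		{"k3", 300, 80, 1000},
	}
	fmt.Println(pickCluster(clusters))
}
```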
* Upstream and downstream protocol optimization
In practice, transfers between upstream and downstream components can still be a performance bottleneck. Initially we used JSON for data transfer between them, but log retrieval exposed the problem that this consumed a lot of network bandwidth, so we switched to Protobuf. Figure 27 compares JSON and Protobuf for serialization and deserialization. Protobuf takes less time, especially for deserialization, where CPU consumption dropped by nearly an order of magnitude. As a result, efficiency improved several times over, in terms of both cluster compute utilization and network bandwidth.
* Pipeline processing
The processing pipeline was originally serial: the export service pulled data from the message queue, processed it, pushed it downstream, and repeated. Processing is very fast, but pulling and pushing are relatively slow, so this loop was inefficient. And because each operation takes a different amount of time, some fast and some slow, the network throughput on the monitoring charts swung up and down, which lowered utilization. We therefore pipelined the operations so they run in parallel (Figure 28), and the results show that pulling and pushing became more efficient than with the serial approach.
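The pipelined version can be sketched with one goroutine per stage connected by buffered channels, so pulling the next batch overlaps with processing and pushing the previous one. This is a minimal illustration: in-memory batches stand in for Kafka reads, the returned slice stands in for downstream writes, `runPipeline` is a hypothetical name and `strings.ToUpper` is a placeholder for the real processing.

```go
package main

import (
	"fmt"
	"strings"
)

// runPipeline connects pull -> process -> push with buffered channels so the
// three stages overlap in time instead of running strictly one after another.
// buf must be >= 0; larger buffers absorb more speed variation between stages.
func runPipeline(batches [][]string, buf int) []string {
	pulled := make(chan []string, buf)
	processed := make(chan string, buf)

	// Stage 1: pull batches (in reality, a network read from the message queue).
	go func() {
		defer close(pulled)
		for _, b := range batches {
			pulled <- b
		}
	}()

	// Stage 2: process (cheap CPU work).
	go func() {
		defer close(processed)
		for b := range pulled {
			processed <- strings.ToUpper(strings.Join(b, ","))
		}
	}()

	// Stage 3: push (in reality, a network write downstream); collected here
	// so the result can be inspected.
	var pushed []string
	for p := range processed {
		pushed = append(pushed, p)
	}
	return pushed
}

func main() {
	fmt.Println(runPipeline([][]string{{"a", "b"}, {"c"}, {"d", "e", "f"}}, 4))
}
```

Because each stage runs in a single goroutine, batch order is preserved end to end.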
Our implementation language is Golang, which is garbage-collected, and in many situations about 1/10 of the time is spent on garbage collection rather than useful work. So we made some code-level improvements. First, we use sync.Pool, which reduces how often garbage collection runs. Second, we reuse objects as much as possible, which reduces the amount of garbage each GC cycle has to handle. We also upgraded Go: after the 1.8 release we measured GC time again and found it had improved by almost two orders of magnitude. These are our code-level optimizations.
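The sync.Pool technique looks roughly like this in practice: a pool of reusable `bytes.Buffer` values, each reset before use and returned afterwards, so encoding a record does not allocate a fresh buffer every time. The tab-separated record format and the `encodeRecord` function are hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool hands out reusable buffers so that encoding a record does not
// allocate a new buffer each time, reducing garbage for the GC to collect.
var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// encodeRecord serializes fields as "k=v" pairs separated by tabs, in the
// order given by keys (a hypothetical wire format).
func encodeRecord(keys []string, fields map[string]string) string {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset() // a pooled buffer may still hold data from its previous user
	defer bufPool.Put(buf)

	for i, k := range keys {
		if i > 0 {
			buf.WriteByte('\t')
		}
		buf.WriteString(k)
		buf.WriteByte('=')
		buf.WriteString(fields[k])
	}
	return buf.String() // String copies the bytes, so reusing the buffer is safe
}

func main() {
	keys := []string{"ip", "status"}
	fmt.Println(encodeRecord(keys, map[string]string{"ip": "10.0.0.1", "status": "200"}))
	fmt.Println(encodeRecord(keys, map[string]string{"ip": "10.0.0.2", "status": "404"}))
}
```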
* Limited-resource assumption
Finally, an optimization in how we think about resources: the limited-resource assumption. Some time ago, because of the large volume of incoming data and customers with bursty traffic, the system was easily saturated. We would then add machines or adjust and optimize scheduling, but that is not a long-term solution. So we now start from the assumption that resources are limited and assess up front what to do under that constraint. For example, we need to predict in advance how many users' tasks 10M of bandwidth can support; we must have that number, and on that basis estimate resources and plan cluster capacity. From the estimates we set a watermark, and once usage exceeds it we consider whether to scale out. We also need to communicate our current processing capacity clearly to customers. This keeps the whole cluster and service in a relatively healthy state.
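The watermark idea reduces to two small calculations: how many tasks a node should accept given its bandwidth and the measured per-task bandwidth, and whether current usage has crossed the watermark. A minimal Go sketch; the `Capacity` type, the 75% watermark and all numbers in the example are hypothetical, not Qiniu's measured figures.

```go
package main

import "fmt"

// Capacity is a hypothetical per-node capacity estimate obtained from load tests.
type Capacity struct {
	BandwidthMbps   float64 // usable bandwidth of the node
	PerTaskMbps     float64 // measured average bandwidth of one user task
	WatermarkFactor float64 // e.g. 0.75: plan to run at most 75% of capacity
}

// MaxTasks is how many tasks the node should accept before reaching the watermark.
func (c Capacity) MaxTasks() int {
	if c.PerTaskMbps <= 0 {
		return 0
	}
	return int(c.BandwidthMbps * c.WatermarkFactor / c.PerTaskMbps)
}

// NeedsScaleOut reports whether current usage has crossed the watermark,
// the signal to consider adding capacity.
func (c Capacity) NeedsScaleOut(usedMbps float64) bool {
	return usedMbps > c.BandwidthMbps*c.WatermarkFactor
}

func main() {
	c := Capacity{BandwidthMbps: 1000, PerTaskMbps: 2.5, WatermarkFactor: 0.75}
	fmt.Println(c.MaxTasks(), c.NeedsScaleOut(700), c.NeedsScaleOut(800))
}
```

The same numbers give a concrete figure to share with customers about how much load the current cluster can take.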
That covers our architecture and the overall optimizations. The results so far: we support trillions of data points, process hundreds of TB of data every day, and serve a massive number of users. The system has very low latency and high processing efficiency. Because operations are automated, labor costs are greatly reduced; our aim is to be able to write code without being interrupted by operations work. As for availability, we have reached three nines (99.9%).
That's all I'm sharing today .