4.1 Introduction

Storm can guarantee that every message emitted by a spout is fully processed. This chapter describes how Storm achieves this goal, and explains in detail how developers should use Storm's mechanisms to obtain reliable data processing.

4.2 Understanding when a message is fully processed

A single message (tuple) emitted by a spout can cause hundreds of further messages to be created from it.

Consider the data flow of the "word count" example:

The Storm task reads one complete English sentence at a time from the data source (a Kestrel queue), breaks the sentence into individual words, and finally emits each word along with the number of times it has occurred, in real time.

In this example, each message emitted by the spout (each English sentence) triggers the creation of many further messages: the words split out of the sentence are the newly created messages.

These messages form a tree structure that we call the "tuple tree", as shown in Figure 1:

Figure 1: An example tuple tree

Under what conditions does Storm consider a message emitted by a spout to be fully processed? The answer: when both of the following conditions hold at the same time:

  • the tuple tree is no longer growing;
  • every message in the tree has been acknowledged as processed.

If a message's tuple tree is not fully processed within a specified time, the message is considered to have failed. This timeout can be configured with the topology-level parameter Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS; the default is 30 seconds.
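To make the two completion conditions and the timeout concrete, here is a minimal sketch (hypothetical Python; the `TupleTree` class and its fields are illustrative stand-ins, not Storm's internal data structures):

```python
from dataclasses import dataclass, field

@dataclass
class TupleTree:
    started_at: float                      # when the spout emitted the root
    growing: bool = True                   # new tuples may still be anchored in
    tuples: set = field(default_factory=set)
    acked: set = field(default_factory=set)

def is_fully_processed(tree: TupleTree) -> bool:
    # both conditions from the text must hold at the same time
    return (not tree.growing) and tree.acked == tree.tuples

def is_failed(tree: TupleTree, now: float, timeout: float = 30.0) -> bool:
    # timeout models Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS (default 30 s)
    return (not is_fully_processed(tree)) and (now - tree.started_at) > timeout
```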

4.3 The life cycle of a message

Whether a message is fully processed or fails, what does Storm do next? To find out, let's follow the life cycle of a message emitted by a spout, starting with the interface methods a spout must implement.

First, Storm requests a message (tuple) from the spout by calling the spout instance's nextTuple() method. On receiving the request, the spout uses the SpoutOutputCollector provided to its open method to emit one or more messages to its output stream. Each time it emits a message, the spout assigns the message a message ID, which is later used to identify it.

Suppose we are reading messages from a Kestrel queue. The spout uses the ID that the Kestrel queue assigned to the message as part of that message's message ID, and passes that ID along with the message values when emitting to the SpoutOutputCollector.

Next, these messages are passed to the downstream bolts for processing, and Storm tracks the new messages generated from each of them. When Storm detects that the tuple tree derived from a message has been fully processed, it calls the ack method of the spout, passing the messageID as the parameter. Likewise, if processing of a message times out, the fail method of the corresponding spout is called, also with the messageID as the parameter.
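The life cycle just described can be modeled with a small sketch (Python for illustration only; Storm's real spout API is Java, and the names here — `SimpleSpout`, `Collector` — are simplified stand-ins):

```python
class SimpleSpout:
    """Toy spout: reads sentences from an in-memory source standing in for a queue."""
    def __init__(self, source):
        self.source = list(source)     # (messageID, sentence) pairs
        self.pending = {}              # messageID -> message, awaiting ack/fail

    def next_tuple(self, collector):
        if self.source:
            msg_id, sentence = self.source.pop(0)
            self.pending[msg_id] = sentence
            collector.emit([sentence], msg_id=msg_id)  # emit with a messageID

    def ack(self, msg_id):
        # called when the tuple tree completes: forget the message
        del self.pending[msg_id]

    def fail(self, msg_id):
        # called on timeout: put the message back so it can be replayed
        self.source.append((msg_id, self.pending.pop(msg_id)))

class Collector:
    """Records emits, standing in for SpoutOutputCollector."""
    def __init__(self):
        self.emitted = []
    def emit(self, values, msg_id=None):
        self.emitted.append((values, msg_id))
```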

Note: ack or fail is only ever called by the spout task that emitted the message. If a spout in the system is run as multiple tasks, a message is acknowledged (ack or fail) only by the spout task that created it, never by any other spout task.

We continue with the example of reading messages from a Kestrel queue to illustrate what a spout must do to achieve high reliability (call this spout KestrelSpout).

First, a brief introduction to the Kestrel message queue:

When KestrelSpout reads a message from the Kestrel queue, it "opens" the message in the queue. This means the message is not actually deleted from the queue; instead it is put into a "pending" state, where it waits for an acknowledgment from the client. Only after being acknowledged is the message removed from the queue. Messages in the "pending" state are invisible to other clients. Furthermore, if a client disconnects unexpectedly, all messages "opened" by that client are put back into the queue. When a message is "opened", the Kestrel queue also provides a unique identifier for it.

KestrelSpout uses this unique identifier as the tuple's messageID. Later, when ack or fail is called, KestrelSpout sends the ack or fail together with the messageID back to the Kestrel queue, which then either actually deletes the message from the queue or puts it back.
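The open/pending/ack cycle can be sketched like this (a toy in-memory model of the behavior described above, not the real Kestrel protocol or client library):

```python
import itertools

class ToyKestrelQueue:
    """Models Kestrel's open/pending/ack semantics from the text."""
    def __init__(self, items):
        self.queue = list(items)
        self.pending = {}                 # id -> item, invisible to other clients
        self._ids = itertools.count(1)

    def open(self):
        """'Open' the next message: hide it as pending, return (id, item)."""
        if not self.queue:
            return None
        item = self.queue.pop(0)
        msg_id = next(self._ids)          # the queue provides a unique identifier
        self.pending[msg_id] = item
        return msg_id, item

    def ack(self, msg_id):
        """Acknowledge: the message is actually deleted."""
        del self.pending[msg_id]

    def fail(self, msg_id):
        """Negative acknowledgment: the message rejoins the queue."""
        self.queue.append(self.pending.pop(msg_id))

    def client_disconnected(self, open_ids):
        """All messages opened by a lost client rejoin the queue."""
        for msg_id in open_ids:
            self.fail(msg_id)
```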

4.4 The reliability API

To use the reliable-processing features Storm provides, we must do two things:

  1. whenever a new node is created in the tuple tree, explicitly notify Storm;
  2. whenever an individual message finishes processing, tell Storm how the state of the tuple tree has changed.

Through these two steps, Storm can detect when a tuple tree has been fully processed and call the appropriate ack or fail method. Storm provides simple, clear APIs for both steps.

Adding a new node to a specified node in the tuple tree is called anchoring. Anchoring happens at the moment we emit a message. To make this concrete, consider a bolt that splits a message containing a whole sentence into a series of sub-messages, each containing one word.
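A sketch of such a bolt, in Python as a stand-in for Storm's Java API (the essential point is that the input tuple is passed as the first argument to emit — that is the anchoring; tuples are modeled here as plain dicts):

```python
class SplitSentenceBolt:
    """Splits a sentence tuple into one anchored tuple per word."""
    def execute(self, tup, collector):
        sentence = tup["values"][0]
        for word in sentence.split():
            # anchoring: the input tuple is the first argument to emit
            collector.emit(tup, [word])
        collector.ack(tup)  # this input message is now fully handled here

class TreeCollector:
    """Records anchoring edges so we can watch the tuple tree grow."""
    def __init__(self):
        self.edges = []      # (parent tuple id, child values)
        self.acked = []
    def emit(self, anchor, values):
        self.edges.append((anchor["id"], values))
    def ack(self, tup):
        self.acked.append(tup["id"])
```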

Every message is anchored in this way: the input message is passed as the first parameter of the emit method. Because each word message is anchored to the input message, and the input message is the root of the tuple tree emitted by the spout, if processing of any word message fails, the spout message at the root of this tuple tree will be re-sent.

By contrast, consider how Storm behaves when a message is emitted without passing the input message to emit.

A message emitted this way is unanchored. If it fails somewhere in the tuple tree, the root message from which the tuple tree derives will not be re-sent. Depending on the fault-tolerance requirements of the topology, an unanchored message is sometimes exactly what you want.

An output message can be anchored to one or more input messages, which is useful when doing joins or aggregations. If a message with multiple anchors fails, all of the spout messages associated with it are re-sent. Multiple anchoring is done by passing a list of input messages, rather than a single one, to the emit method.
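Multi-anchoring can be sketched by letting emit accept a list of input tuples (a Python stand-in again; in Storm's Java API the first argument to emit is a List&lt;Tuple&gt; of anchors):

```python
class MultiAnchorCollector:
    def __init__(self):
        self.edges = []                       # (parent tuple id, child values)
    def emit(self, anchors, values):
        # anchors may be one tuple or a list of tuples
        if not isinstance(anchors, list):
            anchors = [anchors]
        for anchor in anchors:
            self.edges.append((anchor["id"], values))

# joining two input streams: the joined output is anchored to both inputs,
# so it belongs to both tuple trees
col = MultiAnchorCollector()
left = {"id": 1, "values": ["alice"]}
right = {"id": 2, "values": ["smith"]}
col.emit([left, right], ["alice", "smith"])
```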

Multiple anchoring adds the anchored message to multiple tuple trees.

Note: multiple anchoring breaks the pure tree structure and instead forms a DAG (directed acyclic graph), as shown in Figure 2:

Figure 2: A diamond-shaped structure created by multiple anchoring

Storm's implementation handles DAGs just as it handles trees.

Anchoring shows how to add a message to a specified tuple tree. The next part of the reliability API describes what to do once an individual message in the tuple tree has finished processing. This is done through the ack and fail methods of the OutputCollector. Looking back at the SplitSentence example, you can see that the input message representing the sentence is acknowledged (acked) after all the word messages have been emitted.

Every message processed must be marked as a success or a failure (acked or failed). Storm uses memory to track the processing of each message, so if processed messages are never acknowledged, memory will eventually run out!

Many bolts follow a fixed pattern: read a message, emit the sub-messages derived from it, and ack the message at the end of execute. Filters and simple processing functions are all applications of this kind. Storm provides an interface called BasicBolt that encapsulates this pattern, and the SplitSentence example can be rewritten using it.
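The BasicBolt contract can be sketched as a wrapper that anchors every emit to the current input tuple and acks that tuple when execute returns (a Python stand-in; Storm's real classes are the Java IBasicBolt and BasicOutputCollector, and `run_basic_bolt` is a hypothetical framework-side helper):

```python
class BasicOutputCollector:
    """Anchors everything emitted to the tuple currently being processed."""
    def __init__(self, collector):
        self.collector = collector
        self.context = None               # set by the framework per input tuple
    def emit(self, values):
        self.collector.emit(self.context, values)

def run_basic_bolt(bolt, tup, collector):
    """Framework side of the contract: auto-anchor, then auto-ack."""
    basic = BasicOutputCollector(collector)
    basic.context = tup
    bolt.execute(tup, basic)
    collector.ack(tup)                    # acked automatically after execute

class BasicSplitSentence:
    def execute(self, tup, basic_collector):
        for word in tup["values"][0].split():
            basic_collector.emit([word])  # no explicit anchor, no explicit ack

class RecordingCollector:
    def __init__(self):
        self.edges, self.acked = [], []
    def emit(self, anchor, values):
        self.edges.append((anchor["id"], values))
    def ack(self, tup):
        self.acked.append(tup["id"])
```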

Written this way, the code is somewhat simpler than before, but functionally identical: messages emitted to the BasicOutputCollector are automatically anchored to the input message, and when execute completes, the input message is automatically acknowledged.

In many cases, however, a message's acknowledgment must be delayed, for example in aggregations or joins: only after a result has been computed from a whole group of input messages are all of those earlier input messages acknowledged. Aggregations and joins also usually multi-anchor their output messages. These patterns are beyond what IBasicBolt can handle.

4.5 Efficient implementation of tuple tree tracking

The Storm system contains a group of special tasks called "ackers", which are responsible for tracking every message in each DAG (directed acyclic graph). Whenever an acker finds that a DAG has been fully processed, it sends a signal to the spout task that emitted the root message. The parallelism of acker tasks in a topology can be set with the configuration parameter Config.TOPOLOGY_ACKERS. The default acker parallelism is 1; when the system handles a large volume of messages, it should be increased accordingly.

To understand Storm's reliability mechanism, we start with the life cycle of a message and the management of the tuple tree. When a message is created (whether in a spout or a bolt), the system assigns it a random 64-bit value as its id. Ackers use these random ids to track the tuple tree derived from each spout message.

Every message knows the id of the root message of the tuple tree it belongs to. Whenever a bolt generates a new message, the messageId of the root message of the corresponding tuple tree is copied into the new message. When a message is acknowledged, it sends information about how the tuple tree has changed to the acker tracking that tree. In effect it tells the acker: "this message has been processed, but it spawned some new messages; please track those."

For example, suppose messages D and E were derived from message C. Here is how the tuple tree changes when C is acknowledged.

Because D and E are added to the tuple tree at the same moment C is removed from it, the tuple tree can never be prematurely considered fully processed.

Let's look more closely at how Storm tracks tuple trees. As mentioned earlier, the system can have any number of ackers; so whenever a message is created or acknowledged, how does it know which acker to notify?

The system uses a hash of the root spout message's messageId to determine which acker tracks the tuple tree derived from that message. Since every message knows the messageId of its root message, it knows which acker to communicate with.
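The routing idea can be sketched as a simple modulus on the root id (the modulus itself is a hypothetical stand-in; Storm's actual hash function is an implementation detail):

```python
def acker_for(root_msg_id: int, num_ackers: int) -> int:
    # every tuple carries its root id, so any task can compute this locally
    # and all tuples of one tree always reach the same acker
    return root_msg_id % num_ackers
```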

When a spout emits a message, it informs the corresponding acker that a new root message has been created, and the acker creates a new tuple tree record for it. When the acker later finds that the tree has been fully processed, it notifies the corresponding spout task.

How are tuples tracked? The system holds thousands of messages at any moment; if an explicit tree were built for every message a spout emits, memory would quickly run out. A different strategy is therefore needed. Thanks to its novel tracking algorithm, Storm needs only a fixed amount of memory (about 20 bytes) to track one tree. This algorithm is at the core of Storm's correct operation, and it is also Storm's biggest breakthrough.

An acker task stores a map from a spout message id to a pair of values. The first value is the task id of the spout; through this id, the acker knows which spout task to notify when message processing completes. The second value is a 64-bit number called the "ack val": it is the XOR of the random ids of all messages in the tree. The ack val represents the state of the entire tree; no matter how large the tree is, only this fixed-size number is needed to track it. When a message is created and again when it is acknowledged, the same message id is XORed into the ack val.

Whenever an acker finds that a tree's ack val has become 0, it knows the tree has been fully processed. Because message ids are random 64-bit values, the probability of the ack val reaching 0 before the tree is actually complete is vanishingly small: even acknowledging 10,000 messages per second, it would take on the order of 50,000,000 years before a mistake was made. And even then, data is lost only if the message in question also happens to fail!
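The XOR bookkeeping can be reproduced in a few lines (a toy model of the algorithm described above, not Storm's implementation). Every tuple id is XORed in once at creation and once at acknowledgment, so a finished tree always cancels out to 0:

```python
import random

class Acker:
    """Tracks one 64-bit 'ack val' per tuple tree."""
    def __init__(self):
        self.trees = {}                    # root id -> [spout task id, ack val]

    def new_tree(self, root_id, spout_task):
        # the root's own id is XORed in when the tree is created
        self.trees[root_id] = [spout_task, root_id]

    def update(self, root_id, xor_of_ids):
        entry = self.trees[root_id]
        entry[1] ^= xor_of_ids
        return entry[1] == 0               # 0 means fully processed

rng = random.Random(1)
R, A, B = (rng.getrandbits(64) for _ in range(3))
acker = Acker()
acker.new_tree(R, spout_task=0)
step1 = acker.update(R, R ^ A ^ B)  # R acked; children A and B created
step2 = acker.update(R, A)          # A acked, no children
step3 = acker.update(R, B)          # B acked: ack val returns to 0
```

Note how acking R in step1 reports its own id and the ids of both children in a single XOR, exactly the "remove C, add D and E in one step" behavior that prevents premature completion.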

4.6 Choosing the right level of reliability

Acker tasks are lightweight, so a topology does not need many of them. You can observe acker throughput through the Storm UI; if the throughput appears insufficient, add more acker tasks.

If you do not require every message to be processed (that is, you can tolerate losing some messages), you can turn off the reliable-processing mechanism and gain better performance. Turning it off roughly halves the number of messages in the system (since messages no longer need to be acknowledged). It also reduces message size (tuples no longer record their root id), which saves bandwidth.

There are three ways to turn off the reliable-processing mechanism:

  • Set the parameter Config.TOPOLOGY_ACKERS to 0. Then, when a spout emits a message, its ack method is called immediately.
  • Second, do not specify a messageID when the spout emits a message. Use this method when you want to turn off reliability for particular messages.
  • Finally, if you do not care about the reliability of the descendants of a given message, do not anchor them when they are emitted, that is, do not pass the input message to the emit method. Because these descendant messages are not anchored into any tuple tree, their failure will not cause any spout message to be re-sent.

4.7 Fault tolerance at each level of the cluster

So far you have seen Storm's reliability mechanism and learned how to choose a reliability level to match your needs. Now let's look at how Storm guarantees that data is not lost under various failure scenarios.

4.7.1 Task-level failures

  • A message goes unacknowledged because a bolt task crashes. In this case, all of the acker's messages associated with that bolt task fail by timeout, and the fail method of the corresponding spout is called.
  • An acker task fails. If the acker task itself fails, all messages it was tracking before the failure fail by timeout, and the spout's fail method is called.
  • A spout task fails. In this case, the external device the spout connects to (such as an MQ) is responsible for message integrity. For example, when a client fails, the Kestrel queue puts all messages in the "pending" state back into the queue.

4.7.2 Task slot (slot) failures

  • A worker fails. Each worker contains several bolt (or spout) tasks. The supervisor is responsible for monitoring these tasks; when a worker fails, the supervisor tries to restart it locally.
  • A supervisor fails. The supervisor is stateless, so its failure does not affect currently running tasks; just restart it promptly. The supervisor does not monitor itself, so external monitoring is needed to restart it in time.
  • Nimbus fails. Nimbus is stateless, so its failure does not affect currently running tasks (although no new tasks can be submitted while it is down); just restart it promptly. Nimbus does not monitor itself, so external monitoring is needed to restart it in time.

4.7.3 Cluster node (machine) failures

  • A node in the Storm cluster fails. In this case, Nimbus transfers all tasks running on that machine to other available machines.
  • A node in the ZooKeeper cluster fails. ZooKeeper guarantees that the cluster continues to operate normally as long as fewer than half of its machines are down; just repair the failed machine promptly.

4.8 Summary

This chapter introduced how a Storm cluster achieves reliable data processing. With the help of its innovative tuple tree tracking technique, Storm's acknowledgment mechanism efficiently guarantees that data is not lost.

Apart from Nimbus, a Storm cluster has no single point of failure: any node can fail without data loss. Nimbus is designed to be stateless; as long as it is restarted in time, running tasks are unaffected.