The fifth chapter Consistent transactions

Storm It's a distributed streaming system , utilize anchor and ack The mechanism guarantees all tuple It's all handled successfully . If tuple error , Can be retransmitted , But how to make sure that something goes wrong tuple It's only handled once ?Storm Provides a set of transactional components Transaction Topology, To solve this problem .

Transactional Topology It is no longer maintained , from Trident To achieve transactional topology, But the principle is the same .

5.1 Design of consistent transactions

Storm How to realize that is right tuple parallel processing , And ensure the transactional . This section starts with a simple transactional implementation , Step by step Transactional Topology Principle .

5.1.1 Simple design one : Strong sequential flow

Guarantee tuple It's only dealt with once , The easiest way is to put tuple It turns into a strong order , And only one at a time tuple. from 1 Start , For each tuple Add one in order id. Processing tuple When , Will handle the successful tuple id And the results are stored in the database . next tuple When it comes , Put it id It's the same as... In the database id compare . If the same , It means tuple It's been dealt with successfully , Ignore it ; If different , According to strong order , Illustrate this tuple Not dealt with , Put it's id And the calculation results are updated to the database .

Take counting the total number of messages as an example . Every one of them tuple, If the data stored in the database id With the current tuple id Different , Then the total number of messages in the database plus 1, At the same time, update the current tuple id value . Pictured :

 Strong sequential flow

         But this mechanism allows the system to process only one at a time tuple, Can't implement distributed computing .

5.1.2 Simple design two : Strong order batch flow

In order to achieve distributed , We can process one batch at a time tuple, Is called a batch. One batch Medium tuple Can be processed in parallel .

We need to make sure that one batch It's only dealt with once , The mechanism is similar to the previous section . It's just that what's stored in the database is batch id.batch The intermediate results of the calculation exist in the local variables first , When one batch All in tuple After it's all dealt with , Judge batch id, If it's in the database id Different , Update the intermediate calculation results to the database .

How to ensure a batch Everything in it tuple It's all finished ? You can use Storm Provided CoordinateBolt. Pictured :

 The order batches

But strong order batch There are limits to flow , Only one at a time batch,batch There's no parallel between . To achieve real distributed transaction processing , have access to storm Provided Transactional Topology. Before that , Let's talk about it in detail CoordinateBolt Principle .

5.1.3 CoordinateBolt principle

CoordinateBolt The principle is as follows :

  • What really does the calculation bolt There's a package out there CoordinateBolt. The one who really does the job bolt We call it real bolt.
  • Every CoordinateBolt Record two values : What are they? task Sent me tuple( according to topology Of grouping Information ); What I'm going to give tuple Send a message ( Same basis groping Information )
  •  Real bolt Send out a tuple after , The outer layer of CoordinateBolt Will record this tuple To whom task 了 .
  • Wait for all. tuple When it's all sent ,CoordinateBolt Through another special stream With emitDirect The way to tell all it sent tuple Of task, How much it sent tuple Here it is task. The downstream task I will compare this number with what I have received tuple Compare the quantity , If equal , It means that all of the tuple.
  • The downstream CoordinateBolt I'll repeat the above steps , Inform the downstream .

The whole process is as shown in the figure :

coordinateBolt

CoordinateBolt It's mainly used in two scenarios :

  • DRPC
  • Transactional Topology

CoordinatedBolt It's intrusive to the business , To use CoordinatedBolt Features provided , You have to make sure that every one of you bolt Every one that's sent tuple One of the first field yes request-id. So-called “ I've dealt with my upstream ” It means the current one bolt For the present request-id The work that needs to be done is done . This request-id stay DRPC It represents a DRPC request ; stay Transactional Topology It represents a batch.

5.1.4 Trasactional Topology

Storm Provided Transactional Topology take batch The calculation is divided into process and commit Two phases .Process Stages can handle multiple batch, There's no need to guarantee sequence ;commit Stage guarantee batch The strong order of , And only one at a time batch, The first 1 individual batch Before successful submission , The first 2 individual batch Can't be submitted .

Take the total number of messages as an example , The following code comes from storm-starter Inside TransactionalGlobalCount.

MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA,new Fields(“word“), PARTITION_TAKE_PER_BATCH);

TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder(“global-count“, “spout“, spout, 3);

builder.setBolt(“partial-count“, new BatchCount(), 5).noneGrouping(“spout“);

builder.setBolt(“sum“, new UpdateGlobalCount()).globalGrouping(“partial-count“);

TransactionalTopologyBuilder Four parameters are received .

  • This Transactional Topology Of id.Id Used in Zookeeper Save the current topology Progress , If this topology restart , You can continue with the previous schedule .
  •  Spout In this topology Medium id
  • One TransactionalSpout. One Trasactional Topology There can only be one of them TrasactionalSpout. In this case, a MemoryTransactionalSpout, From a memory variable (DATA) Read data from .
  • TransactionalSpout Parallelism of ( Optional ).

Here is BatchCount The definition of :

public static class BatchCount extends BaseBatchBolt {

        Object _id;

        BatchOutputCollector _collector;

        int _count = 0;

        @Override

        public void prepare(Map conf, TopologyContext context,

              BatchOutputCollector collector, Object id) {

            _collector = collector;

            _id = id;

        }

        @Override

        public void execute(Tuple tuple) {

            _count++;

        }

        @Override

        public void finishBatch() {

            _collector.emit(new Values(_id, _count));

        }

        @Override

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

            declarer.declare(new Fields(“id“, “count“));

        }

}

 BatchCount Of prepare The last parameter of the method is batch id, stay Transactional Tolpoloyg Inside this id It's a TransactionAttempt object .

Transactional Topology It's from tuple All must be with TransactionAttempt As the first field,storm According to this field To judge tuple Which one does it belong to batch.

TransactionAttempt Contains two values : One transaction id, One attempt id.transaction id The purpose of this is what we described above for each batch Medium tuple Is the only one. , And regardless of this batch replay It's the same many times .attempt id It's for every batch The only one id, But for the same batch, it replay After that attempt id Follow replay It was different before , We can attempt id Comprehend replay-times, storm Use this id To distinguish one from the other batch The launch of the tuple Different versions of .

execute The way would be batch Each of them tuple Do it once , You should put this batch The calculation state is kept in a local variable . For this example , It's in execute The method is incremental tuple The number of .

Last , When this bolt Received some batch All of tuple after , finishBatch Method will be called . In this case BatchCount Class will send its local number to its output stream at this time .

Here is UpdateGlobalCount The definition of a class :

public static class UpdateGlobalCount extends BaseTransactionalBolt

implements ICommitter {

        TransactionAttempt _attempt;

        BatchOutputCollector _collector;

        int _sum = 0;

        @Override

        public void prepare(Map conf, TopologyContext context,

BatchOutputCollector collector, TransactionAttempt attempt) {

            _collector = collector;

            _attempt = attempt;

        }

        @Override

        public void execute(Tuple tuple) {

            _sum+=tuple.getInteger(1);

        }

        @Override

        public void finishBatch() {

            Value val = DATABASE.get(GLOBAL_COUNT_KEY);

            Value newval;

            if(val == null || !val.txid.equals(_attempt.getTransactionId())) {

                newval = new Value();

                newval.txid = _attempt.getTransactionId();

                if(val==null) {

                    newval.count = _sum;

                } else {

                    newval.count = _sum + val.count;

                }

                DATABASE.put(GLOBAL_COUNT_KEY, newval);

            } else {

                newval = val;

            }

            _collector.emit(new Values(_attempt, newval.count));

        }

        @Override

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

            declarer.declare(new Fields(“id“, “sum“));

        }

}

UpdateGlobalCount Realized ICommitter Interface , therefore storm Only in commit Stage execution finishBatch Method . and execute Methods can be done at any stage .

stay UpdateGlobalCount Of finishBatch In the method , Change the current transaction id And the data stored in the database id compare . If the same , Ignore this batch; If different , Then put this batch The results of the calculation are added to the summary , And update the database .

Transactional Topolgy The operation diagram is as follows :

transactional topology

So to summarize Transactional Topology Some characteristics of

  •  Transactional Topology All the transactional mechanisms are encapsulated , Internal use CoordinateBolt To guarantee a batch Medium tuple It's done .
  •  TransactionalSpout There can only be one , It will all tuple Divided into one by one batch, And guarantee the same batch Of transaction id Always the same .
  •  BatchBolt Handle batch Together tuples. For each of these tuple call execute Method , In the whole batch Call... When processing is complete finishBatch Method .
  •   If BatchBolt Marked as Committer, Only in commit Phase call finishBolt Method . One batch Of commit The stage consists of storm Make sure it's only the first one batch It will not be executed until it is submitted successfully . And it will try again until topology Everything in it bolt stay commit Complete submission .
  •  Transactional Topology Hide the anchor/ack frame , It provides a different mechanism to fail One batch, So that makes this batch By replay.

5.2 Trident Introduce

Trident yes Storm High level abstraction above , Provides joins,grouping,aggregations,fuctions and filters Such as the interface . If you have Pig or Cascading, I'm familiar with these interfaces .

Trident take stream Medium tuples Divide into batches To deal with ,API It encapsulates our understanding of these batches Processing of , Guarantee tuple It's only dealt with once . Handle batches The intermediate results are stored in TridentState In the object .

Trident Transactional principles are not detailed here , If you are interested, please refer to the information .