
Hadoop Big Data Series: The MapReduce Computing Framework

If you compare Hadoop to an elephant, then MapReduce is the part of the elephant that does the computing. MapReduce is Hadoop's core programming model: in Hadoop, data processing is centered on the MapReduce programming model.

This chapter covers:

1) The MapReduce programming model

2) The MapReduce execution process

3) MapReduce data localization

4) How MapReduce works

5) The MapReduce error handling mechanism


1. MapReduce Programming Model

The concepts of Map and Reduce are borrowed from functional languages. The whole MapReduce computation is divided into a Map phase and a Reduce phase, also called the mapping and reduction phases. These two phases are in fact two independent processes, the Map process and the Reduce process: the Map side reads and preprocesses the data, and the preprocessed results are then sent to the Reduce side to be merged.
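
As a purely conceptual analogy (this is not Hadoop code), word counting can be written with Python's built-in map/reduce-style operations, which shows where the two-phase idea comes from:

from functools import reduce

lines = ["see spot run", "run spot run"]

# "Map" phase: every line is turned into (word, 1) pairs.
pairs = [(w, 1) for line in lines for w in line.split()]

# "Reduce" phase: pairs with the same key are merged into a count.
def merge(counts, pair):
    word, one = pair
    counts[word] = counts.get(word, 0) + one
    return counts

word_counts = reduce(merge, pairs, {})
print(word_counts)  # {'see': 1, 'spot': 2, 'run': 3}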

Let's walk through a code example to quickly get familiar with how to implement our own MapReduce job.

Case study: count, in a distributed way, how many times each word appears in a piece of text, i.e., the classic WordCount.

1) Create a file named map.py and write the following code:

#!/usr/bin/env python
import sys

# Read the input line by line from standard input.
for line in sys.stdin:
  # Split the current line on spaces.
  word_list = line.strip().split(' ')
  if len(word_list) <= 0:
    continue
  for word in word_list:
    w = word.strip()
    if len(w) <= 0:
      continue
    # Emit "<word>\t1": each occurrence of a word counts once.
    print '\t'.join([w, "1"])

The main work of the code is to read the input data source line by line; words in the text are separated by spaces.

The line word_list = line.strip().split(' ') splits the current line on spaces and stores the result in the word_list list. The for word in word_list loop then walks over the list, takes out each word, and appends a "1" to mark that the current word appeared once.

2) Create reduce.py and write the following code:

#!/usr/bin/env python
import sys

cur_word = None
sum_of_word = 0

for line in sys.stdin:
  ss = line.strip().split('\t')
  if len(ss) != 2:
    continue
  word = ss[0].strip()
  count = ss[1].strip()
  if cur_word is None:
    cur_word = word
  if cur_word != word:
    # A new key begins: emit the accumulated total of the previous word.
    print '\t'.join([cur_word, str(sum_of_word)])
    sum_of_word = 0
    cur_word = word
  sum_of_word += int(count)

# Emit the total of the last word.
print '\t'.join([cur_word, str(sum_of_word)])
sum_of_word = 0

This code aggregates the output of the map phase. Between map and reduce, MapReduce's shuffle/partition mechanism guarantees by default that records with the same word are delivered to the same reduce task as a contiguous group, so the reduce phase only needs to accumulate the counts of consecutive identical words.

3) Test the scripts locally by simulating the pipeline:

$ cat big.txt | python map.py | sort -k1 | python reduce.py
cat 1
run 3
see 2
spot 2
the 1

4) Data flow through the scripts:

Contents of big.txt:

see spot run
run spot run
see the cat

Output of map.py (one <word, 1> record per word, in input order):

see 1
spot 1
run 1
run 1
spot 1
run 1
see 1
the 1
cat 1

After sort -k1 (records with the same word become adjacent):

cat 1
run 1
run 1
run 1
see 1
see 1
spot 1
spot 1
the 1

Output of reduce.py:

cat 1
run 3
see 2
spot 2
the 1



2. MapReduce Execution Process

The example above follows the general flow of the MapReduce computing framework. Summarized, the flow is:



1) Input and split:

This stage does not belong to the map and reduce processes themselves, but it is part of the time consumed by the overall computation: it prepares the data for the formal map stage.


The splitting (split) operation:

The split step only divides the content of the source file logically into a series of InputSplits. Each InputSplit stores the metadata of its slice (for example, the file block information, the start offset, the data length, the list of nodes holding the data, and so on); it does not physically cut the source file into smaller files. Each InputSplit is subsequently handled by one mapper.

The slice size matters a great deal: splitSize is the key value in the splitting rule, and it is determined by three values:

 minSize: the lower bound of splitSize, set by the mapred.min.split.size parameter in mapred-site.xml.

 maxSize: the upper bound of splitSize, set by the mapred.max.split.size parameter in mapred-site.xml.

 blockSize: the block size used by HDFS to store the file, set by the dfs.block.size parameter in hdfs-site.xml.

The rule that determines splitSize is: splitSize = max{minSize, min{maxSize, blockSize}}
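
Read as plain arithmetic, the rule can be sketched as follows (the values are made-up examples, not read from any configuration file):

# A minimal sketch of the splitSize rule quoted above.
def compute_split_size(min_size, max_size, block_size):
    # splitSize = max(minSize, min(maxSize, blockSize))
    return max(min_size, min(max_size, block_size))

MB = 1024 * 1024
# With the typical defaults (a tiny minSize and a huge maxSize),
# splitSize ends up equal to the HDFS block size.
print(compute_split_size(min_size=1, max_size=2**63 - 1, block_size=128 * MB))  # 134217728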

The data formatting (Format) operation:

The divided InputSplits are formatted into key/value pairs, where key is the byte offset of a line within the split and value is the content of that line.

It is worth noting that the formatting operation keeps running while the map task executes: each key/value pair produced is passed into map for processing as it is generated. So the format operation does not happen strictly before map; the two proceed at the same time.
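
As a rough illustration of the Format step (not Hadoop's actual RecordReader code), each line of a split becomes a (byte offset, line content) pair:

# Sketch: turn the raw text of a split into (offset, line) pairs.
def format_split(data):
    offset = 0
    for line in data.splitlines(True):  # keep line endings so offsets stay correct
        yield offset, line.rstrip("\n")
        offset += len(line)

for key, value in format_split("see spot run\nrun spot run\n"):
    print(key, value)
# 0 see spot run
# 13 run spot run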



2) Map mapping:

This is where Hadoop's parallelism comes into play. Following the map procedure specified by the user, MapReduce tries to run that map program on the machine where the data resides. Because file data in HDFS is replicated into multiple copies, the computation picks the most idle node among those that hold the data.

In this stage, the internal logic of map can be customized by the user.

3) Shuffle (dispatch):

The Shuffle process is the whole sequence in which the direct output of the Mapper, after a series of processing steps, becomes the direct input of the Reducer. It is the core process of MapReduce, and it can be divided into two stages:

Mapper-side Shuffle: the results produced by the Mapper are not written directly to disk; they are first kept in memory, and when the amount of data in memory reaches a configured threshold they are written to the local disk in one pass. Along the way, sort, combine and partition operations are performed: sort orders the Mapper results by key; combine merges records with the same key; partition distributes the data evenly across the Reducers.
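
The following is a simplified, in-memory Python sketch of these three steps; the real implementation works against a spill buffer and local disk files, and the hash-based partition here only mimics the default behaviour:

from itertools import groupby

def mapper_side_shuffle(pairs, num_reducers):
    # Partition: route each (key, value) pair to a reducer bucket by hashing the key.
    partitions = [[] for _ in range(num_reducers)]
    for key, value in pairs:
        partitions[hash(key) % num_reducers].append((key, value))
    result = []
    for part in partitions:
        # Sort: order the pairs in each partition by key.
        part.sort(key=lambda kv: kv[0])
        # Combine: merge pairs that share the same key into one summed record.
        combined = [(k, sum(v for _, v in grp))
                    for k, grp in groupby(part, key=lambda kv: kv[0])]
        result.append(combined)
    return result

pairs = [("see", 1), ("spot", 1), ("run", 1), ("run", 1), ("spot", 1), ("run", 1)]
# Which bucket each word lands in depends on hash(), but every bucket is sorted and combined.
print(mapper_side_shuffle(pairs, num_reducers=2))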

Reducer-side Shuffle: because the Mapper and the Reducer usually do not run on the same node, the Reducer needs to download the Mapper result data from multiple nodes and process (merge) it before it is handed to the Reducer proper.
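
As a rough illustration (not Hadoop's actual implementation), the Reducer-side merge can be pictured as merging several already-sorted map outputs into one sorted stream:

import heapq

# Example data: the already sorted (and combined) output fetched from two mappers.
map_output_1 = [("cat", 1), ("run", 2), ("see", 1)]
map_output_2 = [("run", 1), ("see", 1), ("spot", 2)]

# heapq.merge keeps the overall sort order without loading everything at once.
for key, value in heapq.merge(map_output_1, map_output_2):
    print(key, value)
# cat 1, run 1, run 2, see 1, see 1, spot 2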

4) Reduce (reduction):

The Reducer receives the data stream in the form <key, {list of values}> and produces output in the form <key, value>. The specific processing can be customized by the user, and the final result is written directly to HDFS. Each reduce process corresponds to one output file, whose name starts with part-.

3. MapReduce Data Localization (Data-Local)

First of all, HDFS and MapReduce are the core designs of Hadoop. HDFS is the storage foundation and provides massive data storage at the data level; MapReduce sits on top of the data and computes over and processes massive data through MapReduce programs written by the user.

From the earlier HDFS chapter we know that the NameNode is the file system's name-node process and the DataNode is the file system's data-node process.

In the MapReduce computing framework, the JobTracker, which is responsible for scheduling compute tasks, corresponds to HDFS's NameNode role: one schedules computation, the other schedules storage.

In the MapReduce computing framework, the TaskTracker, which actually executes the compute tasks, corresponds to HDFS's DataNode role: one performs computation, the other manages stored data.

Considering the "locality principle", the NameNode and the JobTracker are generally deployed on the same machine, and each DataNode and TaskTracker pair is likewise deployed on the same machine.



The purpose of this is to assign each map task to the TaskTracker that holds the data blocks the map task will process, and to copy the program JAR package to that TaskTracker to run there. This is the principle of "moving the computation, not the data". Assigning reduce tasks does not take data locality into account.

4. How MapReduce Works

Let us look at how MapReduce works from the perspectives of the Client, the JobTracker and the TaskTracker:



First, the client (Client) starts a job (Job) and requests a Job ID from the JobTracker. It then copies the resource files needed to run the job to HDFS, including the JAR file in which the MapReduce program is packaged, the configuration files, and the input split information computed by the client. These files are stored in a folder that the JobTracker creates specifically for this job; the folder is named after the Job ID. The JAR file is stored with 10 replicas by default (controlled by the mapred.submit.replication property); the input split information tells the JobTracker how many map tasks should be started for this job.

After the JobTracker receives the job, it places it in a job queue and waits for the job scheduler to schedule it. When the job scheduler schedules the job according to its own scheduling algorithm, it creates one map task for each input split and assigns the map tasks to TaskTrackers for execution. For map and reduce tasks, a TaskTracker has a fixed number of map slots and reduce slots, determined by the number of cores and the amount of memory on the host. It is worth emphasizing that map tasks are not assigned to an arbitrary TaskTracker; this is exactly the data localization (Data-Local) mentioned above.

The TaskTracker sends a heartbeat to the JobTracker at regular intervals to tell the JobTracker that it is still running; the heartbeat also carries a lot of information, such as the progress of the current map task. When the JobTracker receives the completion notice of the job's last task, it sets the job status to "success". When the JobClient queries the status, it learns that the job has completed and displays a message to the user.



Analyzing separately from the map side and the reduce side (refer to the figure above), the details are as follows:

Map-side process:

1) Each input split is handled by one map task. The output of map is first placed in a circular memory buffer (the buffer is 100 MB by default, controlled by the io.sort.mb property). When the buffer is about to overflow (by default at 80% of the buffer size, controlled by the io.sort.spill.percent property), a spill file is created on the local file system and the data in the buffer is written to that file.

2) Before writing to disk, the thread first divides the data into as many partitions as there are reduce tasks, so that each reduce task corresponds to the data of one partition. This avoids the awkward situation where some reduce tasks are assigned a large amount of data while others get very little or even none. Partitioning is essentially a process of hashing the data. The data in each partition is then sorted, and if a Combiner is configured, the sorted result is combined; the goal is to write as little data to disk as possible.

3) When the map task writes its last record, there may be many spill files, and these files need to be merged into one. Sorting and combine operations are performed continuously during the merge, for two purposes:

 Minimize the amount of data written to disk each time ;

 Minimize the amount of data transmitted over the network in the next replication phase .

Finally, the spill files are merged into a single partitioned and sorted file. To further reduce the amount of data transferred over the network, the data can be compressed at this point, simply by setting mapred.compress.map.output to true.

4) The data in each partition is copied to the corresponding reduce task. How does the data in a partition know which reduce task it belongs to? The map task keeps in contact with its parent TaskTracker, and the TaskTracker keeps heartbeating with the JobTracker, so the JobTracker holds the macro-level information of the whole cluster. A reduce task only needs to ask the JobTracker for the locations of the corresponding map outputs.

Reduce-side process:

1) Reduce receives data from different map tasks, and the data coming from each map task is sorted. If the amount of data received by the reduce side is fairly small, it is kept directly in memory (the buffer size is controlled by the mapred.job.shuffle.input.buffer.percent property, which specifies the fraction of heap space used for this purpose). If the amount of data exceeds a certain proportion of the buffer size (determined by mapred.job.shuffle.merge.percent), the data is merged and then spilled to disk.

2) As the number of spill files grows, a background thread merges them into a larger, sorted file, in order to save time in later merges. In fact, whether on the map side or the reduce side, MapReduce sorts and merges over and over again, which is why sorting is sometimes called the soul of Hadoop.

3) Many intermediate files are produced (written to disk) during the merging, but MapReduce writes as little data to disk as possible, and the result of the final merge is not written to disk at all; it is fed directly into the reduce function.

The stage after Map has processed the data and before Reduce receives it is what MapReduce regards as the Shuffle process.

After the mapper runs, we know that its output consists of key/value pairs. Which reduce task a given key should ultimately be handed to has to be decided at this point. MapReduce provides the Partitioner interface, whose job is to decide, based on the key or value and the number of reduce tasks, which reduce task the current pair of output data should go to. By default the key is hashed and then taken modulo the number of reduce tasks. The default behaviour only aims at spreading the load evenly across the reducers; if users have their own requirements for the Partitioner, they can write a custom one and set it on the job.
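
A minimal sketch of the default rule (Hadoop's Java HashPartitioner uses key.hashCode(); Python's hash() is used here only as an analogy):

# Default partitioning rule: hash the key, then take it modulo the number of reduce tasks.
def default_partition(key, num_reduce_tasks):
    return hash(key) % num_reduce_tasks

for word in ["see", "spot", "run", "the", "cat"]:
    print(word, "-> reduce task", default_partition(word, num_reduce_tasks=3))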

5. MapReduce Error Handling Mechanism

Two kinds of failures are considered during MapReduce task execution: failures caused by hardware faults and failures caused by the execution of the tasks themselves.

1) Hardware failure

In a Hadoop cluster there is only one JobTracker, so the JobTracker itself is a single point of failure. How can this single point be dealt with? We can use an active/standby deployment: when starting the primary JobTracker node, one or more standby JobTracker nodes are started as well. When the primary JobTracker node fails, a new primary is elected from the standby JobTracker nodes through some election algorithm.

The other kind of hardware failure besides JobTracker failure is TaskTracker failure. TaskTracker failures are relatively common, and MapReduce usually resolves them by re-executing the affected tasks.

In a Hadoop cluster, under normal circumstances a TaskTracker constantly communicates with the JobTracker through the heartbeat mechanism. If a TaskTracker fails or runs very slowly, it stops sending heartbeats to the JobTracker, or sends very few. If a TaskTracker has not communicated with the JobTracker for a certain amount of time (1 minute by default), the JobTracker removes that TaskTracker from the set of TaskTrackers waiting for task scheduling, and at the same time requires the tasks on that TaskTracker to be returned immediately. If a task on the failed TaskTracker is a Map task still in the mapping stage, the JobTracker asks other TaskTrackers to re-execute all the Map tasks originally run by the failed TaskTracker. If the task is a Reduce task in the Reduce stage, the JobTracker asks other TaskTrackers to re-execute the Reduce tasks that the failed TaskTracker had not finished.

For example, suppose a TaskTracker has already completed two of the three Reduce tasks assigned to it. Because a completed Reduce task has already written its data to HDFS, only the third, unfinished Reduce task needs to be re-executed. For Map tasks, however, even if the TaskTracker has completed some of its Maps, the Reduce side may still be unable to obtain all of those Map outputs. So whether or not a Map task has completed, all Map tasks on the failed TaskTracker must be re-executed.

2) Task execution failures

In real jobs, a MapReduce job also runs into task failures caused by defects in user code or by process crashes. A defect in user code may make the task throw an exception during execution. In that case the task's JVM process exits automatically and sends an error message to its parent TaskTracker process; the error message is also written to the log files, and finally the TaskTracker marks this task attempt as failed. For a failure caused by a crashed process, the TaskTracker's listener notices that the process has exited and likewise marks the task attempt as failed. For programs that loop forever or take too long to execute, the TaskTracker, receiving no progress updates, also marks the task attempt as failed and kills the corresponding process.

In all of the above cases, when the TaskTracker marks a task attempt as failed, it decrements its own task counter by one so that it can request a new task from the JobTracker, and it tells the JobTracker through the heartbeat mechanism that a local task attempt has failed. After the JobTracker receives the notice of the task failure, it resets the task status and adds the task back to the scheduling queue so it can be reassigned and executed (the JobTracker tries to avoid reassigning a failed task to the TaskTracker on which it previously failed). If the task has been attempted 4 times (the number is configurable) and still has not completed, it will not be retried again, and at that point the whole job fails as well.


Copyright notice
This article was written by [osc_ kzisjxiy]. Please include a link to the original when reprinting. Thank you.
