MapReduce程序运行原理及其执行过程

一、 MapReduce编程模型

        MapReduce采用"分而治之"的思想,把对大规模数据集的处理,分发给主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果。简单地说,MapReduce就是"任务的分解与结果的汇总"

       在Hadoop中,用于执行MapReduce任务的机器角色有两个:一个是JobTracker;另一个是TaskTrackerJobTracker是用于调度工作的,TaskTracker是用于执行工作的。一个Hadoop集群中只有一台JobTracker

       在分布式计算中,MapReduce框架负责处理了并行编程中分布式存储、工作调度、负载均衡、容错均衡、容错处理以及网络通信等复杂问题。但需要注意的是,用MapReduce来处理的数据集(或任务)必须具备这样的特点:待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。

二、 特别数据类型介绍

        Hadoop提供了如下内容的数据类型,这些数据类型都实现了WritableComparable接口,以便用这些类型定义的数据可以被序列化进行网络传输和文件存储,以及进行大小比较

 另外还有一个:  NullWritable:当<key,value>中的keyvalue为空时使用。

三、MapReduce程序在hadoop中的执行过程

如上图-流程图-所示:

1客户端(JobClient)启动一个MapReduce作业;JobClient同时完成对数据源的切片,切片信息由InputSplit对象封装(这个切片过程是由MapReduce框架自动完成);JobClient通过getSplits方法来计算切片信息,切片默认大小和HDFS的块大小相同(64M),这样有利于map任务的本地化执行,无需通过网络传递数据。
2、然后本次启动生成一个Job
,于是JobClientJobTracker申请一个JobID以标识这个Job:JobTracker.getNewJobId()

3JobClient将运行作业所需要的资源文件复制HDFS中一个以 JobID命名的目录  中,资源文件包括MapReduce程序打包的JAR文件,其它依赖的jar、hadoop配置文件以及客户端计算所得的输入划分信息。这些文件都存放在JobTracker专门为该作业创建的以Job ID命名的文件夹中。JAR文件默认会有10个副本(mapred.submit.replication属性控制);输入划分信息告诉了JobTracker应该为这个作业启动多少个Mapper Task等信息(一个数据分片inputSplit对应一个Mapper Task)。
4JobClientJobTracker提交这个Job
5JobTracker初始化这个JobJobTracker接收到作业后,将其放在一个作业队列里,等待作业调度器对其进行调度,当作业调度器根据自己的调度算法调度到该作业时,会根据输入划分信息(步骤6是在步骤5初始化过程中进行的每个划分创建一个Mapper Task

6JobTrackerHDFS获取初始化这个Job的所需的Split等信息
7JobTrackerMapper Task分配给TaskTracker执行(8.9.10):JobTracker会遍历每一个InputSplit,根据其记录的引用地址选择距离最近的TaskTracker去执行,理想情况下切片信息就在TaskTracker的本地,这样节省了网络数据传输的时间
TaskTracker每隔一段时间会给JobTracker发送一个心跳,告诉JobTracker它依然在运行,同时心跳中还携带着很多的信息,比如当前map任务完成的进度等信息。当JobTracker收到作业的最后一个任务完成信息时,便把该作业设置成“成功”。当JobClient查询状态时,它将得知任务已完成,便显示一条消息给用户。
8TaskTrackerHDFS获取这个Job的相关资源(如:将任务jar包从HDFS拷贝到本地并进行解压);
9TaskTracker开启一个新的JVM(.创建一个新的JVM来执行具体的任务,这样做的好处是即使所执行的任务出现了异常,也不会影响TaskTracker的运行使用,因为TaskTracker本身也运行在jvm上)

10TaskTracker用新的JVM来执行MapReduce程序;

如果所执行的任务是map任务则处理流程大致如下:首先加载InputSplit记录的数据源切片,通过InputFormat的getRecordReader()方法, 获取到Reader后

[java] view plain copy

  1. K key = reader.createKey();  

  2. V value = reader.createValue();  

  3. while (reader.next(key, value)) {//遍历split中的每一条记录,执行map功能函数  

  4.     mapper.map(key, value, output, reporter);  

  5. }  

执行反馈:

mapreduce的执行是一个漫长的过程,执行期间会将任务的进度反馈给用户;
任务结束后,控制台会打印Counter信息,方便用户以全局的视角来审查任务。
执行成功

清理MapReduce本地存储(mapred.local.dir属性指定的目录)
清理map任务的输出文件。

 执行失败:

1.如果task出现问题(map或者reduce)
错误可能原因:用户代码出现异常;任务超过mapred.task.timeout指定的时间依然没有返回
错误处理:首先将错误信息写入日志;然后jobtracker会调度其他tasktracker来重新执行次任务,

如果失败次数超过4次(通过mapred.map.max.attempts和mapred.reduce.max.attempts属性来设置,默认为4),则job以失败告终;
如果系统不想以这种方式结束退出,而是想通过Task成功数的百分比来决定job是否通过,则可以指定如下两个属性:
mapred.max.map.failures.percent            map任务最大失败率
mapred.max.reduce.failures.percent        reduce任务最大失败率
如果失败比率超过指定的值,则job以失败告终。
2.如果是tasktracker出现问题
判断问题的依据:和jobtracker不再心跳通信
jobtracker将该tasktracker从资源池中移除,以后不在调度它
3.jobtracker出现问题
jobtracker作为系统的单点如果出现问题也是最为严重的问题,系统将处于瘫痪。

注:上述流程针对的是hadoop-0.20.0版本,该版本是使用的mapreuceAPIhadoopAPI中,已经有所变化。在Hadoop0.20.0版本以后,都包含一个新的Java MapReduce API,这个API有时也称为上下文对象,新的API在类型上不兼容以前的API。关于新旧API的区别,这里先不做介绍了,也什么意义,Apache官方已经不建议使用旧API了,估计旧API在Hadoop以后的版本中会被逐渐放弃,所以建议以后再写mapreuce程序是,使用新API。