hadoop2提交到Yarn: Mapreduce执行过程分析1-Hadoop2|YARN-about云开发

1.为什么会产生Yarn?

2.Configuration类的作用是什么?

3.GenericOptionsParser类的作用是什么?

4.如何将命令行中的参数配置到变量conf中?

5.哪个方法会获得传入的参数?

6.如何在命令行指定reduce的个数?

7.默认情况map、reduce为几?

8.setJarByClass的作用是什么?

9.如果想在控制台打印job(maoreduce)当前的进度,需要设置哪个参数?

10.配置了哪个参数,在提交job的时候,会创建一个YARNRunner对象来进行任务的提交?

11.哪个类实现了读取yarn-site.xml、core-site.xml等配置文件中的配置属性的?

12.JobSubmitter类中的哪个方法实现了把job提交到集群?

13.DistributedCache在mapreduce中发挥了什么作用?

14.对每个输入文件进行split划分,是物理划分还是逻辑划分,他们有什么区别?

15.分片的大小有哪些因素来决定

16.分片是如何计算得来的?

1 概述

该瞅瞅MapReduce的内部运行原理了,以前只知道个皮毛,再不搞搞,不然怎么死的都不晓得。下文会以2.4版本中的WordCount这个经典例子作为分析的切入点,一步步来看里面到底是个什么情况。

 

2 为什么要使用MapReduce

Map/Reduce,是一种模式,适合解决并行计算的问题,比如TopN、贝叶斯分类等。注意,是并行计算,而非迭代计算,像涉及到层次聚类的问题就不太适合了。
从名字可以看出,这种模式有两个步骤,Map和Reduce。Map即数据的映射,用于把一组键值对映射成另一组新的键值对,而Reduce这个东东,以Map阶段的输出结果作为输入,对数据做化简、合并等操作。
而MapReduce是Hadoop生态系统中基于底层HDFS的一个计算框架,它的上层又可以是Hive、Pig等数据仓库框架,也可以是Mahout这样的数据挖掘工具。由于MapReduce依赖于HDFS,其运算过程中的数据等会保存到HDFS上,把对数据集的计算分发给各个节点,并将结果进行汇总,再加上各种状态汇报、心跳汇报等,其只适合做离线计算。和实时计算框架Storm、Spark等相比,速度上没有优势。旧的Hadoop生态几乎是以MapReduce为核心的,但是慢慢的发展,其扩展性差、资源利用率低、可靠性等问题都越来越让人觉得不爽,于是才产生了Yarn,并且二代版的Hadoop生态都是以Yarn为核心。Storm、Spark等都可以基于Yarn使用。

 

3 怎么运行MapReduce

明白了哪些地方可以使用这个牛叉的MapReduce框架,那该怎么用呢?Hadoop的MapReduce源码给我们提供了范例,在其hadoop-mapreduce-examples子工程中包含了MapReduce的Java版例子。在写完类似的代码后,打包成jar,在HDFS的客户端运行:
bin/hadoop jar mapreduce_examples.jar mainClass args
即可。当然,也可以在IDE(如Eclipse)中,进行远程运行、调试程序。
至于,HadoopStreaming方式,网上有很多。我们这里只讨论Java的实现。

 

4 如何编写MapReduce程序

    如前文所说,MapReduce中有Map和Reduce,在实现MapReduce的过程中,主要分为这两个阶段,分别以两类函数进行展现,一个是map函数,一个是reduce函数。map函数的参数是一个<key,value>键值对,其输出结果也是键值对,reduce函数以map的输出作为输入进行处理。

 

4.1 代码构成

    实际的代码中,需要三个元素,分别是Map、Reduce、运行任务的代码。这里的Map类是继承了org.apache.hadoop.mapreduce.Mapper,并实现其中的map方法;而Reduce类是继承了org.apache.hadoop.mapreduce.Reducer,实现其中的reduce方法。至于运行任务的代码,就是我们程序的入口。
    下面是Hadoop提供的WordCount源码。
  1. /**

 

  • * Licensed to the Apache Software Foundation (ASF) under one

 

 

  • * or more contributor license agreements.  See the NOTICE file

 

 

  • * distributed with this work for additional information

 

 

  • * regarding copyright ownership.  The ASF licenses this file

 

 

  • * to you under the Apache License, Version 2.0 (the

 

 

  • * “License”); you may not use this file except in compliance

 

 

  • * with the License.  You may obtain a copy of the License at

 

 

  • *

 

 

  • *     http://www.apache.org/licenses/LICENSE-2.0

 

 

  • *

 

 

  • * Unless required by applicable law or agreed to in writing, software

 

 

  • * distributed under the License is distributed on an “AS IS” BASIS,

 

 

  • * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 

 

  • * See the License for the specific language governing permissions and

 

 

  • * limitations under the License.

 

 

  • */

 

 

  • package org.apache.hadoop.examples;

 

 

 

 

  • import java.io.IOException;

 

 

  • import java.util.StringTokenizer;

 

 

 

 

  • import org.apache.hadoop.conf.Configuration;

 

 

  • import org.apache.hadoop.fs.Path;

 

 

  • import org.apache.hadoop.io.IntWritable;

 

 

  • import org.apache.hadoop.io.Text;

 

 

  • import org.apache.hadoop.mapreduce.Job;

 

 

  • import org.apache.hadoop.mapreduce.Mapper;

 

 

  • import org.apache.hadoop.mapreduce.Reducer;

 

 

  • import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

 

 

  • import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

 

  • import org.apache.hadoop.util.GenericOptionsParser;

 

 

 

 

  • public class WordCount {

 

 

 

 

  •   public static class TokenizerMapper

 

 

  •        extends Mapper<Object, Text, Text, IntWritable>{

 

 

 

 

  •     private final static IntWritable one = new IntWritable(1);

 

 

  •     private Text word = new Text();

 

 

 

 

  •     public void map(Object key, Text value, Context context

 

 

  •                     ) throws IOException, InterruptedException {

 

 

  •       StringTokenizer itr = new StringTokenizer(value.toString());

 

 

  •       while (itr.hasMoreTokens()) {

 

 

  •         word.set(itr.nextToken());

 

 

  •         context.write(word, one);

 

 

  •       }

 

 

  •     }

 

 

  •   }

 

 

 

 

  •   public static class IntSumReducer

 

 

  •        extends Reducer<Text,IntWritable,Text,IntWritable> {

 

 

  •     private IntWritable result = new IntWritable();

 

 

 

 

  •     public void reduce(Text key, Iterable<IntWritable> values,

 

 

  •                        Context context

 

 

  •                        ) throws IOException, InterruptedException {

 

 

  •       int sum = 0;

 

 

  •       for (IntWritable val : values) {

 

 

  •         sum += val.get();

 

 

  •       }

 

 

  •       result.set(sum);

 

 

  •       context.write(key, result);

 

 

  •     }

 

 

  •   }

 

 

 

 

  •   public static void main(String[] args) throws Exception {

 

 

  •     Configuration conf = new Configuration();

 

 

  •     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

 

 

  •     if (otherArgs.length != 2) {

 

 

  •       System.err.println(“Usage: wordcount <in> <out>”);

 

 

  •       System.exit(2);

 

 

  •     }

 

 

  •     Job job = new Job(conf, “word count”);

 

 

  •     job.setJarByClass(WordCount.class);

 

 

  •     job.setMapperClass(TokenizerMapper.class);

 

 

  •     job.setCombinerClass(IntSumReducer.class);

 

 

  •     job.setReducerClass(IntSumReducer.class);

 

 

  •     job.setOutputKeyClass(Text.class);

 

 

  •     job.setOutputValueClass(IntWritable.class);

 

 

  •     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

 

 

  •     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

 

 

  •     System.exit(job.waitForCompletion(true) ? 0 : 1);

 

 

  •   }

 

 

  • }

复制代码

 

4.2 入口类4.2.1 参数获取

首先定义配置文件类Configuration,此类是Hadoop各个模块的公共使用类,用于加载类路径下的各种配置文件,读写其中的配置选项。
    第二步中,用到了GenericOptionsParser类,其目的是将命令行中参数自动设置到变量conf中。
    GenericOptionsParser的构造方法进去之后,会进行到parseGeneralOptions,对传入的参数进行解析:
  1. private void parseGeneralOptions(Options opts, Configuration conf,

 

 

 

  •       String[] args) throws IOException {

 

 

 

 

  •     opts = buildGeneralOptions(opts);

 

 

 

 

  •     CommandLineParser parser = new GnuParser();

 

 

 

 

  •     try {

 

 

 

 

  •       commandLine = parser.parse(opts, preProcessForWindows(args), true);

 

 

 

 

  •       processGeneralOptions(conf, commandLine);

 

 

 

 

  •     } catch(ParseException e) {

 

 

 

 

  •       LOG.warn(“options parsing failed: “+e.getMessage());

 

 

 

 

 

 

 

 

  •       HelpFormatter formatter = new HelpFormatter();

 

 

 

 

  •       formatter.printHelp(“general options are: “, opts);

 

 

 

 

  •     }

 

 

 

 

  •   }

复制代码

 

 

而getRemainingArgs方法会获得传入的参数,接着在main方法中会进行判断参数的个数,由于此处是WordCount计算,只需要传入文件的输入路径和输出路径即可,因此参数的个数为2,否则将退出:

  1. if (otherArgs.length != 2) {

 

 

 

  •       System.err.println(“Usage: wordcount <in> <out>”);

 

 

 

 

  •       System.exit(2);

 

 

 

 

  • }

复制代码

 

如果在代码运行的时候传入其他的参数,比如指定reduce的个数,可以根据GenericOptionsParser的命令行格式这么写:
bin/hadoop jar MyJob.jar com.xxx.MyJobDriver -Dmapred.reduce.tasks=5
其规则是-D加MapReduce的配置选项,当然还支持-fs等其他参数传入。当然,默认情况下Reduce的数目为1,Map的数目也为1。
 
 

4.2.2 Job定义

   定义Job对象,其构造方法为:
  1. public Job(Configuration conf, String jobName) throws IOException {

 

 

 

  •     this(conf);

 

 

 

 

  •     setJobName(jobName);

 

 

 

 

  •   }

复制代码

 

可见,传入的”word count”就是Job的名字。而conf被传递给了JobConf进行环境变量的获取:

  1. public JobConf(Configuration conf) {

 

 

 

  •     super(conf);

 

 

 

 

  •     if (conf instanceof JobConf) {

 

 

 

 

  •       JobConf that = (JobConf)conf;

 

 

 

 

  •       credentials = that.credentials;

 

 

 

 

  •     }

 

 

  •      checkAndWarnDeprecation();

 

 

  •   }

复制代码

 

 

Job已经实例化了,下面就得给这个Job加点佐料才能让它按照我们的要求运行。于是依次给Job添加启动Jar包、设置Mapper类、设置合并类、设置Reducer类、设置输出键类型、设置输出值的类型。
    这里有必要说下设置Jar包的这个方法setJarByClass:
  1. public void setJarByClass(Class<?> cls) {

 

 

 

  •     ensureState(JobState.DEFINE);

 

 

 

 

  •     conf.setJarByClass(cls);

 

 

 

 

  •   }

复制代码

 

它会首先判断当前Job的状态是否是运行中,接着通过class找到其所属的jar文件,将jar路径赋值给mapreduce.job.jar属性。至于寻找jar文件的方法,则是通过classloader获取类路径下的资源文件,进行循环遍历。具体实现见ClassUtil类中的findContainingJar方法。
    搞完了上面的东西,紧接着就会给mapreduce.input.fileinputformat.inputdir参数赋值,这是Job的输入路径,还有mapreduce.input.fileinputformat.inputdir,这是Job的输出路径。具体的位置,就是我们前面main中传入的Args。

 

4.2.3 Job提交

    万事俱备,那就运行吧。
    这里调用的方法如下:
  1. public boolean waitForCompletion(boolean verbose

 

 

 

  •                                    ) throws IOException, InterruptedException,

 

 

 

 

  •                                             ClassNotFoundException {

 

 

 

 

  •     if (state == JobState.DEFINE) {

 

 

 

 

  •       submit();

 

 

 

 

  •     }

 

 

 

 

  •     if (verbose) {

 

 

 

 

  •       monitorAndPrintJob();

 

 

 

 

  •     } else {

 

 

 

 

  •       // get the completion poll interval from the client.

 

 

 

 

  •       int completionPollIntervalMillis =

 

 

 

 

  •         Job.getCompletionPollInterval(cluster.getConf());

 

 

 

 

  •       while (!isComplete()) {

 

 

 

 

  •         try {

 

 

 

 

  •           Thread.sleep(completionPollIntervalMillis);

 

 

 

 

  •         } catch (InterruptedException ie) {

 

 

 

 

  •         }

 

 

 

 

  •       }

 

 

 

 

  •     }

 

 

 

 

  •     return isSuccessful();

 

 

 

 

  •   }

复制代码

 

 

至于方法的参数verbose,如果想在控制台打印当前的进度,则设置为true。
   至于submit方法,如果当前在HDFS的配置文件中配置了mapreduce.framework.name属性为“yarn”的话,会创建一个YARNRunner对象来进行任务的提交。其构造方法如下:

 

  1. public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,

 

 

 

  •       ClientCache clientCache) {

 

 

 

 

  •     this.conf = conf;

 

 

 

 

  •     try {

 

 

 

 

  •       this.resMgrDelegate = resMgrDelegate;

 

 

 

 

  •       this.clientCache = clientCache;

 

 

 

 

  •       this.defaultFileContext = FileContext.getFileContext(this.conf);

 

 

 

 

  •     } catch (UnsupportedFileSystemException ufe) {

 

 

 

 

  •       throw new RuntimeException(“Error in instantiating YarnClient”, ufe);

 

 

 

 

  •     }

 

 

 

 

  •   }

复制代码

 

 

其中,ResourceMgrDelegate实际上ResourceManager的代理类,其实现了YarnClient接口,通过ApplicationClientProtocol代理直接向RM提交Job,杀死Job,查看Job运行状态等操作。同时,在ResourceMgrDelegate类中会通过YarnConfiguration来读取yarn-site.xml、core-site.xml等配置文件中的配置属性。

 

   下面就到了客户端最关键的时刻了,提交Job到集群运行。具体实现类是JobSubmitter类中的submitJobInternal方法。这个牛气哄哄的方法写了100多行,还不算其几十行的注释。我们看它干了点啥。
Step1:
检查job的输出路径是否存在,如果存在则抛出异常。

 

Step2:
初始化用于存放Job相关资源的路径。注意此路径的构造方式为:
  1. conf.get(MRJobConfig.MR_AM_STAGING_DIR,

 

 

 

  •         MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)

 

 

 

 

  •         + Path.SEPARATOR + user

 

 

 

 

  • + Path.SEPARATOR + STAGING_CONSTANT

复制代码

 

其中,MRJobConfig.DEFAULT_MR_AM_STAGING_DIR为“/tmp/hadoop-yarn/staging”,STAGING_CONSTANT为”.staging”。

 

Step3:
设置客户端的host属性:mapreduce.job.submithostname和mapreduce.job.submithostaddress

 

Step4:
通过RPC,向Yarn的ResourceManager申请JobID对象。

 

Step5:
从HDFS的NameNode获取验证用的Token,并将其放入缓存。

 

Step6:
将作业文件上传到HDFS,这里如果我们前面没有对Job命名的话,默认的名称就会在这里设置成jar的名字。并且,作业默认的副本数是10,如果属性mapreduce.client.submit.file.replication没有被设置的话。

 

Step7:
文件上传到HDFS之后,还要被DistributedCache进行缓存起来。这是因为计算节点收到该作业的第一个任务后,就会有DistributedCache自动将作业文件Cache到节点本地目录下,并且会对压缩文件进行解压,如:.zip,.jar,.tar等等,然后开始任务。
最后,对于同一个计算节点接下来收到的任务,DistributedCache不会重复去下载作业文件,而是直接运行任务。如果一个作业的任务数很多,这种设计避免了在同一个节点上对用一个job的文件会下载多次,大大提高了任务运行的效率。

 

Step8:
对每个输入文件进行split划分。注意这只是个逻辑的划分,不是物理的。因为此处是输入文件,因此执行的是FileInputFormat类中的getSplits方法。只有非压缩的文件和几种特定压缩方式压缩后的文件才分片。分片的大小由如下几个参数决定:mapreduce.input.fileinputformat.split.maxsize、mapreduce.input.fileinputformat.split.minsize、文件的块大小。
具体计算方式为:
Math.max(minSize, Math.min(maxSize, blockSize))
分片的大小有可能比默认块大小64M要大,当然也有可能小于它,默认情况下分片大小为当前HDFS的块大小,64M。
   接下来就该正儿八经的获取分片详情了。代码如下:
  1. long bytesRemaining = length; 2

 

  •           while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {

 

 

 

 

  •             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);

 

 

 

 

  •             splits.add(makeSplit(path, length-bytesRemaining, splitSize,

 

 

  •                                      blkLocations[blkIndex].getHosts()));

 

 

 

 

  •             bytesRemaining -= splitSize;

 

 

  •           }

 

 

 

 

  •           if (bytesRemaining != 0) {

 

 

  •             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);

 

 

 

 

  •             splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,

 

 

  •                        blkLocations[blkIndex].getHosts()));

 

 

 

 

  •           }

复制代码

 

Step8.1:
   将bytesRemaining(剩余未分片字节数)设置为整个文件的长度。
Step8.2:
如果bytesRemaining超过分片大小splitSize一定量才会将文件分成多个InputSplit,SPLIT_SLOP(默认1.1)。接着就会执行如下方法获取block的索引,其中第二个参数是这个block在整个文件中的偏移量,在循环中会从0越来越大:
  1. protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {

 

  •     for (int i = 0 ; i < blkLocations.length; i++) {

 

 

  •       // is the offset inside this block?

 

 

  •       if ((blkLocations[i].getOffset() <= offset) &&

 

 

  •           (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){

 

 

  •         return i;

 

 

  •       }

 

 

  •     }

 

 

 

 

  •     BlockLocation last = blkLocations[blkLocations.length -1];

 

 

  •     long fileLength = last.getOffset() + last.getLength() -1;

 

 

  •     throw new IllegalArgumentException(“Offset ” + offset + ” is outside of file (0..” + fileLength + “)”);

 

 

  •   }

复制代码

 

将符合条件的块的索引对应的block信息的主机节点以及文件的路径名、开始的偏移量、分片大小splitSize封装到一个InputSplit中加入List<InputSplit> splits。
Step8.3:
bytesRemaining -= splitSize修改剩余字节大小。剩余如果bytesRemaining还不为0,表示还有未分配的数据,将剩余的数据及最后一个block加入splits。
Step8.4
如果不允许分割isSplitable==false,则将第一个block、文件目录、开始位置为0,长度为整个文件的长度封装到一个InputSplit,加入splits中;如果文件的长度==0,则splits.add(new FileSplit(path, 0, length, new String[0]))没有block,并且初始和长度都为0;
Step8.5
将输入目录下文件的个数赋值给 “mapreduce.input.num.files”,方便以后校对,返回分片信息splits。
  这就是getSplits获取分片的过程。当使用基于FileInputFormat实现InputFormat时,为了提高MapTask的数据本地性,应尽量使InputSplit大小与block大小相同。
 如果分片大小超过bolck大小,但是InputSplit中的封装了单个block的所在主机信息啊,这样能读取多个bolck数据吗?
比如当前文件很大,1G,我们设置的最小分片是100M,最大是200M,当前块大小为64M,经过计算后的实际分片大小是100M,这个时候第二个分片中存放的也只是一个block的host信息。需要注意的是split是逻辑分片,不是物理分片,当Map任务需要的数据本地性发挥作用时,会从本机的block开始读取,超过这个block的部分可能不在本机,这就需要从别的DataNode拉数据过来,因为实际获取数据是一个输入流,这个输入流面向的是整个文件,不受split的影响,split的大小越大可能需要从别的节点拉的数据越多,从从而效率也会越慢,拉数据的多少是由getSplits方法中的splitSize决定的。所以为了更有效率,分片的大小尽量保持在一个block大小吧。

 

Step9:
将split信息和SplitMetaInfo都写入HDFS中。使用方法:
  1. JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);

复制代码

Step10:
对Map数目设置,上面获得到的split的个数就是实际的Map任务的数目。

 

Step11:
相关配置写入到job.xml中:
  1. jobCopy.writeXml(out);

复制代码

Step12:
通过如下代码正式提交Job到Yarn:
  1. status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

复制代码

  这里就涉及到YarnClient和RresourceManager的RPC通信了。包括获取applicationId、进行状态检查、网络通信等。

 

Step13:
上面通过RPC的调用,最后会返回一个JobStatus对象,它的toString方法可以在JobClient端打印运行的相关日志信息。

 

4.2.4 另一种运行方式

   提交MapReduce任务的方式除了上述源码中给出的之外,还可以使用ToolRunner方式。具体方式为:
  1. ToolRunner.run(new Configuration(),new WordCount(), args);

复制代码

至此,我们的MapReduce的启动类要做的事情已经分析完了。

来源URL:http://www.aboutyun.com/thread-9366-1-1.html