1. Why was YARN created?
2. What is the purpose of the Configuration class?
3. What is the purpose of the GenericOptionsParser class?
4. How are command-line parameters loaded into the conf variable?
5. Which method retrieves the arguments passed in?
6. How do you specify the number of reducers on the command line?
7. What are the default numbers of map and reduce tasks?
8. What does setJarByClass do?
9. Which parameter must be set to print a job's (MapReduce) current progress to the console?
10. Which configured parameter causes a YARNRunner object to be created for job submission?
11. Which class reads the configuration properties from files such as yarn-site.xml and core-site.xml?
12. Which method of the JobSubmitter class submits the job to the cluster?
13. What role does DistributedCache play in MapReduce?
14. Is splitting each input file a physical or a logical division, and what is the difference?
15. What factors determine the size of a split?
16. How is a split computed?
1 Overview
2 Why use MapReduce
3 How to run MapReduce
4 How to write a MapReduce program
4.1 Code structure
```java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```
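To see the map → shuffle → reduce dataflow of WordCount without a cluster, here is a minimal plain-Java sketch (no Hadoop dependency; TokenizerMapper's tokenization and IntSumReducer's summing are imitated with ordinary collections, and the shuffle's grouping is collapsed into a map merge):

```java
import java.util.*;

// A minimal, Hadoop-free sketch of the WordCount dataflow:
// "map" emits (word, 1) pairs, grouping imitates the shuffle,
// and "reduce" sums the values for each key.
public class WordCountSketch {

    // Tokenize each line into (word, 1) pairs and merge the 1s per word.
    public static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) {
                counts.merge(itr.nextToken(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(Arrays.asList("hello world", "hello hadoop")));
    }
}
```

The real job distributes the same logic: many mappers emit the pairs in parallel, the framework groups them by key, and the reducers sum each group.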
4.2 Entry class
4.2.1 Parameter parsing
In main, a GenericOptionsParser is constructed with (conf, args); its constructor ends up in parseGeneralOptions, which parses the Hadoop generic options (such as -D key=value) and applies them to conf:
```java
private void parseGeneralOptions(Options opts, Configuration conf,
    String[] args) throws IOException {
  opts = buildGeneralOptions(opts);
  CommandLineParser parser = new GnuParser();
  try {
    commandLine = parser.parse(opts, preProcessForWindows(args), true);
    processGeneralOptions(conf, commandLine);
  } catch (ParseException e) {
    LOG.warn("options parsing failed: " + e.getMessage());
    HelpFormatter formatter = new HelpFormatter();
    formatter.printHelp("general options are: ", opts);
  }
}
```
The getRemainingArgs method then returns the arguments that are left over after the generic options have been consumed. The main method checks how many there are; since this is a WordCount job that only needs an input path and an output path, exactly 2 arguments are required, otherwise the program exits:
```java
if (otherArgs.length != 2) {
  System.err.println("Usage: wordcount <in> <out>");
  System.exit(2);
}
```
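The splitting of "-D key=value" style generic options from the application's own arguments can be sketched like this (a deliberate simplification; the real GenericOptionsParser uses commons-cli and also handles options such as -fs and -jt). This is also how the number of reducers is set from the command line, via `-D mapreduce.job.reduces=N`:

```java
import java.util.*;

// Simplified imitation of GenericOptionsParser: pull "-D key=value"
// pairs into a configuration map and return whatever is left as the
// application's own arguments.
public class MiniOptionsParser {
    private final Map<String, String> conf = new HashMap<>();
    private final List<String> remaining = new ArrayList<>();

    public MiniOptionsParser(String[] args) {
        for (int i = 0; i < args.length; i++) {
            if (args[i].equals("-D") && i + 1 < args.length) {
                String[] kv = args[++i].split("=", 2);
                conf.put(kv[0], kv.length > 1 ? kv[1] : "");
            } else {
                remaining.add(args[i]);
            }
        }
    }

    public Map<String, String> getConf() { return conf; }

    public String[] getRemainingArgs() { return remaining.toArray(new String[0]); }

    public static void main(String[] args) {
        MiniOptionsParser p = new MiniOptionsParser(
            new String[] {"-D", "mapreduce.job.reduces=4", "in", "out"});
        System.out.println(p.getConf() + " " + Arrays.toString(p.getRemainingArgs()));
    }
}
```

After parsing, only "in" and "out" remain, which is why WordCount's length check sees exactly 2 arguments.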
4.2.2 Job definition
The Job constructor simply stores the configuration and sets the job name:
```java
public Job(Configuration conf, String jobName) throws IOException {
  this(conf);
  setJobName(jobName);
}
```
As you can see, the "word count" string passed in becomes the Job's name, while conf is passed on to JobConf, which handles reading the configuration:
```java
public JobConf(Configuration conf) {
  super(conf);
  if (conf instanceof JobConf) {
    JobConf that = (JobConf) conf;
    credentials = that.credentials;
  }
  checkAndWarnDeprecation();
}
```
setJarByClass records which class the job jar contains, so that Hadoop can locate the right jar and ship it to the cluster:

```java
public void setJarByClass(Class<?> cls) {
  ensureState(JobState.DEFINE);
  conf.setJarByClass(cls);
}
```
4.2.3 Job submission
waitForCompletion submits the job if it is still in the DEFINE state, then either prints progress to the console (when verbose is true, which is how you get the job's progress printed) or polls silently until the job completes:

```java
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
```
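The non-verbose branch above is a plain poll-and-sleep loop. The pattern can be isolated in a Hadoop-free sketch (the BooleanSupplier stands in for job.isComplete(); waitUntilComplete and its return value are illustrative names, not Hadoop API):

```java
import java.util.function.BooleanSupplier;

// Stand-alone sketch of the poll-until-complete loop used by
// waitForCompletion's non-verbose branch.
public class PollLoop {
    // Returns how many times we slept before completion was observed.
    public static int waitUntilComplete(BooleanSupplier isComplete,
                                        long pollIntervalMillis) {
        int polls = 0;
        while (!isComplete.getAsBoolean()) {
            polls++;
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException ie) {
                // swallowed, exactly as in Job.waitForCompletion
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        // a fake job that reports complete on the third check
        int[] ticks = {0};
        int polls = waitUntilComplete(() -> ++ticks[0] >= 3, 1);
        System.out.println("slept " + polls + " times");
    }
}
```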
When mapreduce.framework.name is set to yarn, submission goes through a YARNRunner, which wraps a ResourceMgrDelegate for talking to the ResourceManager:

```java
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
                  ClientCache clientCache) {
  this.conf = conf;
  try {
    this.resMgrDelegate = resMgrDelegate;
    this.clientCache = clientCache;
    this.defaultFileContext = FileContext.getFileContext(this.conf);
  } catch (UnsupportedFileSystemException ufe) {
    throw new RuntimeException("Error in instantiating YarnClient", ufe);
  }
}
```
The staging directory for the submission is assembled from the yarn.app.mapreduce.am.staging-dir setting, the user name, and a fixed suffix:

```java
conf.get(MRJobConfig.MR_AM_STAGING_DIR,
         MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
    + Path.SEPARATOR + user
    + Path.SEPARATOR + STAGING_CONSTANT
```
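With the default staging root and the ".staging" suffix, the concatenation yields a per-user directory. A quick sketch (the two constant values are assumed here from MRJobConfig's defaults, not pulled from a live configuration):

```java
// Reproduce the staging-directory concatenation with assumed defaults
// for MRJobConfig.DEFAULT_MR_AM_STAGING_DIR and STAGING_CONSTANT.
public class StagingDir {
    static final String DEFAULT_MR_AM_STAGING_DIR = "/tmp/hadoop-yarn/staging";
    static final String STAGING_CONSTANT = ".staging";

    public static String stagingDir(String user) {
        return DEFAULT_MR_AM_STAGING_DIR + "/" + user + "/" + STAGING_CONSTANT;
    }

    public static void main(String[] args) {
        System.out.println(stagingDir("alice"));
    }
}
```

Each user therefore gets an isolated directory under the staging root where the job jar, split files, and job.xml are placed.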
Each input file is divided into splits by the loop below: while more than SPLIT_SLOP (1.1) times splitSize remains, a full-size split is cut, and any final remainder becomes one last, smaller split:

```java
long bytesRemaining = length;
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
  int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
  splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                       blkLocations[blkIndex].getHosts()));
  bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
  int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
  splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts()));
}
```
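This loop can be replayed stand-alone. Note two things it makes concrete: a split is purely logical, just an (offset, length) range into the file rather than a physical copy, and the split size itself comes from max(minSize, min(maxSize, blockSize)), which is FileInputFormat's computeSplitSize formula:

```java
import java.util.*;

// Stand-alone replay of FileInputFormat's split loop. A split is a
// logical (offset, length) range, not a physical copy of the data.
public class SplitCalc {
    static final double SPLIT_SLOP = 1.1; // 10% slop, as in FileInputFormat

    // splitSize = max(minSize, min(maxSize, blockSize))
    public static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Returns the (offset, length) pair of every split of a file.
    public static List<long[]> splits(long length, long splitSize) {
        List<long[]> splits = new ArrayList<>();
        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits.add(new long[] {length - bytesRemaining, splitSize});
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            splits.add(new long[] {length - bytesRemaining, bytesRemaining});
        }
        return splits;
    }

    public static void main(String[] args) {
        // a 250-byte "file" with a 100-byte split size -> splits at
        // (0,100), (100,100), (200,50)
        for (long[] s : splits(250, 100)) {
            System.out.println("offset=" + s[0] + " length=" + s[1]);
        }
    }
}
```

The slop also explains why a file of, say, 105 bytes with a 100-byte split size produces a single split: 1.05 is below 1.1, so the trailing 5 bytes are folded into one split rather than creating a tiny extra one.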
getBlockIndex finds the block that contains a given offset, so that each split can record the hosts holding its data and be scheduled close to it:

```java
protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
  for (int i = 0; i < blkLocations.length; i++) {
    // is the offset inside this block?
    if ((blkLocations[i].getOffset() <= offset) &&
        (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
      return i;
    }
  }
  BlockLocation last = blkLocations[blkLocations.length - 1];
  long fileLength = last.getOffset() + last.getLength() - 1;
  throw new IllegalArgumentException("Offset " + offset
      + " is outside of file (0.." + fileLength + ")");
}
```
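The lookup can be exercised without HDFS by substituting plain (offset, length) pairs for the BlockLocation objects (a hypothetical stand-in, same search logic):

```java
// Stand-alone version of getBlockIndex: find the block whose
// [offset, offset + length) range contains the given file offset.
// Each long[] is a {offset, length} pair standing in for BlockLocation.
public class BlockIndex {
    public static int getBlockIndex(long[][] blocks, long offset) {
        for (int i = 0; i < blocks.length; i++) {
            if (blocks[i][0] <= offset && offset < blocks[i][0] + blocks[i][1]) {
                return i;
            }
        }
        long[] last = blocks[blocks.length - 1];
        long fileLength = last[0] + last[1] - 1;
        throw new IllegalArgumentException(
            "Offset " + offset + " is outside of file (0.." + fileLength + ")");
    }

    public static void main(String[] args) {
        // two 128-byte blocks starting at offsets 0 and 128
        long[][] blocks = {{0, 128}, {128, 128}};
        System.out.println(getBlockIndex(blocks, 130));
    }
}
```

For offset 130 with two 128-byte blocks, the second block (index 1) is returned, so a split starting there would list that block's hosts.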
The computed splits are then written into the job's submission directory:

```java
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
    jobSubmitDir.getFileSystem(conf), array);
```
The job configuration itself is serialized to XML (job.xml) in the same directory:

```java
jobCopy.writeXml(out);
```
Finally, at the end of JobSubmitter.submitJobInternal, the job is handed to the cluster through the client protocol:

```java
status = submitClient.submitJob(jobId, submitJobDir.toString(),
    job.getCredentials());
```
4.2.4 An alternative way to run
Instead of a bare main method, the driver can implement the Tool interface and be launched through ToolRunner, which applies the generic options before calling the tool's run method:

```java
ToolRunner.run(new Configuration(), new WordCount(), args);
```
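The contract behind ToolRunner can be sketched Hadoop-free: a tool exposes run(args), and the runner parses the generic options before delegating the rest (MiniTool and MiniToolRunner are hypothetical names; the real ToolRunner additionally wires the parsed Configuration into the Tool via setConf):

```java
import java.util.*;

// Hadoop-free sketch of the Tool/ToolRunner contract: the runner
// strips "-D key=value" generic options into a config map, then
// delegates the remaining arguments to the tool's run method.
public class MiniToolRunner {
    public interface MiniTool {
        int run(Map<String, String> conf, String[] args);
    }

    public static int run(MiniTool tool, String[] args) {
        Map<String, String> conf = new HashMap<>();
        List<String> remaining = new ArrayList<>();
        for (int i = 0; i < args.length; i++) {
            if (args[i].equals("-D") && i + 1 < args.length) {
                String[] kv = args[++i].split("=", 2);
                conf.put(kv[0], kv.length > 1 ? kv[1] : "");
            } else {
                remaining.add(args[i]);
            }
        }
        return tool.run(conf, remaining.toArray(new String[0]));
    }

    public static void main(String[] args) {
        // a toy "tool" that succeeds only when given exactly two paths
        int rc = run((conf, rest) -> rest.length == 2 ? 0 : 2,
                     new String[] {"-D", "mapreduce.job.reduces=4", "in", "out"});
        System.out.println("exit code: " + rc);
    }
}
```

The benefit over a plain main is that every such driver gets -D/-files/-libjars handling for free instead of each program re-implementing the option parsing.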
This concludes the analysis of what the MapReduce driver class does at startup.