Guiding questions:
1. Which data types does Hadoop provide, and how do they correspond to Java data types?
2. When does the ApplicationMaster start?
3. When is the YarnChild process spawned?
4. When the job has reduce tasks, what percentage of the overall task does completing the map phase account for?
5. What are the execution steps of run?
6. Which method executes the actual map task?
7. Which class is used to obtain configuration information?
8. What extra information does TaskAttemptContextImpl add?
4.3 The Map Class
4.3.1 Introduction to Map
[Figure: 1.jpg]
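The classic WordCount Mapper illustrates the Map side: each input line is tokenized, and a (word, 1) pair is emitted for every token: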
public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());   // reuse the Text object for each token
    context.write(word, one);    // emit (word, 1)
  }
}
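The one and word referenced above are fields of the enclosing Mapper; in the Hadoop WordCount example the class is declared along these lines:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1); // constant count emitted per token
  private Text word = new Text();                            // reusable key buffer
  // map(...) as shown above
}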
4.3.2 Map Task Analysis
Initialize the task: set its current state to RUNNING, set up the output directory, and so on.
4.3.3 runNewMapper Analysis
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
  // make a task context so we can get the classes
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(), reporter);
  // make a mapper
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
  // make the input format
  org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
  // rebuild the input split
  org.apache.hadoop.mapreduce.InputSplit split = null;
  split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
      splitIndex.getStartOffset());
  LOG.info("Processing split: " + split);
  org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>(split, inputFormat, reporter, taskContext);
  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
  org.apache.hadoop.mapreduce.RecordWriter output = null;
  // get an output object
  if (job.getNumReduceTasks() == 0) {
    output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
  } else {
    output = new NewOutputCollector(taskContext, job, umbilical, reporter);
  }
  org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext =
      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
          input, output, committer, reporter, split);
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context mapperContext =
      new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(mapContext);
  try {
    input.initialize(split, mapperContext);
    mapper.run(mapperContext);          // drive the user's map function
    mapPhase.complete();
    setPhase(TaskStatus.Phase.SORT);    // map done; move on to the sort phase
    statusUpdate(umbilical);            // report progress over the umbilical protocol
    input.close();
    input = null;
    output.close(mapperContext);
    output = null;
  } finally {
    closeQuietly(input);
    closeQuietly(output, mapperContext);
  }
}
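In short, runNewMapper builds a TaskAttemptContext, instantiates the user's Mapper and InputFormat by reflection, rebuilds the InputSplit, and wires the RecordReader, the output collector, and the MapContext together before handing control to mapper.run. The Mapper and InputFormat classes resolved here are the ones registered on the Job at submission time, for example:

Job job = Job.getInstance(new Configuration(), "word count");
job.setMapperClass(TokenizerMapper.class);       // what taskContext.getMapperClass() returns
job.setInputFormatClass(TextInputFormat.class);  // what taskContext.getInputFormatClass() returns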
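Inside NewOutputCollector, the map output collector itself is also created by reflection; unless mapreduce.job.map.output.collector.class names another implementation, the default MapOutputBuffer is used: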
ReflectionUtils.newInstance(job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
    MapOutputBuffer.class, MapOutputCollector.class), job);
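With the default TextInputFormat, record reading bottoms out in LineRecordReader, which fills value one line at a time: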
in.readLine(value, maxLineLength, Math.max(maxBytesToConsume(pos), maxLineLength));
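The map loop itself lives in the Mapper base class: run calls setup once, then invokes map for every key/value pair the context supplies, and finally cleanup: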
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
  } finally {
    cleanup(context);
  }
}
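Finally, the MapContextImpl is wrapped so that user code sees only the narrower Mapper.Context interface: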
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context mapperContext =
    new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(mapContext);
4.3.4 Spill Analysis
[Figure: 2.jpg]
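MapOutputBuffer stores serialized records and their metadata in one circular byte array: kvmeta is an int view over the same storage, and the equator is the point from which record data and metadata grow toward each other: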
kvbuffer = new byte[maxMemUsage];
bufvoid = kvbuffer.length;
kvmeta = ByteBuffer.wrap(kvbuffer).order(ByteOrder.nativeOrder()).asIntBuffer();
setEquator(0);
bufstart = bufend = bufindex = equator;
kvstart = kvend = kvindex;
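maxMemUsage above is derived from mapreduce.task.io.sort.mb (default 100) and aligned down to a multiple of METASIZE; roughly, in MapOutputBuffer.init:

final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100); // mapreduce.task.io.sort.mb
int maxMemUsage = sortmb << 20;                            // megabytes -> bytes
maxMemUsage -= maxMemUsage % METASIZE;                     // align to the metadata record size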
[Figure: 3.jpg]
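startSpill runs once the buffer passes its soft limit (the spill threshold mapreduce.map.sort.spill.percent, default 0.80): it freezes the spill bounds and wakes the spill thread: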
private void startSpill() {
  assert !spillInProgress;
  kvend = (kvindex + NMETA) % kvmeta.capacity();
  bufend = bufmark;
  spillInProgress = true;
  LOG.info("Spilling map output");
  LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
           "; bufvoid = " + bufvoid);
  LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
           "); kvend = " + kvend + "(" + (kvend * 4) +
           "); length = " + (distanceTo(kvend, kvstart,
               kvmeta.capacity()) + 1) + "/" + maxRec);
  spillReady.signal();
}
Here the condition variable is signaled, so that the SpillThread, which has been waiting since MapTask's init method started it, continues running:
while (true) {
  spillDone.signal();
  while (!spillInProgress) {
    spillReady.await();
  }
  try {
    spillLock.unlock();
    sortAndSpill();
  } catch (Throwable t) {
    sortSpillException = t;
  } finally {
    spillLock.lock();
    if (bufend < bufstart) {
      bufvoid = kvbuffer.length;
    }
    kvstart = kvend;
    bufstart = bufend;
    spillInProgress = false;
  }
}
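The spillReady/spillDone pair is a standard java.util.concurrent.locks handshake between the collecting thread and the SpillThread. Below is a minimal, self-contained sketch of the same pattern; the class name SpillHandshake is invented here, and the field names merely mirror Hadoop's:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class SpillHandshake {
  private final ReentrantLock spillLock = new ReentrantLock();
  private final Condition spillReady = spillLock.newCondition(); // collector -> spill thread
  private final Condition spillDone  = spillLock.newCondition(); // spill thread -> collector
  private boolean spillInProgress = false;

  // Called by the collecting thread when the buffer should be flushed.
  public void startSpill() {
    spillLock.lock();
    try {
      spillInProgress = true;
      spillReady.signal();              // wake the spill thread
    } finally {
      spillLock.unlock();
    }
  }

  // The spill thread's loop; runs for the lifetime of the task.
  public void spillLoop() throws InterruptedException {
    spillLock.lock();
    try {
      while (true) {
        spillDone.signal();             // tell waiting writers the last spill finished
        while (!spillInProgress) {
          spillReady.await();           // sleep until startSpill() fires
        }
        spillLock.unlock();             // do the expensive work outside the lock
        try {
          // ... sort and write the buffer to disk here ...
        } finally {
          spillLock.lock();
          spillInProgress = false;
        }
      }
    } finally {
      spillLock.unlock();
    }
  }
}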
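sortAndSpill first estimates the spill file's size: the in-use span of the circular buffer is contiguous when bufend >= bufstart and wraps around otherwise, plus an approximate header per partition: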
final long size = (bufend >= bufstart
    ? bufend - bufstart
    : (bufvoid - bufend) + bufstart) +
    partitions * APPROX_HEADER_LENGTH;
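The spill file's path is picked by LocalDirAllocator from the task's local directories, with enough free space for size bytes: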
return lDirAlloc.getLocalPathForWrite(MRJobConfig.OUTPUT + "/spill"
    + spillNumber + ".out", size, getConf());
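For each partition, an IFile.Writer then writes the sorted records, compressed with codec when one is configured: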
writer = new Writer<K, V>(job, out, keyClass, valClass, codec, spilledRecordsCounter);
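Every spill also produces one index record per partition. These are cached in memory until the cache exceeds mapreduce.task.index.cache.limit.bytes (default 1 MB); from then on, spill index files go to disk: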
if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
  // create spill index file
  Path indexFilename =
      mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
          * MAP_OUTPUT_INDEX_RECORD_LENGTH);
  spillRec.writeToFile(indexFilename, job);
} else {
  indexCacheList.add(spillRec);
  totalIndexCacheMemory +=
      spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
[Figure: 4.jpg]
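After a spill completes, resetSpill repositions bufstart/bufend at the equator and re-derives kvstart/kvend from the METASIZE-aligned slot just below it: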
private void resetSpill() {
  final int e = equator;
  bufstart = bufend = e;
  final int aligned = e - (e % METASIZE);
  // set start/end to point to first meta record
  // Cast one of the operands to long to avoid integer overflow
  kvstart = kvend = (int)
    (((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
  LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +
    (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
}
4.3.5 Merge
4.3.6 Related Configuration Options
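A brief sketch of setting the spill-related options discussed above on a job configuration; the property names and defaults are the Hadoop 2.x ones:

Configuration conf = new Configuration();
conf.setInt("mapreduce.task.io.sort.mb", 100);                  // in-memory sort buffer size (MB)
conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f);       // fill ratio that triggers a spill
conf.setInt("mapreduce.task.index.cache.limit.bytes", 1048576); // spill index cache limit (bytes)
conf.set("mapreduce.job.map.output.collector.class",            // collector implementation
         "org.apache.hadoop.mapred.MapTask$MapOutputBuffer");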
Source URL: http://www.aboutyun.com/forum.php?mod=viewthread&tid=9370&highlight=hadoop2%CC%E1%BD%BB%B5%BDYarn