MR v1 与 MR v2

MapReduce 框架在大规模数据集中的优良表现使其得到广泛应用,然而随着数据量的增长以及新应用类型的出现,最初的 MapReduce 框架(MR v1)面临诸多瓶颈。所以,Apache 又提出了 MR v2 以解决最初的 MapReduce 遇到的问题。 两者的主要区别有如下几点:

  • 扩展性:在 MR v1 中,资源管理和作业控制在同一个组件中管理。当数据量过大时,该组件的能力成为扩展集群规模的瓶颈。而 MR v2 将资源管理和作业控制分别作为两个可通信的独立组件,这使得瓶颈节点的压力得到了分散,提高了可扩展性。
  • 可靠性:MR v1 采用传统的 Master/Slave 架构,这时的 Master 存在单点故障问题,一旦 Master 节点失效,所有作业都将无法运行。MR v2 则将作业的 master 独立于资源分配,在 master 失效后可进行重新调度作业而不影响整个平台。
  • 资源利用率:MR v1 中资源分配的最小单位为 slot,是一种相对粗粒度的分配方案。因为一个任务很可能不能完全利用一个 slot 中的所有资源而造成浪费,此外 Map 和 Reduce 任务的 slot 不能共享,也就意味着会有闲置资源的等待问题。 在 MR V2 中有专门的资源管理器,结合管理员配置以及程序需求创建 container(一组资源的结合)运行任务。
  • 程序类型:MR v1的资源管理以及任务调度专为 MapReduce 这种离线数据处理应用制作,而后来兴起的内存计算,流式数据处理等计算需要一个更普适性的平台。

Apache YARN

YARN(Yet Another Resource Negotiator)是一种资源统一管理平台,与其类似的还有Facebook 的 Corona 和 Berkeley 的 Mesos。 由于计算框架类型的不同,使用这种资源统一管理平台才能将多种计算框架部署于同一个集群上,能更有效的利用资源以及降低运维成本。

YARN 组成结构

YARN 在资源管理上仍是 Master/Slave 架构,其中 Master 为 ResourceManager Slave 为 NodeManager。而对于应用的调度,则需要指定 ApplicationMaster 负责向 ResourceManager 申请资源,与 NodeManager 进行通信以获取信息进行任务调度。 要注意在 YARN 作为资源管理平台的视野中,ApplicationMaster 和其他的 Task 的地位是相似的,都运行在 Container中。其间关系如图所示: YARN 基本组成结构

  1. Container: 资源分配的单位,封装了多种所需资源。task 与 container 是一对一的关系,由应用程序需求动态生成。
  2. ResourceManager(RM): 全局资源管理,负责系统资源分配。可通过 Zookeeper 来解决单点失效的问题。其中包括两个组件
    1. 调度器:根据各种限制条件将系统中的资源分配给应用程序。因为它仅负责资源分配,YARN 中将其设计为松耦合组件,提供几种实现方式,也可以由用户根据需求进行重新设计。
    2. 应用程序管理器:管理系统中所有应用,负责启动、监控 AM 并在失败时重启。
  3. NodeManager(NM):运行在每个物理节点上的资源和任务管理器,主要负责两部分工作
    1. 定时向 RM 汇报本节点资源使用情况以及 container 运行状态
    2. 接收并处理来自 AM 的任务操作请求
  4. ApplicationMaster(AM): 用户提交的应用程序需要制定 AM,YARN 内置两个 AM 的实现,其一是一个 demo 以 shell script 的形式给出,其二是 MapReduce 程序的 AM —— MRAppMaster。AM 主要负责以下三个功能
    1. 与 RM 调度器通信获取资源
    2. 与 NM 通信控制任务进程
    3. 监控调度属于他的所有任务,在失败时进行重新调度

YARN 通信协议

各组件之间通过 RPC 通信。为保证兼容性,YARN 采用 Google 的 Protocol Buffers,在编译源代码过程中可能由于 protobuf 版本问题导致编译不通过。 在 YARN 中的通信总体上采用 pull 模型,也就是 client 主动向 server 发起连接请求。在 Hadoop 中有以下几种协议负责不同组件间的通信:

  • ClientRMProtocol: JobClient 与 RM 之间的通信,用于提交应用程序、查询程序状态
  • RMAdminProtocol: 系统管理员通过该协议更新系统配置文件
  • AMRMProtocol:每个 AM 通过该协议向 RM 注册并为各个 task 申请资源
  • ContainerManager: AM 通过该协议控制 NM 启动停止 container,以及获取各个 container 的状态信息
  • ResourceTracker:NM 通过该协议向 RM 注册并定时发送心跳信息汇报节点资源使用情况以及 container 的运行信息。

YARN 工作流程

当JobClient 提交应用程序到 YARN 时,首先启动对应的 AM,然后由 AM 创建应用、申请资源、监控只到作业完成,如下图具体包含以下几个步骤1YARN 工作流程

  1. 用户提交程序
  2. RM 分配 Container,与 对应 NM 通信告知启动 AM
  3. AM 向 RM 注册,程序进入运行状态,重复执行4-7直至结束
  4. AM 采用轮询方式向 RM 申请领取资源
  5. AM 申请到资源后与 NM 通信告知其启动任务
  6. NM 设置好运行环境,通过脚本启动任务
  7. 各任务向 AM 汇报进度
  8. 完成后 AM 向 RM 发起注销请求,关闭自己

YARN 编程模型

在 YARN 中以采用事件驱动的有限状态机进行运行控制,在 MR v1 中控制 Job 以及 Task 的 Progress 类将由用状态机重写的类替代。 对于一个生命周期较长的对象,其状态都会由一个状态机来表示。同时设计事件监听器(EventListener)和事件派发器(EventDispatcher)。 事件和状态机的变迁相互影响,也就是说监听到某事件可能会导致状态机的变迁,而状态机变迁时的行为有可能是产生一个新的事件。 所有的事件都将经过AsyncDispatcher 进行排队分发,这种事件集中排队分发的设计方式与Kafka2类似。

此外,考虑到耦合性问题,YARN 中的组件以服务化的形式出现,均实现 Service 接口。AbstractService是一个抽象类,提供了一个 Service 最基本的实现。 对于非组合服务,可以直接继承AbstarctService类,而对于组合服务,比如MRAppMaster组合了各种服务对象,则需要继承CompositeService

MRAppMaster 分析

MRAppMaster继承CompositeService,是 MapReduce 型应用程序的 ApplicationMaster。 该类的 Java Doc 中介绍到,应用程序所用到的状态机被封装到接口Job的实现类中,所有状态的改变都要通过Job接口。事件(event)会导致该有限状态机的变迁。 该类本分是一个松耦合的服务组件,和其他组件的交互行为都是通过事件产生的,这种方式与参与者模式(Actor Model)很类似。 所有的组件都会向一个中心分发器(dispatcher)注册,事件均由该分发器调度分发。而各组件之间的信息会通过AppContext进行共享。

MRAppMaster源码中有一个内部类 RunningAppContext实现AppContext接口,通过context可以创建 Job, Task, TaskAttempt 以及 Speculative 的事件分发器。 这个类中的代码多为环境搭建的操作,比如各服务组件的组合,监听器和分发器的创建注册,用户的验证等。 在serviceStart()方法的最后调用startJobs()创建一个job-start事件并发送至AsyncDispatcher,以此触发整个 Job 的状态机。


  1. 参考自《Hadoop 技术内幕:深入解析 MapReduce架构设计与实现原理》 董西成 著 

  2. https://kafka.apache.org 

对于一个简单的 MapReduce 作业,用户只需要实现 Map 与 Reduce 两个函数。 这两个函数由继承 Mapper 与 Reducer 类并重写相关方法得来, 在 Hadoop2.7.3 源码中可以看到,hadoop-mapreduce-project module 中的示例程序 WordCount。 其中包含两个静态内部类: TokenizerMapper和IntSumReducer。 分别继承自 Mapper 和 Reducer,并重写了对应的 map 和 reduce 方法。在 Hadoop 中会通过反射将这两个类分别注入到 MapTask 和 ReduceTask 中进行调度运行。 其中 MapTask 可以细分为Read, Map, Collect, Spill 和 Combine 五个阶段;ReduceTask 可以细分为 Shuffle, Merge, Sort, Reduce 和 Write 五个阶段。


Task 运行概述

总体来说,MapReduce 过程分为 Map 阶段以及 Reduce 阶段。 其中每个 Map Task 处理一篇数据(InputSplit,属逻辑分片),而 Reduce Task 从每个 Map Task 的输出中拷贝相应数据,经处理后写入 HDFS 中作为最终结果。 总体上来看,整个过程采用了 pull 模型。也就是说,Map Task 将中间结果保存到本地磁盘中,再由 Reduce Task 从 Map Task 端拷贝拉取(pull)数据进行后续处理。 这两者之间通过 HTTP 协议传输数据。

Map Task 流程:

  1. 根据用户指定、编写的 InputFormat 将Split 解析成为 键值对(k1,v1) 的形式
  2. 上述(k1,v1) 输入 map 函数,得到(k2,v2)
  3. 根据指定的 Partitioner(默认为 Hash Partitioner) 进行数据分片, 以确定每个键值对应该由哪个 Reduce Task 处理
  4. 将经过之前处理后的数据通过用户定义的 Combiner(可选项) 进行本地规约
  5. 将处理后的结果保存至本地硬盘上

Reduce Task 流程:

  1. 通过 HTTP 协议从各个已完成的 Map Task 中拷贝相应的数据
  2. 数据拷贝完成后根据 key 值进行排序,将 key 相等的记录聚集到一起形成若干分组
  3. 将每组数据输入 reduce 函数中
  4. 将 reduce 的输出写入 HDFS

Task 抽象类分析

Task 作为一个抽象类,是 MapTask 和 ReduceTask 的父类。 其中若干内部类作为 Task 运行流程中的工具类:

Task 内部类介绍

  • Counter: 统计 Task 内部各类数据,目前已废弃,使用单独的枚举类TaskCounter 实现。
  • TaskReporter: 继承自 StatusReporter 并实现ReporterRunnable接口。作为一个独立的线程报告任务的进度、状态以及更新计数器。 若长期未收到该线程的报告,则视为该任务失效,杀死并重新调度。
  • GcTimeUpdater: 记录并更新该任务在 GC 中所消耗的时间
  • FileSystemStatisticUpdater: Hadoop 提供一个 FileSystem 抽象类,为了保证不同文件系统之间的通用性。其中包含内部类Statistics,该类中包含用 volatile 关键字标注的统计用变量。 FileSystemStatisticUpdater 在其中迭代上述变量并记录最新状态,并创建各变量对应的 Counter。
  • ValuesIterator: 以排序后的 RawKeyValueIterator 为输入,以迭代value 为目的的键值对迭代器。
  • CombineValuesIterator: 继承自ValuesIterator, 仅在重写 next()方法时添加了 combine 计数器。
  • CombineRunner: 以抽象类的形式对 combinerRunner 的两种API进行一次封装, 实现create()方法并在其中实例化 OldCombineRunnerNewCombineRunner。 规定combine()接口,以空方法表示,要求子类重写。
  • OldCombineRunner: 实现了combine()方法的子类,通过反射实例化 Reducer ,迭代CombineValuesIterator 将具有相同 key 值的键值对进行 reduce 操作。
  • NewCombineRunner: 为使用新的 API 实现一个内部类 OutputConverter<K,V> 并创建 ReduceContext 作为参数传入 Reducer 的 run 方法中。 新 API 封装性更好,将迭代过程封装到 Reducer 内部, 通过ReduceContext 设置参数。

Task 方法介绍

Task 作为一个抽象类,提供了许多 get/set 方法。包括各类秘钥(JobTokenSecret, ShffleSecret, EntryptedSpillKey),流程控制方法(partition, phase, FsStatistics, progress, status), 任务相关标示(taskID, isJobSetupTask, isMapOrReduce)。

接口

作为一个 Task 最重要的 run 方法, 规定了一个接口,由 MapTaskReduceTask 分别各自实现。

  public abstract void run(JobConf job, TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException;

其第一个参数JobConf是作业从 Client 创建时指定的内容, 第二个参数 TaskUmbilicalProtocol 是 Hadoop 所有子进程与其父进程通信的协议接口。 父进程指一个守护进程,它与 Master 进行通信,要求创建 Map 或 Reduce 任务并作为子进程运行。

任务准备

Taskinitilize 方法进行任务初始化。 首先,根据JobConf创建 JobContextImpl:jobContextTaskAttemptContextImpl:taskContext,并将任务状态由UNSSIGNED修改为RUNNING。 若使用 new API, 需要通过反射获得 outputFormat,从 OutputFormat 中获取OutputCommitter:committer,否则直接从JobConf中取得committer。 然后指定输出目录,调用comitter.setupTask()完成任务创建。 最后使用到ResourceCalculatorProcessTree进行一些资源管理方面的处理。

读写

readFiles(DataInput in)write(DataOutput out) 分别负责从(向)I/O 流中读取(写入)数据。 此外还需判断当前热舞是否为 Job clean up 或 Task clean up 任务并进行相关操作。

Map Task 内部实现

run 方法

MapTaskrun()进行 task 相关设置

  1. 根据是否包含 Reduce 任务设置进度百分比:
    1. Reduce 任务数量等于0,task 只有 map 阶段,完成时进度显示为100%(1.0f)
    2. Reduce 任务数量大于0, task 分为 map 和 sort 两个阶段,前者完成时进度为66.7%(0.667f),后者即为剩下的33.3%(0.333f)
  2. 调用 jobConf:getUseNewMapper() 判断使用的 API类型并传入父类的 initilize()方法进行初始化
  3. 并判断任务是否为 setup 或 cleanup 操作完成, 若是则执行相关方法并返回,否则进行常规的 map 任务
  4. 具体的 Map 操作由 runNewMapper()runOldMapper()实现。
  5. map 操作完成后,调用 done() 方法进行收尾工作。

runOldMapper 方法

对于一个 Map 任务,首先要找到与之对应的 Split 这里涉及到三个方法

    InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()),
           splitIndex.getStartOffset());
    updateJobWithSplit(job, inputSplit);
    reporter.setInputSplit(inputSplit);

注意到InputSplit 在这里(runOldMapper)是一个接口,而与之同名的抽象类用于新的 API(runNewMapper()) 中。 FileSplit 通过实现InputSplitWithLocationInfo(子接口)并继承InputSplit以达到在新旧 API 中能够通用的目的。 InputSplit表示每个task 需要处理的数据, 一般来说以面向字节的方式呈现,而 RecordReader 负责以面向记录(record-oriented) 的方式呈现数据。 另外一个需要指出的是变量splitIndex,它是类JobSplit的一个内部静态类TaskSplitIndex的实例, 用于指定该任务待处理数据在文件中的偏移量。 之后通过两个方法将 inputSplit 的详细内容更新到 Job 和 reporter 中。 需要了解 Split 的详细内容请点击这里

接下来,会判断 skipping mode 实例化 SkippingRecordReader 或 TrackedRecordReader。 在 Map Task 中,读取 Record 出现错误时该 Record 会被标记, 如果多次出现问题,将会进入 skipping mode,跳过所有被标记的 Record。 之后根据 Reduce Task 的数量实例化 MapOutputCollector: 对于不存在 Reduce Task 的作业,将实例化DirectMapOutputCollector,把结果直接输出。 而正常存在 Reduce Task 的作业,将调用createSortingCollector()JobContext 中读取 Collectors 列表并迭代选择合适的 Collector 进行初始化。 然后通过MapRunnable接口创建 runner,该接口是为了扩展 Map 通用性,使其在多线程或或异步 mapper 中更具可操作性。调用runner.run()之后 map 阶段完成, 对于存在 Reduce task 的作业,进入 sort 阶段。 最后更新状态进行资源的释放,本方法结束。

另外值得一提的是 MapOutPutBuffer 中存在一个默认 Collector : MapOutPutBuffer 该类是 MapTask 的一个内部类, 涉及到 Spill操作和缓冲区设计的细节, 需要了解请点击这里

Reduce Task 内部实现

run 方法

与 Map 类似,判断是否为正常的 MapReduce 任务(即非 setup 或 cleanup 任务)。是则通过Progress类添加三个阶段,分别为 copy, sort和 reduce,否则进行相关特殊任务的操作。 与 Map 不同的是,这里用到接口ShuffleConsumerPlugin,Hadoop 提供它的一个实现类Shuffle。 copy 和 sort 两个阶段均在 Shuffle:run() 中完成。该方法返回值为一个迭代器类RawKeyValueIterator。关于 shuffle 的详细介绍请点击这里 在清理 shuffle 相关数据结构以及标记sort 阶段结束之后,进入reduce 阶段,根据 useNewApi 选择调用 runNewReducer()runOldReducer() 进行 reduce操作,其中 reduce 需要对键值对进行分组,用到RawComparator类。 最后关闭释放资源标记任务结束。

runOldReducer方法

首先仍然是通过反射从 JobConf 中获取到 Reducer。 键值对的输出使用到RecordWriter, 在旧 API 中实例化的是 OldTrackingRecordWriter:out, 并为保证数据不变形用final修饰变量finalOut,其值为out。 之后创建一个OutputCollector的实例,重写其collect方法。在该方法中使用finalOut输出数据并用reporter更新任务进度。 与 Map 相似,Reduce 中也存在 Skip Mode,Hadoop 会根据isSkipping()实例化一种 ReduceValuesIterator。 之后会逐 Key 循环进行 reduce 操作。若处于 Skip Mode 还会更新坏记录的数量。 同样在最后进行清理工作与资源能释放。

MapReduce 的数据传递

Hadoop MapReduce 中涉及以下数据存储单位:

  • block: HDFS 文件存储的基本单位,在 HDFS 中所有文件都以 block 进行存储,一式三份以保证容错能力。
  • split: map 任务的输入,是一种逻辑划分,map task 和 split 是一对一关系。
  • record: map 和 reduce 函数处理的目标, 是一个键值对。
  • spill: map 函数的输出超出缓冲区大小时写入本地磁盘的文件。

InputFormat

InputFormat 是一个接口,规定了两个方法,有如下功能:

  • 验证作业输入是否符合标准
  • 将给定的输入文件(可能是多个)按照给定规则划分为多个部分(Split)作为Map Task的输入
  • 提供将 Spilt 转化为 map 函数能处理的键值对形式的一种实现
public interface InputFormat<K, V> {
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
  RecordReader<K, V> getRecordReader(InputSplit split,
                                     JobConf job,
                                     Reporter reporter) throws IOException;
}

getSplits()会根据作业信息以及 Hadoop 配置产生出一组 Split。 要注意,这些 Split 作为逻辑分区并不真实存在于每个 Map 节点上(从其实现类的源码中可以看出)。 每个 Split 可由一个三元组(input-file-path, start, offset)表示(此处存疑,但源码中 getSplit 方法的注释如此描述)。 getRecordReader() 则由不同的实现类负责划分 record 的界限,为 map 操作做好准备。

InputFormat的设计通过多级划分,充分考虑了复用性和差异性。其类间关系如下图所示: InputFormat继承结构

接口InputFormat最主要的实现是FileInputFormat而他本身也只是一个抽象类。其中涉及到大量文件操作的方法,包括多文件操作的过滤器,递归访问,路径设置等。 其中最重要的就是实现了getSplits()方法,其中涉及到 Split 分割的算法。 Split 大小由如下公式计算得来 其中目标大小: , minSize由配置文件指定,默认为1。 也就是说通常情况下, Split 的大小与 blockSize 的默认大小(64M)相等。 若想增大 Split 的大小,从配置文件中增大 minSize 即可。 这里给出 三个文件,大小分别为(1MB, 32MB 和 250MB)各 size 取不同值对应的 Split 数目1:

minsize goalSize splitSize file3 split 数目 split 总数目
1 MB totalSize(numSplits=1) 64MB 4 6
32 MB totalSize/5 50MB 5 7
128MB totalSize/2 128MB 2 4

在进行逻辑上的 Split 划分之后,需要确定该 Split 的物理位置。因为 HDFS 中文件以 Block 形式存储且一式三份,也就意味着对于一个 Split,与之对应的 block 会有多个。getSplitHosts()方法接收网络拓扑和 Split 信息做为参数,将返回一组对当前 split 贡献度最大的 hosts(节点),一般来说,考虑到数据本地性,同一Rack 的节点贡献度要高于跨 rack 节点。

InputFormat 的最后一项任务就是 getRecordReader(), 该方法因输入文件类型以及程序目的的不同又各子类进行实现。RecordReader 在实现时需要注意两点,第一:记录需要被分解为 Key, value 两部分,因为 map 函数只能接受键值对形式的输入。 第二:记录可能因 Split 的划分被分割成两部分,RecordReader 规定每个 Split 的第一条不完整 Record 为上一个 Split 所有。

OutputFormat

接口OutputFormat 中规定两个方法,其一是getRecordWriter(),获取用于写输出文件的 RecordWriter, 同样根据场景子类进行实现。 另一个方法为checkOutputSpecs()在抽象类FileOutputFormat中有实现,用于验证 Job 指定的输出的正确性,出现问题时抛出异常由上层进行处理。 RecordWriterReduceTaskDirectMapOutputCollector(没有 reduce 任务的作业)中用到,将数据通过输出流一步步写出。

spill

spill 产生于 map 之后,也就是 collect 方法中。经过 map 计算后的中间结果会经由 partitioner 按 key 分成若干组(组数等于 reduce task 的个数),这时(k,v)会变成(k,v,partition)组成的三元组,传递给 MapOutputBuffer.collect() 做进一步处理。 在MapOutputBuffer中使用一个缓存区收集中间结果,在使用率到达一定阈值后排序并进行一次flush操作。这个过程由 MapOutputBuffer.sortAndSpill() 方法完成,每一个 spill 都是内部有序的,而且如果存在 combiner,在 spill 之前会执行 combine 操作进行数据压缩。 中间结果的产生和写入磁盘可以看成生产者-消费者模型,collect()方法是生产者,而write()为消费者。二者通过可重入互斥锁 ReentrantLock:spillLock 同步。 类SpillThread继承Thread 实现 run() 方法

      @Override
      public void run() {
        spillLock.lock();
        spillThreadRunning = true;
        try {
          while (true) {
            spillDone.signal();
            while (!spillInProgress) {
              spillReady.await();
            }
            try {
              spillLock.unlock();
              sortAndSpill();
            } catch (Throwable t) {
              sortSpillException = t;
            } finally {
              spillLock.lock();
              if (bufend < bufstart) {
                bufvoid = kvbuffer.length;
              }
              kvstart = kvend;
              bufstart = bufend;
              spillInProgress = false;
            }
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          spillLock.unlock();
          spillThreadRunning = false;
        }
      }

从源码中可以看出,配合spillLock控制同步信号的还有两个Condition:spillDonespillReady 以及两个boolean:spillInProgressvolatile spillThreadRunning。 该线程在 MapOutputBuffer.init() 方法中被启动进入等待状态,每当collect执行的时候,都需要判断缓冲区是否达到阈值且 spill 线程是否正在运行,然后执行相关操作。 另外源码中还设计两组标志位(kvstart,kvend)和(bufstart,bufend),分别对应索引和缓冲区(详见下节),每次 spill 都会将 start 和 end 之间的内容写入磁盘。

缓冲区与索引

为保证读写操作的并行性,Hadoop 在这里设计了一种环形缓冲区。 如果采取读写公用的单向缓冲区,锁将使得读写操作中总有一个处于等待状态。另外一种做法是设置两个缓冲区,交替读写,这一定程度上缓解了并行问题,但还是会存在等待问题。所以 Hadoop 选择了环形缓冲区,不断向剩余空间内循环的写入数据。 另外,Hadoop 考虑到效率问题,设计了二级索引结构。所以如上的环形缓冲区共有三个,分别为 kvoffsets, kvindices 和 kvbuffer,共占用空间为 io.sort.mb(默认值100MB)。其关系如下图所示1

buffer 二级索引

  • kvoffsets: 偏移量索引数组,保存 key,value 的位置索引在 kvindices 中的偏移量
  • kvindices: 位置索引数组,保存 key,value 在 kvbuffer 中的起始位置
  • kvbuffer: 数据缓冲区,保存实际的 key,value 值

因为缓冲区共享100MB 内存,一个合理的比例成为一个问题。在 Hadoop 0.21中,为避免设置比例的不合理带来的性能问题,将这三个缓冲区进行了合并。通过一个指针 equator 标明索引和数据共同的起始位置,索引和数据分别沿相反方向增长使用内存空间,如下图所示1

buffer 合并环形缓冲区

shuffle

shuffle 过程是将 map task 的最终结果拷贝到 reduce task 所在的节点上的过程,在拷贝数据量达到一定阈值后出发合并线程对本地数据进行合并操作为 reduce 的运行做准备。 在ReduceTask我们提到接口ShuffleConsumerPlugin规定了 shuffle 过程中必要的方法,Hadoop 给出了他的一个实现类Shuffle

Shuffle类中最重要的方法run()返回一个RawKeyValueIterator,由后续操作再次封装后作为 reduce 操作的对象。 在run()方法中,首先实例化一个EventFetcher来处理 map-completion 事件,然后根据配置实例化若干 Fetcher并启动线程,对于 map 结果在本地磁盘中的任务,会实例化LocalFetcher。 最后不断通过Reporter输出进度,直到 ShuffleSchedulerwaitUntilDone() 判定 shuffle 完成,copy 阶段结束。

具体 copy 的步骤在Fetcher类中实现,copyFromHost() 从指定MapHost中 shuffle 可用的 map 结果,将该 Host 中的可用结果拼接返回一个 URL并打开输入流。 之后开始执行 copyMapOutput() 函数拷贝数据。 对于拷贝失败或没有响应的任务,通过异常做重新调度等处理。copyMapOutput() 方法将通过各种工具类实施 copy 过程,并进行 merge 操作。


  1. 参考自《Hadoop 技术内幕:深入解析 MapReduce架构设计与实现原理》 董西成 著  2 3

基本介绍

NFS (Network File System)是一种分布式文件系统,可以在客户端将服务器的某一目录挂载,像访问本地文件一样访问服务器文件。

  • 服务器:192.168.0.1
  • 客户端:192.168.0.101

软件安装

服务器:

在服务器上安装 nfs-kernel-server 执行以下命令:

    sudo apt-get update
    sudo apt-get install nfs-kernel-server

客户端

在客户端上安装 nfs-common

    sudo apt-get update
    sudo apt-get install nfs-common

在服务器设置共享目录

  1. 在服务器上创建共享目录
        sudo mkdir /var/nfs
    
  2. 修改该目录拥有者和用户组
        sudo chown nobody:nogroup /var/nfs
    
  3. 修改配置文件 配置文件在 /etc/exports 内有示例。 比如:我们挂载/home以及/var/nfs 供IP地址为192.168.1.101的客户端访问

    /var/nfs 192.168.1.101(rw,sync,no_subtree_check)

    其中参数含义如下:

    • rw: 允许客户端对该目录读和写。
    • sync: 强制NFS 在返回消息之前向磁盘中写入文件。 通常情况下,该参数给环境带来更高稳定性。
    • nosubtreecheck: 在每次请求之前都查看当前用户是否有权限对其子目录进行操作,带来更高安全性。
    • norootsquash: 关闭客户端对该目录的root权限。
  4. 创建NFS表项 状态并启动服务
        sudo exportfs -a
        sudo service nfs-kernel-server start
    

    在客户端设置挂载点

  5. 在mnt目录下创建挂载点
       sudo mkdir -p /mnt/nfs/var/nfs
    
  6. 挂载服务器目录
       sudo mount 192.168.1.1:/var/nfs /mnt/nfs/var/nfs
    
  7. 查看挂载目录
        df -h
        mount -t nfs
    

    我们可以在最后看到刚挂载的目录。

    设置自动挂载目录

    /etc/fstab 中写入配置项

    192.168.1.1:/var/nfs /mnt/nfs/var/nfs nfs auto,noatime,nolock,bg,nfsvers=4,sec=krb5p,intr,tcp,actimeo=1800 0 0