Hadoop中的事件总线

之前的文章Hadoop状态机介绍提到整个Hadoop的调度由状态机来控制,而驱动状态机变迁的则是事件。 这里给出Hadoop中几个类的层级结构图,内容较多,该图可由Intellij idea导出。 Hadoop层次结构 从图中我们可以看到,实现AbstractService类有三种,其一为CompositeService,它的子类包括了之前提到的RM,NM以及AM等,这些子类都有main()方法,也就是说,在运行Hadoop的过程中,他们各自独立为一个进程; 其二为AbstractYarnScheduler,这个类完全属于YARN模块内,这里不进行介绍。最后一个则是所有事件的总线,AsyncDispatcher。Hadoop运行过程中所有的事件都将被发布到该类的一个阻塞队列当中,再由他做一次分发。 在这个类中有几个重要的内容:

  • eventDispatchers: Map类型的成员变量,其Key为eventType,是一个枚举类型,value为处理该eventType的EventHandler。关于EventHandler,我们从图中可以看到,大量的类都实现了该接口, 其中既包括TaskImpl这样封装了状态机的类,也包括JobEventDispatcher这样只做分发并不做其他处理的分发器类。其实Dispatch这样的操作本身也属于处理事件的一种方式,起名为Dispatcher更能表达该类本身的行为。 所有实现EventHandler的类通过AsyncDispatcher:register()将自身注册到eventDispatchers之中。
  • GenericEventHandler: 他是一个内部类,实现了EventHandler,在handle()方法中把收到的时间put到eventQueue中。
  • dispatch(): dispatch()方法从队列中拿一个事件,到eventDispatchers中寻找对应的handler,调用其handle()方法。这里值得注意的是,在这个方法中捕获到异常时,他会新开启一个shutdownThread关闭AsyncDispatcher服务。

在开发过程中,我们可能需要对事件处理部分进行断点调试,在这个类中加断点将阻塞所有消息,包括RM以及NM本身运行所发的事件。所以更好的方式是找到与开发内容相关的事件分发器,避免AsyncDispatcher中的大量事件的干扰。

MRAppMaster中的事件处理

之前提到 MRAppMaster 作为一个单独的进程,有它的main方法。在main方法中除了设置一系列相关参数以及关闭时使用的钩子函数外,调用了静态方法initAndStartAppMaster()。在这个方法中进行用户身份的验证等操作,最后以用户身份初始化并运行 MRAppMaster。

从之前的层次结构图中可以看到,MRAppMaster也是AbstractService的实现类,他重写了serviceInit(final Configuration conf)方法,首先根据入参conf设置了一系列上下文环境以及成员变量,然后调用createDispatcher()实例化了一个AsyncDispatcher。 但是,在这里并没有单例模式出现,同时在ResourceManager 等AbstractService 的子类中也有这样的实例化方法。这与我所设想的有些不同,如果按照代码中所示,将有多个AsyncDispatcher的实例出现,而且其中的消息队列以及存储handler与对应事件的Map也不是静态变量。 也就意味着在不同的服务中所使用的事件队列是不一样的,这一点需要再在运行时考证。

MRAppMaster 本身没有实现Handler接口,但他有几个实现Handler接口的内部类,在serviceInit()方法中调用AsyncDispatcher:register分别把他们和相关的事件进行绑定。这样一来,Job、Task、TaskAttempt 等事件就都会由对应的Handler (JobImpl,TaskImpl,TaskAttemptImpl)来进行处理。在这些实现了hanlder接口的impl内部有各自的状态机,经 MRAppMaster 分发之后的状态将驱动这些状态机进行变迁,同时不断产生新的事件直到状态机运行到终态。

Hadoop MapReduce 状态机类结构

之前的文章YARN 流程分析中提到 MR v2 之后对流程的控制都以 Actor Model 的形式进行(Actor Model (参与者模式)思想与实例)。 Hadoop MapReduce 中的状态机被封装hadoop-mapreduce-client-appmodule 的几个类中,其目录结构如下:

org.apache.hadoop.mapreduce.v2.app.job event

JobEvent
JobEventType
JobFinishEvent
JobStartEvent

impl

JobImpl
MapTaskImpl
ReduceTaskImpl
TaskImpl
TaskAttemptImpl

Job
JobStateInternal
Task
TaskAttempt
TaskStateInternal

其中 StateInternal 是一个枚举类型,其中封装了各状态机的所有状态,接口中规定了一些 get 方法,用于读取作业或任务的状态信息。 event 包中包含了这里用到的事件,正是这些事件被 handle 或 dispatch 导致了状态机变迁的发生,而变迁发生又会根据需要产生新的事件推动整个系统的进行。 impl 包中则为创建这些状态机的类,每个类中包含大量实现SingleArcTransitionMultipleArcTransition接口的内部类,这些内部类分别代表着一个变迁。 接口中只规定了一个方法

  public STATE transition(OPERAND operand, EVENT event);

任何变迁都要实现这个方法,保证在接收到某事件后可做出一系列操作并进入下一状态。 状态机类都包含一个静态变量stateMachineFactory用于将状态与变迁连接到一起,形成状态机。 StateMachineFactory是 YARN 中一个不可变类,用final修饰,它提供了一个方法addTransition()

  public StateMachineFactory
             <OPERAND, STATE, EVENTTYPE, EVENT>
          addTransition(STATE preState, STATE postState,
                        EVENTTYPE eventType,
                        SingleArcTransition<OPERAND, EVENT> hook){
    return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>
        (this, new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>
           (preState, eventType, new SingleInternalArc(postState, hook)));
  }

可以看出,这个方法采用链式编程的原则,所以在初始化stateMachineFactory的时候,可以不断调用该方法向状态机中加入新的变迁。 该方法另有4个重载方法,其一与之类似,将后置状态改为一个Set,SingleArcTransition 改为 multipleArcTransition,用于根据事件中的具体情况判断变迁如何进行。 另两个取消 hook 参数,用于添加可忽略的事件,最后一个重载方法将返回一个新的状态机。 所有变迁添加完成后调用installTopology()完成状态机的创建。

状态机及变迁分析

因官方提供下载的 FSM 并没有随着版本更新,在下面我对 Hadoop 2.7.3 中的状态机进行了绘制。下图中考虑到可见性有部分省略,需要确认变迁请参照上章节内容到源代码中查看。 图中绿色线代表正常流程,红色线代表失效,黄色线代表关闭。 虚线表现了一部分 multipleArcTransition,也就是在收到某事件时,将进一步判断事件内部信息来决定状态机变迁如何产生。 驱动整个状态机的入口在handle()方法中,首先将写操作上锁,然后获取当前状态,然后执行doTransition()。在该方法中捕获到InvalidStateTransitionException时, 首先更新状态,然后生成InternalError事件,状态机计入Error状态。

JobImpl

状态机

JobImpl 中创建的状态机如下图所示,为了清晰度省略了一部分变迁。 Job 状态机 Hadoop提供了作业级别的容错能力,图中紫色线将重启作业,而灰色线则代表发生InternalError时进入Error状态。

变迁(待填坑)

TaskImpl

状态机

Task 状态机

变迁

  • InitialScheduleTransition: 最初的变迁,首先调用 addAndScheduleAttempt()方法创建一个 TaskAttempt,该方法根据传入参数的不同发送 TA_RESCHEDULE 或 TA_SCHEDULE 事件。 此外还有一个枚举类型表明该task attempt 是 VIRGIN 还是 SPECULATIVE。最后调用 sendTaskStartedEvent 创建一个 JobHistoryEvent
  • LaunchTransition: 本变迁中只调用 MRAPPMitrics 中的方法对 task 运行进度进行衡量。变迁结束后状态机进入 RUNNING 状态。
  • AttemptSucceedTransition: 状态机通过本变迁从 RUNNING 状态变为 SUCCEED 状态。变迁中首先更新 task 中各类型 attempt 的值,然后发送TaskFinishedEvent。之后发送TaskAttemptKillEvent将该 task 的其他 attempt kill 掉。最后调用finished() 修改 MRAPPMitrics
  • AttemptCommitPendingTransition: 在给出 task 的输出之前需要通过此变迁,对 commit 输出的一次确认。
  • AttemptKilledTransition: 在收到 ATTEMPT_KILLED 事件之后触发本变迁,除设置相关变量之外,对于 TaskAttemptKilledEvent 还需要创建一个 rescheduleNewAttempt,调用addAndScheduleAttempt()方法发送事件。有两个变迁KillWaitAttemptSucceedTransition, KillWaitAttemptFailedTransition 分别继承自本变迁并返回不同的 Attempt 完成状态。
  • AttemptFailedTransition: 状态机通过此变迁进入 FAILED,同样设置相关变量。 对于重试次数小于最大重试限度的 task 将进行 reschedule,返回当前状态。而大于最大重试次数的 task 将当前未完成的属于该 Task 的 Attempt kill 掉,创建 TaskFailedEvent 后调用 finished()方法,结束 task 的运行。

    TaskAttemptImpl

    状态机

    TaskAttempt 状态机

    变迁

  • RequestContainerTransition: 在状态机拓扑建立完成后,状态机处于NEW。此时收到 SCHEDULE 或 RESCHEDULE 事件后将触发本变迁,若是为发生故障的 container 重新申请资源则调用 ContainerRequest 中的一个静态方法,该方法中对资源的申请不再考虑到本地性的问题。 而首次申请资源则将 dataLocalHosts 以及 dataLocalRacks 作为参数传入 ContainerRequest的构造方法中。状态机进入UNASSIGNED。
  • ContainerAssignedTransition: UNASSIGNED 下收到 ASSIGNED 触发本变迁,这时对 TaskAttemptImpl中的成员变量赋值。其中包括由事件传入的参数 container, 通过子类重新的方法createRemoteTask()的返回值得到的 remoteTask, 将以上两者组合封装的 JvmID。最后将 remoteTaskJvmID 作为参数向taskAttemptListener中注册。 此外发送两个事件:ContainerRemoteLaunchEventSpeculatorEvent。 变量remoteTask实际上是一个 Task的实例,也就是一个 Map 或 Reduce 任务运行的实际类。在注释中可以看出这个引用采用懒加载方式, 一旦任务开始运行后将这个引用设置为空,大概占用 30KB的内存。变迁结束后状态机进入 ASSIGNED。
  • LaunchedContainerTransition: ASSIGNED 下收到 CONTAINER_LAUNCHED 触发本变迁,首先同样是对成员变量进行赋值,并且向 TaskAttemptListener 注册开始监听事件。通过NetUtils:createSocketAddr 创建通信地址并赋值给相应成员变量。在这里将以后不再使用的 remoteTask 设置为 null 以释放内存。最后创建 ATTEMPT_LAUNCHED 事件。
  • CommitPendingTransition: 对于需要 commit 的任务,收到 TA_COMMIT_PENDING后执行本变迁,这里只进行了一个操作,向 eventHandler 中发送了 T_ATTEMPT_COMMIT_PENDING 事件,由 TaskImpl 进行处理。
  • CleanupContainerTransition: 处于 RUNNING 状态或 COMMIT_PENDING 下收到 DONE 消息时执行本变迁。首先向 TaskAttemptListener 注销以停止对本 Attempt 的监听,此变迁还有可能执行与收到 KILL 事件时,这时要设置 reschedule 标记。
  • SucceedTransition: 处于 SUCCESS_CONTAINER_CLEANUP 状态下收到 TA_CONTAINER_CLEANED 事件后将触发本变迁。 设置完成时间,发送与完成相关的几个事件。
  • DeallocateContainerTransition: 收到 TA_KILL和 TA_FAILMSG 时,本变迁被执行。首先设置该任务的完成时间,对于 TaskAttemptKillEvent 来说还要通过 getMessage() 方法获取诊断信息。然后发出CONTAINER_DEALLOATE事件。若从 UNASSIGNED 状态转移而来,还要撤回需求。然后判断终态为 FAILED 还是 KILLED分别发出对应事件。此外还需要一个TaskAttemptUnsuccessultCompletionEvent来封装终态以及 taskAttempt
  • TaskCleanupTransition: 处于 CONTAINER_CLEANUP 状态下时通过此变迁进入 TASK_CLEANUP
  • KilledTransition: 被 Kill 的任务的终态,封装相关信息发出TaskTAttemptKillEvent
  • FailedTransition: FAIL 的任务的终态,封装相关信息发出T_ATTEMPT_FAILED
  • TooMAnyFetchFailureTransition: 对于处于 SUCCEED 状态下的 Map 任务,可能出现该变迁,经过封装信息后发出 T_ATTEMPT_FAILED 事件,任务进入 FAILD 状态。

Actor Model 设计思想

Actor Model(参与者模型)简化并发而产生,它将系统中的所有组件都视为 Actor(参与者),参与者之间通过发送消息来实现交互。每个 Actor 同时只能处理一个消息,也可以发送消息给其他参与者,而大量的参与者同时存在则构成了并行化。 Actor Model 中的消息为异步非阻塞式的消息传递,也就是说消息发送之后可以进行其他操作,当接收到接收方发送的返回消息时可以进行后续操作。因为 Actor 之间只能通过消息进行交互,也就是说不存在共享内存空间,这就省去了锁的操作。 Actor 通过隔离控制和计算实体来实现封装,相比使用锁来进行同步控制而言,简化了并行的实现难度。 参与者模式在1973年由 Carl Hewitt 等人发表的论文中提出1,后来这种思想被应用于面向对象的并发编程语言中,其中最有名的当属 Erlang 以及 Scala。Apache 的 Akka 是实现了 Actor Model 的 library,支持 Java 以及 Scala。 在 Hadoop MapReduce v2 之后的版本中,Actor Model 就被用于其各个松耦合组件之间的通信。每个组件都维护其自己的状态机,由事件驱动状态机的变迁,在变迁发生时执行相关逻辑操作。 在通常的思路中,我们用线程来实现并发,用锁来控制共享内存中的同步互斥问题。然而,线程本身属于相对昂贵的资源,对于程序员的水平要求相对较高。对线程的不当使用将容易导致死锁问题的产生,而且大量线程的等待也对系统资源是一种浪费。 在 Actor Model 中使用基于事件驱动的编程模型,这种异步非阻塞的变成方式天生支持并发。且不需要涉及到对线程和锁的控制,可以使程序员关注于逻辑本身,降低了实现并发的难度。 不过,使用 Actor Model 进行编程需要思路上的转变,从顺序执行的程序转为事件驱动的模型可能会在一定程度上让程序本身更难以理解。 必须提到的是,对并行的控制(也就是同步)一定存在局部串行化。Actor Model 本身是一种抽象模型,对于使用者而言可以做到不使用锁,而锁将存在于更底层的位置来保证对并发的控制。 比如 Erlang 和 akka 对上层提供的 API 并不需要锁,而其复杂的并发控制分别将锁实现在编程语言级别和框架级别。也就是说 Actor Model 和线程不是同一个抽象级别的问题,Actor Model 提供了更高的抽象级别。

同步问题编程实例(哲学家就餐问题)

这里对哲学家就餐问题分别使用锁的机制以及 Actor Model 进行实现,以此表现 Actor Model 的抽象能力。 这里参考文章 Locks, Actors, And Stm In Pictures2,原文中详细介绍了Lock, Actor 再加上 Transaction 共三种方案在思想上的不同,并用卡通图进行辅助说明,在这里不做翻译只进行关键部分的转述。 Actor Model 的实现使用框架celluloid3,算法采用 Ruby 实现。

哲学家就餐问题描述

1965年,Dijkstra提出并解决了一个他称之为哲学家就餐的同步问题。该问题可简单描述为:5个哲学家围坐在一个圆桌上,每两个哲学家之间都有一只筷子,哲学家平时进行思考,只有当他们饥饿时,才拿起筷子吃饭。只有同时获得左右两只筷子他们才可以开始吃饭,吃饭后放下筷子继续思考。 关键问题是,是否可以给出一段描述其行为的程序且保证不会发生死锁。 这里使用带有 Waiter 的算法。

Locks

require 'thread'

class Waiter
  def initialize
    @mutex = Mutex.new
  end

  def can_eat? philosopher
    left = philosopher.left_fork
    right = philosopher.right_fork
    @mutex.synchronize do
      if !left.used && !right.used
        left.used = true
        right.used = true
        return true
      else
        return false
      end
    end
  end

  def stop_eating philosopher
    @mutex.synchronize do
      philosopher.left_fork.used = false
      philosopher.right_fork.used = false
    end
  end
end

class Philosopher
  attr_accessor :left_fork, :right_fork

  # 创建哲学家时指定其对应左右筷子,开始为思考状态
  def initialize(name, left_fork, right_fork, waiter)
    @name = name
    @left_fork = left_fork
    @right_fork = right_fork
    @waiter = waiter
    think
  end

  # 思考一定时间后处于饥饿状态,试图进食
  def think
    puts "Philosopher #@name is thinking..."
    sleep(rand())
    puts "Philosopher #@name is hungry..."
    dine
  end

  # 等待判断是否可以进食,进食完成后继续思考
  def dine
    while !@waiter.can_eat?(self)
      think
    end
    puts "Philosopher #@name eats..."
    sleep(rand())
    puts "Philosopher #@name belches"
    @waiter.stop_eating(self)
    think
  end
end

n = 5

forks = []

class Fork
  attr_accessor :used
  def initialize
    @used = false
  end
end

(1..n).each do |i|
  forks << Fork.new
end

threads = []
waiter = Waiter.new

# 为每个哲学家创建线程,为其分配筷子(圆桌)
(1..n).each do |i|
  threads << Thread.new do
    if i < n
      left_fork = forks[i]
      right_fork = forks[i+1]
    else
      # special case for philosopher #5 because he gets forks #5 and #1
      # and the left fork is always the lower id because that's the one we try first.
      left_fork = forks[0]
      right_fork = forks[n]
    end
    Philosopher.new(i, left_fork, right_fork, waiter)
  end
end

# 迭代启动线程
threads.each {|thread| thread.join}

Actors

require 'rubygems'
require 'celluloid'

class Waiter
  include Celluloid
  # 设置标记表示筷子可用状态
  FORK_FREE = 0
  FORK_USED = 1
  attr_reader :philosophers
  attr_reader :forks
  attr_reader :eating

  def initialize(forks)
    @philosophers = []
    @eating = []
    @forks = Array.new(forks, FORK_FREE)
  end

  def welcome(philosopher)
    @philosophers << philosopher
    # 异步发送事件,使哲学家开始思考
    philosopher.async.think
  end

  def hungry(philosopher)
    pos = @philosophers.index(philosopher)

    left_pos = pos
    right_pos = (pos + 1) % @forks.size
    # 判断哲学家左右两侧筷子是否可用
    if @forks[left_pos] == FORK_FREE && @forks[right_pos] == FORK_FREE
      @forks[left_pos] = FORK_USED
      @forks[right_pos] = FORK_USED
      # 当前哲学家加入进食队列并发送事件
      @eating << philosopher
      philosopher.async.eat
    else
      # 暂时不能进食,继续思考
      philosopher.async.think
    end
  end

  # 使某哲学家进食后释放筷子资源
  def drop_forks(philosopher)
    pos = @philosophers.index(philosopher)
    left_pos = pos
    right_pos = (pos + 1) % @forks.size
    @forks[left_pos] = FORK_FREE
    @forks[right_pos] = FORK_FREE
    @eating -= [philosopher]
    philosopher.async.think
  end
end

class Philosopher
  include Celluloid
  attr_reader :name
  attr_reader :waiter

  def initialize(name, waiter)
    @name = name
    @waiter = waiter
    # 向 Waiter 注册自身
    waiter.async.welcome(Actor.current)
  end

  def think
    puts "#{name} is thinking"
    sleep(rand)
    puts "#{name} gets hungry"
    waiter.async.hungry(Actor.current)
  end

  def eat
    puts "#{name} is eating"
    sleep(rand)
    puts "#{name} burps"
    waiter.async.drop_forks(Actor.current)
  end
end

names = %w{Heraclitus Aristotle Epictetus Schopenhauer Popper}
waiter = Waiter.new(names.size)
# 实例化所有哲学家,initialize方法中发送事件, 整个程序将被驱动开始运行
philosophers = names.map {|name| Philosopher.new(name, waiter)}
# 程序运行结束,进程休眠
sleep
  1. Carl Hewitt; Peter Bishop and Richard Steiger. A Universal Modular Actor Formalism for Artificial Intelligence. IJCAI. 1973 

  2. http://adit.io/posts/2013-05-15-Locks,-Actors,-And-STM-In-Pictures.html 

  3. https://github.com/celluloid/celluloid