在写 MapReduce 程序时,mapper/reducer 与进程在概念上的对应关系是这样的:

一个 mapper/reducer,对应一个 map/reduce task,对应一个 JVM,对应一个进程。也就是说,在一个 JVM 进程里只能运行一个 mapper/reducer。

其中,mapper 类总是必不可少的。实际上,MapReduce API 还提供了另外一种 mapper:MultithreadedMapper。它的特点是,可以在一个 JVM 进程里以多线程的方式同时运行多个 mapper。其中,每个线程实例化一个 mapper 对象,各个线程并发执行。

Tom White 的 Hadoop: The Definitive Guide 一书中,对 MultithreadedMapper 是这么描述的:

MultithreadedMapper is an implementation that runs mappers concurrently in a configurable number of threads (set by mapreduce.mapper.multithreadedmapper.threads). For most data processing tasks, it confers no advantage over the default implementation. However, for mappers that spend a long time processing each record—because they contact external servers, for example—it allows multiple mappers to run in one JVM with little contention.

 

接下来,再介绍下程序中应该如何使用 MultithreadedMapper 这一特性。

有两个要点:

  1. 在 MapReduce 的 driver 类中指定启用 MultithreadedMapper

    具体有两种方法(假如 mapper 类名称为 MultiMapper):

    1. job.setMapperClass(MultithreadedMapper.class);
      conf.set("mapred.map.multithreadedrunner.class", MultiMapper.class.getCanonicalName());
      conf.set("mapred.map.multithreadedrunner.threads", "8");	// 设置并发执行的线程数
    2. job.setMapperClass(MultithreadedMapper.class);
      MultithreadedMapper.setMapperClass(job, MultiMapper.class);
      MultithreadedMapper.setNumberOfThreads(job, 8);		// 设置并发执行的线程数

       

  2. 保证 MapReduce 的 mapper 类是线程安全的

    要确保这一点,mapper 类的成员和方法应该尽量为 non-static 的。

    在某些情形下,成员/方法也可以为 static。我粗略想了一下,大概有以下这些情形:

    • member read-only. 即成员是只读不写的。
    • method 的作用相当于一个独立的工具,没有显式操作 non-static member。

    当然,如何保证一个类是线程安全的,该主题属于 Java OOP 的内容范畴。这里不再赘述了。

 

那么,MultithreadedMapper 适宜的应用场景呢?

MultithreadedMapper 适用于 CPU-bound 的情形,而不适用于 I/O-bound 的情形。原因就在于,MultithreadedMapper 要保证 mapper 类线程安全,就需要做线程间的互斥与同步,而这些操作对于 I/O 的效率影响比较大。

在我个人的实际使用经历中,由于对 I/O-bound 型的作业使用了 MultithreadedMapper,导致 MapReduce 程序性能不升反降。不得已,最终换回了传统的 Mapper 类。:-(

 

参考链接:

Leave a Reply

Your email address will not be published. Required fields are marked *