在写 MapReduce 程序时,mapper/reducer 与进程在概念上的对应关系是这样的:
一个 mapper/reducer,对应一个 map/reduce task,对应一个 JVM,对应一个进程。也就是说,在一个 JVM 进程里只能运行一个 mapper/reducer。
其中,mapper 类总是必不可少的。实际上,MapReduce API 还提供了另外一种 mapper:MultithreadedMapper
。它的特点是,可以在一个 JVM 进程里以多线程的方式同时运行多个 mapper。其中,每个线程实例化一个 mapper 对象,各个线程并发执行。
Tom White 的 Hadoop: The Definitive Guide 一书中,对 MultithreadedMapper
是这么描述的:
MultithreadedMapper is an implementation that runs mappers concurrently in a configurable number of threads (set by mapreduce.mapper.multithreadedmapper.threads). For most data processing tasks, it confers no advantage over the default implementation. However, for mappers that spend a long time processing each record—because they contact external servers, for example—it allows multiple mappers to run in one JVM with little contention.
接下来,再介绍下程序中应该如何使用 MultithreadedMapper
这一特性。
有两个要点:
- 在 MapReduce 的 driver 类中指定启用
MultithreadedMapper
具体有两种方法(假如 mapper 类名称为
MultiMapper
):-
job.setMapperClass(MultithreadedMapper.class); conf.set("mapred.map.multithreadedrunner.class", MultiMapper.class.getCanonicalName()); conf.set("mapred.map.multithreadedrunner.threads", "8"); // 设置并发执行的线程数
-
job.setMapperClass(MultithreadedMapper.class); MultithreadedMapper.setMapperClass(job, MultiMapper.class); MultithreadedMapper.setNumberOfThreads(job, 8); // 设置并发执行的线程数
-
- 保证 MapReduce 的 mapper 类是线程安全的
要确保这一点,mapper 类的成员和方法应该尽量为 non-static 的。
在某些情形下,成员/方法也可以为 static。我粗略想了一下,大概有以下这些情形:
- member read-only. 即成员是只读不写的。
- method 的作用相当于一个独立的工具,没有显式操作 non-static member。
当然,如何保证一个类是线程安全的,该主题属于 Java OOP 的内容范畴。这里不再赘述了。
那么,MultithreadedMapper
适宜的应用场景呢?
MultithreadedMapper
适用于 CPU-bound 的情形,而不适用于 I/O-bound 的情形。原因就在于,MultithreadedMapper
要保证 mapper 类线程安全,就需要做线程间的互斥与同步,而这些操作对于 I/O 的效率影响比较大。
在我个人的实际使用经历中,由于对 I/O-bound 型的作业使用了 MultithreadedMapper
,导致 MapReduce 程序性能不升反降。不得已,最终换回了传统的 Mapper
类。:-(
参考链接: