相信用 Java 写过稍微大型点儿的 MapReduce 程序的朋友都有体会,MapReduce 程序调试起来并不方便。粗略想了一下,原因大概有以下两点:
- MapReduce 的处理思路增加了数据流的环节。
按照 MapReduce 的思想,业务逻辑在实现时,被人为地拆分成了 map 和 reduce 两个不同的阶段,两个阶段需要通过 key/value 对来传递数据流。更有甚者,在业务逻辑比较复杂时,一次 MapReduce 可能还不足以解决问题,需要多个回合的 MapReduce。
- 分布式程序,记录的数据流向是不确定的。
对于某一条记录而言,它在 map 和 reduce 阶段,很有可能是在不同的节点上计算的。
简而言之,我们不能像调试单机程序那样来方便地调试 MapReduce 程序。调试单机程序时,我们可以单步跟踪、设置断点,还有各式各样功能的工具(strace, ltrace, ldd, ldconfig, Valgrind …)。但是对于 MapReduce 程序,这些调试方法/工具通通不奏效。
那么,调试 MapReduce 程序是不是完全无迹可循呢?也不尽然。下面,我就来谈谈自己调试 MapReduce 程序的一点儿心得。
跟踪 stdout/stderr/syslog 的输出
- 读 stdout/stderr/syslog。
当一个 MapReduce 程序运行出错时,除了在终端窗口有错误提示信息以外,更详细的错误信息会被记录在对应 task attempt 所在节点的 stdout/stderr/syslog 日志上。
通过访问 job 的 tracking URL,我们可以看到该 job 的详细执行情况,其中包括各个 map/reduce task attempt 的详细信息(开始/结束时间、在哪台节点上被计算、stdout/stderr/syslog 日志信息)。当然,我们这里更关注那些 failed/killed map/reduce task attempt 的日志。
无论是正在运行的还是已结束的 MapReduce 程序,都可以看到 job 的 task attempt 日志。根据日志信息,往往可以看出 job 失败的一些端倪。
- 写 stdout/stderr/syslog。
对于用 Java 写的 MapReduce 程序来说,
System.out.println()
的输出被写到stdout
文件,System.err.println()
的输出被写到stderr
文件。我们也可以根据需要,将调试信息写到 stdout/stderr/syslog 中,作用相当于记日志。
当然,这种方式也有弊端:
- 调试信息比较分散,查看不便。
调试信息分散在不同的 task attempt 日志里,在 web 浏览器里可能需要打开多个 tab 查看。
- 如果某个 stdout/stderr 文件过大,在 web 浏览器里可能无法打开日志。
如果有登录该 task attempt 所在节点的权限,可以 SSH 登录后,用 less/Vim 查看;当然,也可以适当减少调试信息,尽量避免这种情形的出现。
- 调试信息比较分散,查看不便。
用 task side-effect files 特性记日志
MapReduce 提供了 task side-effect files 特性,用于 map/reduce 端除了标准的 context.write(key, vlaue)
方式以外的输出。
这里,我们可以利用这个特性记录调试信息。大致流程如下(对 mapper/reducer 都适用):
- 在
setup()
方法里,得到FSDataOutputStream
对象。可以参考我常用的用于获取
FSDataOutputStream
对象的方法getOutputStream()
:static private FSDataOutputStream getOutputStream(TaskInputOutputContext context, String file_prefix, int flag) throws IOException, InterruptedException { String[] taskID = context.getConfiguration().get("mapreduce.task.attempt.id").split("_"); Path work_path; if (0 == flag) { // mapper-side work_path = FileOutputFormat.getWorkOutputPath(context).suffix(file_prefix + taskID[3] + "-" + taskID[4]); } else { // reducer-side work_path = new Path(file_prefix + taskID[3] + "-" + taskID[4]); } FileSystem fs = work_path.getFileSystem(context.getConfiguration()); return fs.create(work_path); }
- 在
map()
/reduce()
方法里,用FSDataOutputStream.writeBytes()
方法写调试信息。这里需要注意的是,不要用
FSDataOutputStream.writeUTF()
方法写。具体原因,可以参考我之前的一篇博客:MapReduce FSDataOutputStream 写数据有误。 - 在
cleanup()
方法里,用FSDataOutputStream.close()
关闭输出流
参考链接: