相信用 Java 写过稍微大型点儿的 MapReduce 程序的朋友都有体会,MapReduce 程序调试起来并不方便。粗略想了一下,原因大概有以下两点:

  • MapReduce 的处理思路增加了数据流的环节。

    按照 MapReduce 的思想,业务逻辑在实现时,被人为地拆分成了 map 和 reduce 两个不同的阶段,两个阶段需要通过 key/value 对来传递数据流。更有甚者,在业务逻辑比较复杂时,一次 MapReduce 可能还不足以解决问题,需要多个回合的 MapReduce。

  • 分布式程序,记录的数据流向是不确定的。

    对于某一条记录而言,它在 map 和 reduce 阶段,很有可能是在不同的节点上计算的。

简而言之,我们不能像调试单机程序那样来方便地调试 MapReduce 程序。调试单机程序时,我们可以单步跟踪、设置断点,还有各式各样功能的工具(strace, ltrace, ldd, ldconfig, Valgrind …)。但是对于 MapReduce 程序,这些调试方法/工具通通不奏效。

那么,调试 MapReduce 程序是不是完全无迹可循呢?也不尽然。下面,我就来谈谈自己调试 MapReduce 程序的一点儿心得。

 

跟踪 stdout/stderr/syslog 的输出

  1. 读 stdout/stderr/syslog。

    当一个 MapReduce 程序运行出错时,除了在终端窗口有错误提示信息以外,更详细的错误信息会被记录在对应 task attempt 所在节点的 stdout/stderr/syslog 日志上。

    通过访问 job 的 tracking URL,我们可以看到该 job 的详细执行情况,其中包括各个 map/reduce task attempt 的详细信息(开始/结束时间、在哪台节点上被计算、stdout/stderr/syslog 日志信息)。当然,我们这里更关注那些 failed/killed map/reduce task attempt 的日志。

    无论是正在运行的还是已结束的 MapReduce 程序,都可以看到 job 的 task attempt 日志。根据日志信息,往往可以看出 job 失败的一些端倪。

  2. 写 stdout/stderr/syslog。

    对于用 Java 写的 MapReduce 程序来说,System.out.println() 的输出被写到 stdout 文件,System.err.println() 的输出被写到 stderr 文件。

    我们也可以根据需要,将调试信息写到 stdout/stderr/syslog 中,作用相当于记日志。

    当然,这种方式也有弊端:

    • 调试信息比较分散,查看不便。

      调试信息分散在不同的 task attempt 日志里,在 web 浏览器里可能需要打开多个 tab 查看。

    • 如果某个 stdout/stderr 文件过大,在 web 浏览器里可能无法打开日志。

      如果有登录该 task attempt 所在节点的权限,可以 SSH 登录后,用 less/Vim 查看;当然,也可以适当减少调试信息,尽量避免这种情形的出现。

 

用 task side-effect files 特性记日志

MapReduce 提供了 task side-effect files 特性,用于 map/reduce 端除了标准的 context.write(key, vlaue) 方式以外的输出。

这里,我们可以利用这个特性记录调试信息。大致流程如下(对 mapper/reducer 都适用):

  1. setup() 方法里,得到 FSDataOutputStream 对象。

    可以参考我常用的用于获取 FSDataOutputStream 对象的方法 getOutputStream()

    static private FSDataOutputStream getOutputStream(TaskInputOutputContext context, String file_prefix, int flag)
    		throws IOException, InterruptedException {
    	String[] taskID = context.getConfiguration().get("mapreduce.task.attempt.id").split("_");
    	Path work_path;
    	if (0 == flag) {		// mapper-side
    		work_path = FileOutputFormat.getWorkOutputPath(context).suffix(file_prefix + taskID[3] + "-" + taskID[4]);
    	}
    	else {					// reducer-side
    		work_path = new Path(file_prefix + taskID[3] + "-" + taskID[4]);
    	}
    	FileSystem fs = work_path.getFileSystem(context.getConfiguration());
    	return fs.create(work_path);
    }
  2. map()/reduce() 方法里,用 FSDataOutputStream.writeBytes() 方法写调试信息。

    这里需要注意的是,不要用 FSDataOutputStream.writeUTF() 方法写。具体原因,可以参考我之前的一篇博客:MapReduce FSDataOutputStream 写数据有误

  3. cleanup() 方法里,用 FSDataOutputStream.close() 关闭输出流

 

参考链接:

Leave a Reply

Your email address will not be published. Required fields are marked *