在写 Hadoop 程序时,有时侯需要从文件名中提取某些信息,比如说文件名中可能包含有时间戳。下面,就针对这种需求,谈谈在 Hadoop 程序中该怎么做。
在用 Java 写的 MapReduce 程序中
MapReduce 的 mapper 类的 map()
方法的原型是这样的:
protected void map(KEYIN key, VALUEIN value, Mapper.Context context) throws java.io.IOException, InterruptedException
获取 input split 所在的文件名,需要从 context
参数入手。废话少说,直接上代码:
// 获取 input split 所在的文件名 private String getFileName(MapContext context) { return ((FileSplit) context.getInputSplit()).getPath().getName(); }
代码流程大概是:Context → InputSplit → FileSplit → Path → String(file name)。
mapper 类采用不同的 input format class,getInputSplit()
的结果差异:
- 对于
TextInputFormat
,可以保证每个 mapper 的数据来自同一个文件,同一 mapper 下用getInputSplit()
获取的文件名相同 - 对于
CombineFileInputFormat
,无法保证每个 mapper 的数据来自同一个文件,同一 mapper 下用getInputSplit()
获取的文件名可能不同
其实,如果 input format 是 CombineFileInputFormat
,那么完全没有必要用 getInputSplit()
来获取文件名。因为,此时 mapper 类的 key 就是对应的文件名。
在 Hadoop Streaming 程序中
对于 Hadoop Streaming 程序,同样有办法获取相应的文件名。方法是读取 mapreduce_map_input_file
或 map_input_file
环境变量的值。其中,mapreduce_map_input_file
是较新的 API 支持的用法,map_input_file
是旧的 API 用法,建议采用 mapreduce_map_input_file
。注意,这两个环境变量都是全小写。
也许,你可能会有疑惑:一个 Hadoop Streaming 程序,一般都有多个 mapper,它们的数据可能来自于不同的文件,但是它们的文件名却都保存在 mapreduce_map_input_file
环境变量中。一个环境变量如何能保存多个不同的文件名呢?
其实,这里的 mapreduce_map_input_file
/map_input_file
环境变量都是 process-local 的。也就是说,不同的 mapper,它们都分别有自己的 mapreduce_map_input_file
/map_input_file
环境变量。它们的值可能相同,也可能不同,互不干扰,是隔离的。究其原因嘛,是因为 MapReduce 是多进程框架,各个 mapper 对应不同的 JVM 进程,而进程之间当然是互相隔离的。
mapreduce_map_input_file
环境变量其实来源于 mapreduce.map.input.file
。Apache 官网是这么解释的:
Note: During the execution of a streaming job, the names of the "mapred" parameters are transformed. The dots ( . ) become underscores ( _ ). For example, mapred.job.id becomes mapred_job_id and mapred.jar becomes mapred_jar. To get the values in a streaming job's mapper/reducer use the parameter names with the underscores.
除 mapreduce.map.input.file
/mapreduce_map_input_file
以外,Hadoop/MapReduce 还有哪些进程空间内的属性/变量呢?
下表是一个简单的总结:
------------------------------------------------------------------------------------------------- Name Type Description ------------------------------------------------------------------------------------------------- mapred.job.id String The job id mapred.jar String job.jar location in job directory job.local.dir String The job specific shared scratch space mapred.tip.id String The task id mapred.task.id String The task attempt id mapred.task.is.map boolean Is this a map task mapred.task.partition int The id of the task within the job map.input.file String The filename that the map is reading from map.input.start long The offset of the start of the map input split map.input.length long The number of bytes in the map input split mapred.work.output.dir String The task's temporary output directory -------------------------------------------------------------------------------------------------
参考链接:
[…] 之前一篇博客提到了在 Hadoop 程序中如何获取 input split 所在的文件名。其实,写 Awk 程序时,也可能有同样的需求。那么,Awk 是如何处理的呢? […]