- 轻松学大数据挖掘:算法、场景与数据产品
- 汪榕
- 1265字
- 2020-08-28 01:52:17
3.2.2 MapReduce的代码规范
开始你会发现写MapReduce的确很简单,慢慢的你会有些厌烦它。因为毕竟重复性的代码太多,不够简洁大方。
要写好MapReduce的代码,掌握以下3个板块就够了。
· 第1个板块:Map阶段。
· 第2个板块:Reduce阶段。
· 第3个板块:Run阶段。
整体结构很简单,直观来看Map阶段就分为两部分:setup和map。一个是初始化Map阶段的全局变量、常量;另一个是数据在Map阶段需要解析的过程。
public static class dealMap extends Mapper<Object, Text, Text, Text>{ //Hadoop中常用的内置数据类型 //IntWritable:整型数 //DoubleWritable:双字节数值 //Text:使用UTF8格式存储的文本 //NullWritable:当key或value为空时使用 @Override protected void setup(Context context) throws IOException, InterruptedException{ /** * 初始化Map阶段的全局变量 */ } public void map(Object key, Text value, Context context) throws IOException, InterruptedException { //主要做这三件事 //1.按行读取文件,切分该行的数据字段 //2.解析和计算每一行的数据 //3.设置key和value传输到reduce端 } }
初始化全局变量、常量的原因在于MapReduce是一个分布式计算框架,每一个计算节点都在独立处理相应的数据,如果在Map处理阶段设置全局变量是不能保证每个节点获取的值都一致的。
对于Map阶段,还需要补充以下3点说明。
· 数据类型:除了默认的文本格式,还有IntWritable、DoubleWritable和NullWritable这些常用的数据类型(具体含义看具体命名)。
· Map阶段的数据读取:可以从不同的数据源获取(HDFS、本地和HBase等),而且文件存储格式也接受文本、二进制等,但重点都是按行解析。
· 在Map阶段中处理的三件事:按特定分隔符解析输入数据、做一定程度的清洗、传输到Reduce阶段。
对于Reduce阶段,同样也很简单,分为setup和reduce两个步骤。一个是初始化Reduce阶段的全局变量、常量;另一个是数据在Reduce阶段需要汇总的过程。
public static class dealReduce extends Reducer<Text, Text, Text, Text> { @Override protected void setup(Context context) throws IOException, InterruptedException{ /** * 初始化Reduce阶段的全局变量 */ } public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ //主要也做这三件事 //1.按key读取map阶段的数据,做循环 //2.计算和汇总每一类的数据 //3.设置key和value传输到输出端 } }
Map阶段和Reduce阶段的初始化变量、常量是需要单独设置的,毕竟集群在处理这两个阶段的过程中,或许不在同一台机器上。
对于Reduce阶段,需要补充以下3点说明。
· 数据类型:除了默认的文本格式,还有IntWritable、DoubleWritable和NullWritable这些常用的数据类型(具体含义看具体命名)。
· 从Map阶段获取数据:是根据Map阶段数据传输过程中的Key值来进行分组处理的,可以说是每个小组都会单独在Reduce处理一次。
· 在Reduce阶段中处理的两件事:根据不同的Key值分别进行聚合处理、传输到目标路径中存储。
Run阶段因人而异,有些人习惯归纳到main中统一处理,而笔者偏好于将其抽象出来单独维护。
public static Boolean run(String input, String ouput) throws IOException, ClassNotFoundException, Interrupted- Exception{ Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "FrameWorkJob"); job.setJarByClass(FrameWork.class); job.setMapperClass(dealMap.class); job.setReducerClass(dealReduce.class); job.setNumReduceTasks(1); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //设置输入、输出文件路径,可以保证多文件 Path output = new Path(ouput); FileInputFormat.setInputPaths(job, input); FileOutputFormat.setOutputPath(job, output); //设置每一次执行前先删除输出目录,防止报错 output.getFileSystem(conf).delete(output, true); Boolean result=job.waitForCompletion(true); return result; }
看起来需要设置参数的地方很多,但是改动不大,很多时候都是直接复制就可以重新使用了。需要强调几个点的设置。
· job.setJarByClass(FrameWork.class); :代表整个工程类的名字,保持一致就可以了。同理对于job.setMapperClass和job.setReducerClass也一样。
· job.setNumReduceTasks(1):代表Reduce执行节点的个数,有时候也可以看作是输出文件的个数。
· job.setOutputKeyClass(Text.class)和job.setOutputValueClass(Text.class):代表输出数据的类型,分别是Key和Value的数据类型,和Reduce阶段保持一致就可以了。
总体来说,整个MapReduce编程就是这个过程,最后在整个工程类中添加main执行就算成功了,代码如下。
public static void main(String[] args) throws Exception { run("输入文件目录", "输出文件目录"); }