3.2.2 MapReduce的代码规范

开始你会发现写MapReduce的确很简单,慢慢的你会有些厌烦它。因为毕竟重复性的代码太多,不够简洁大方。

要写好MapReduce的代码,掌握以下3个板块就够了。

· 第1个板块:Map阶段。

· 第2个板块:Reduce阶段。

· 第3个板块:Run阶段。

整体结构很简单,直观来看Map阶段就分为两部分:setup和map。一个是初始化Map阶段的全局变量、常量;另一个是数据在Map阶段需要解析的过程。

      public static class dealMap extends Mapper<Object, Text, Text, Text>{
            //Hadoop中常用的内置数据类型
            //IntWritable:整型数
            //DoubleWritable:双字节数值
            //Text:使用UTF8格式存储的文本
            //NullWritable:当key或value为空时使用

            @Override
            protected void setup(Context context)
                    throws IOException, InterruptedException{
                /**
                  * 初始化Map阶段的全局变量
                  */
            }
            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                //主要做这三件事
                //1.按行读取文件,切分该行的数据字段
                //2.解析和计算每一行的数据
                //3.设置key和value传输到reduce端
              }
        }

初始化全局变量、常量的原因在于MapReduce是一个分布式计算框架,每一个计算节点都在独立处理相应的数据,如果在Map处理阶段设置全局变量是不能保证每个节点获取的值都一致的。

对于Map阶段,还需要补充以下3点说明。

· 数据类型:除了默认的文本格式,还有IntWritable、DoubleWritable和NullWritable这些常用的数据类型(具体含义看具体命名)。

· Map阶段的数据读取:可以从不同的数据源获取(HDFS、本地和HBase等),而且文件存储格式也接受文本、二进制等,但重点都是按行解析。

· 在Map阶段中处理的三件事:按特定分隔符解析输入数据、做一定程度的清洗、传输到Reduce阶段。

对于Reduce阶段,同样也很简单,分为setup和reduce两个步骤。一个是初始化Reduce阶段的全局变量、常量;另一个是数据在Reduce阶段需要汇总的过程。

      public static class dealReduce extends Reducer<Text, Text, Text, Text> {
            @Override
            protected void setup(Context context)
                throws IOException, InterruptedException{
                /**
                  * 初始化Reduce阶段的全局变量
                  */
            }

            public void reduce(Text key, Iterable<Text> values, Context
  context)
                    throws IOException, InterruptedException{
                //主要也做这三件事
                //1.按key读取map阶段的数据,做循环
                //2.计算和汇总每一类的数据
                //3.设置key和value传输到输出端
            }
      }

Map阶段和Reduce阶段的初始化变量、常量是需要单独设置的,毕竟集群在处理这两个阶段的过程中,或许不在同一台机器上。

对于Reduce阶段,需要补充以下3点说明。

· 数据类型:除了默认的文本格式,还有IntWritable、DoubleWritable和NullWritable这些常用的数据类型(具体含义看具体命名)。

· 从Map阶段获取数据:是根据Map阶段数据传输过程中的Key值来进行分组处理的,可以说是每个小组都会单独在Reduce处理一次。

· 在Reduce阶段中处理的两件事:根据不同的Key值分别进行聚合处理、传输到目标路径中存储。

Run阶段因人而异,有些人习惯归纳到main中统一处理,而笔者偏好于将其抽象出来单独维护。

      public static Boolean run(String input, String ouput)
            throws  IOException,  ClassNotFoundException,  Interrupted-
  Exception{
          Configuration conf = new Configuration();
          Job job = Job.getInstance(conf, "FrameWorkJob");
          job.setJarByClass(FrameWork.class);
          job.setMapperClass(dealMap.class);
          job.setReducerClass(dealReduce.class);
          job.setNumReduceTasks(1);
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(Text.class);

          //设置输入、输出文件路径,可以保证多文件
          Path output = new Path(ouput);
          FileInputFormat.setInputPaths(job, input);
          FileOutputFormat.setOutputPath(job, output);
          //设置每一次执行前先删除输出目录,防止报错
          output.getFileSystem(conf).delete(output, true);
          Boolean result=job.waitForCompletion(true);

          return result;
      }

看起来需要设置参数的地方很多,但是改动不大,很多时候都是直接复制就可以重新使用了。需要强调几个点的设置。

· job.setJarByClass(FrameWork.class); :代表整个工程类的名字,保持一致就可以了。同理对于job.setMapperClass和job.setReducerClass也一样。

· job.setNumReduceTasks(1):代表Reduce执行节点的个数,有时候也可以看作是输出文件的个数。

· job.setOutputKeyClass(Text.class)和job.setOutputValueClass(Text.class):代表输出数据的类型,分别是Key和Value的数据类型,和Reduce阶段保持一致就可以了。

总体来说,整个MapReduce编程就是这个过程,最后在整个工程类中添加main执行就算成功了,代码如下。

      public static void main(String[] args) throws Exception {
          run("输入文件目录", "输出文件目录");
      }