3.2 操作实践

前面主要介绍了HDFS系统接口和编程方式,本节介绍HDFS中Java编程的操作实例。

3.2.1 文件操作

使用命令行编写HDFS程序,通常有三个步骤。

首先,编写HDFS程序源码,并通过java编译器编译成字节码。

然后,将字节码打包成JAR文件。

最后,通过Hadoop加载JAR文件,并运行。

下面,我们以一个完整的文件操作为例来说明。程序的主要功能如下。

(1)在HDFS文件系统中创建一个名为“hdtest”的目录。

(2)将本地名为“hfile.txt”的文件上传到HDFS中的hdtest目录下面。

(3)遍历hdtest目录。

(4)将HDFS中的hdtest/hfile.txt文件下载到本地,并另存为“hfile2.txt”。

程序的源代码如下:

        import java.io.ByteArrayOutputStream;
        import java.io.IOException;
        import java.io.OutputStream;
        import java.net.URI;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FSDataInputStream;
        import org.apache.hadoop.fs.FSDataOutputStream;
        import org.apache.hadoop.fs.FileStatus;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.IOUtils;
        public class HdfsTest {
            private static final String HADOOP_URL = "hdfs://test.hadoop:9000";
            private Configuration conf;
            /**
             * 构造函数
             */
            public HdfsTest() {
              this.conf = new Configuration();    }
            /**
             * 测试入口函数
             */
            public static void main(String[]args) throws IOException {
                HdfsTest hdfs = new HdfsTest();
                hdfs.createDir("/hdtest"); //创建目录
                hdfs.copyFile("file/hfile.txt", "/hdtest/hfile.txt"); //拷贝文件
                hdfs.ls("/hdtest");  //遍历目录
                hdfs.cat("/hdtest/hfile.txt");  //查看文件内容
                //下载文件并另存
                hdfs.download("/hdtest/hfile.txt", "file/hfile2.txt");
            }
             /**
             * 创建目录
             * @param folder
             * @throws IOException
             */
            public void createDir(String folder) throws IOException {
                Path path = new Path(folder);
                FileSystem fs = FileSystem.get(conf);
                if (! fs.exists(path)) {
                  fs.mkdirs(path);
                  System.out.println("Create: " + folder);
                }
                fs.close();
            }
            /**
             * 上传文件到HDFS
             * @param local
             * @param remote
             * @throws IOException
             */
            public void copyFile(String local, String remote) throws IOException {
                FileSystem fs = FileSystem.get(conf);
                fs.copyFromLocalFile(new Path(local), new Path(remote));
                System.out.println("copy from: " + local + " to " + remote);
                fs.close();
            }
            /**
             * 遍历文件
             * @param folder
             * @throws IOException
             */
           public void ls(String folder) throws IOException {
                Path path = new Path(folder);
                FileSystem fs = FileSystem.get(conf);
                FileStatus[]list = fs.listStatus(path);
                System.out.println("ls: " + folder);
                System.out.println("**********list begin*************");
                for (FileStatus f : list) {
                  System.out.printf("name: %s, folder: %s, size: %d\n",
                                      f.getPath(), f.isDir(), f.getLen());
                }
                System.out.println("**********list end*************");
                fs.close();
            }
            /**
             * 查看文件中的内容
             * @param remoteFile
             * @return    * @throws IOException
            */
           public String cat(String remoteFile) throws IOException {
              Path path = new Path(remoteFile);
              FileSystem fs = FileSystem.get(conf);
              FSDataInputStream fsdis = null;
              System.out.println("Content: " + remoteFile);
              OutputStream baos = new ByteArrayOutputStream();
              String str = null;
              try {
                  fsdis = fs.open(path);
                  IOUtils.copyBytes(fsdis, baos, 4096, false);
                  str = baos.toString();
              } finally {
                  IOUtils.closeStream(fsdis);
                  fs.close();
              }
              System.out.println(str);
              return str;
           }
           /**
            * 从HDFS中下载文件到本地
            * @param remote
            * @param local
            * @throws IOException
            */
           public void download(String remote, String local) throws IOException {
              Path path = new Path(remote);
              FileSystem fs = FileSystem.get(conf);
              fs.copyToLocalFile(path, new Path(local));
              System.out.println(
                "download file from'" + remote + "' to '" + local + "'");
              fs.close();
           }
           /**
            * 重命名文件
            * @param src
            * @param dst
            * @throws IOException
            */
           public void rename(String src, String dst) throws IOException {
              FileSystem fs = FileSystem.get(conf);
              fs.rename(new Path(src), new Path(dst));
              System.out.println("Rename:  " + src + " to " + dst);
              fs.close();
           }
           /**
            * 删除文件或目录
            * @param folder
            * @throws IOException
            */
           public void delete(String folder) throws IOException {
              Path path = new Path(folder);
              FileSystem fs = FileSystem.get(conf);
              fs.deleteOnExit(path);
              System.out.println("Delete: " + folder);
              fs.close();
           }
        }

编译HdfsTest.java源文件。Hadoop 2.x版本中JAR不再集中在一个hadoop-core*.jar中,而是分成多个JAR(如$HADOOP_HOME/share/hadoop/common/hadoop-common-2.6.0.jar、$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar、$HADOO P_HOME/share/hadoop/common/lib/commons-cli-1.2.jar等),通过“hadoop classpath”命令,可以得到运行Hadoop程序所需的全部classpath信息。

我们将Hadoop的classhpath信息添加到CLASSPATH变量中,然后直接编译:

        $ javac HdfsTest.java

编译时会有警告,可以忽略。编译后,可以看到生成的.class文件,如图3-10所示。

图3-10 编译并查看生成的.class文件

打包.class文件,如图3-11所示。

图3-11 打包.class文件并查看

运行测试,结果如图3-12所示。

图3-12 运行测试结果

由上面的运行结果可以看到,我们在HDFS文件系统中成功地创建了目录并上传/下载了一个文件。通过Fs Shell命令,可以验证查看已上传的文件,如图3-13所示。

图3-13 验证查看已上传的文件

此外,上述实例代码中,还提供了重命名(rename)和删除(delete)函数,感兴趣的读者可以自己测试一下。

使用命令行编译运行Java程序有些麻烦,每修改一次就需要手动编译、打包一次。对于较大规模的应用,可以使用Eclipse等集成环境进行开发,以提高开发效率。

3.2.2 压缩与解压缩

我们在HDFS中对数据进行压缩处理来优化磁盘使用率,提高数据在磁盘和网络中的传输速度,从而提高系统处理数据的效率。

Hadoop应对压缩格式的技术是自动识别。如果我们压缩的文件有相应压缩格式的扩展名(比如lzo、gz、bzip2等),Hadoop会根据压缩格式的扩展名,自动选择相对应的解码器来解压数据,此过程完全是Hadoop自动处理的,我们只须确保输入的压缩文件有扩展名。

Hadoop在Codec类(org.apache.hadoop.io.compress)中,实现了压缩和解压缩的接口CompressionCodec。可用的Codec实现类见表3-12。

表3-12 可用的Codec实现类

CompressionCodec有两个方法,可以帮助我们方便地压缩或解压数据。压缩数据时使用createOutputStream(OutputStream out)获取压缩输出流对象CompressionOutputStream,我们将未压缩的数据写入该流,它会帮我们压缩数据后,写出至底层的数据流out。

相反地,在解析数据的时候,使用createInputStream(InputStream in)获取解压缩输入流对象CompressionInputstream,通过它,我们可以从底层的数据流中读取解压后的数据。

CompressionOutputStream、CompressionInputStream与java.util.zip.DeflaterOutputStream、java.util.zip.DeflaterInputStream类似,但是,前者支持重置内部的压缩器(Compressor)与解压缩器(Decompressor)状态。

CompressionCodecFactory是Hadoop压缩框架中的另一个类,主要功能是负责根据不同的文件扩展名,来自动地获取相对应的压缩解压器,使用者可以通过它提供的方法,获得CompressionCodec,极大地增强了应用程序在处理压缩文件时的通用性。

除了前面介绍的createInputStream()和createInputStream()方法外,Hadoop中还有其他两种压缩模式。

一是压缩机Compressor和解压机Decompressor。在Hadoop的实现中,数据编码器和解码器被抽象成了两个接口:org.apache.hadoop.io.compress.Compressor和org.apache.hadoop.io.compress.Decompressor。它们规定了一系列的方法,所以,在Hadoop内部的编码/解码算法实现中都需要实现对应的接口。在实际的数据压缩与解压缩过程中,Hadoop为用户提供了统一的I/O流处理模式。

二是压缩流CompressionOutputStream和解压缩流CompressionInputStream。这两个类分别继承自java.io.OutputStream和java.io.InputStream,作用也类似。

下面,我们编码实现文件的压缩和解压缩操作。源程序如下:

        import java.io.IOException;
        import java.io.InputStream;
        import java.io.OutputStream;
        import java.net.URI;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FSDataInputStream;
        import org.apache.hadoop.fs.FSDataOutputStream;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.IOUtils;
        import org.apache.hadoop.io.compress.CompressionCodec;
        import org.apache.hadoop.io.compress.CompressionCodecFactory;
        import org.apache.hadoop.io.compress.CompressionInputStream;
        import org.apache.hadoop.io.compress.CompressionOutputStream;
        import org.apache.hadoop.util.ReflectionUtils;
        public class CompressTest {
            /**
            * 压缩文件
            * @param codecClassName
            * @param filein, fileout
            * @throws IOException
            */
            public static void compress(String codecClassName, String filein,
              String fileout) throws Exception {
              Class<? > codecClass = Class.forName(codecClassName);
              Configuration conf = new Configuration();
              FileSystem fs = FileSystem.get(conf);
              CompressionCodec codec = (CompressionCodec)ReflectionUtils
                                          .newInstance(codecClass, conf);
              //指定压缩文件路径
              FSDataOutputStream outputStream = fs.create(new Path(fileout));
              //指定要被压缩的文件路径
              FSDataInputStream in = fs.open(new Path(filein));
              //创建压缩输出流
              CompressionOutputStream out =
                codec.createOutputStream(outputStream);
              IOUtils.copyBytes(in, out, conf);
              IOUtils.closeStream(in);
              IOUtils.closeStream(out);
            }
            /**
            * 解压缩:使用文件扩展名来推断codec
            * @param fileuri
            * @throws IOException
            */      
            public static void uncompress(String fileuri) throws IOException {
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(URI.create(fileuri), conf);
                Path inputPath = new Path(fileuri);
                CompressionCodecFactory factory = new CompressionCodecFactory(conf);
                CompressionCodec codec = factory.getCodec(inputPath);
                if(codec == null) {
                  System.out.println("no codec for " + fileuri);
                  System.exit(1);
                }
                String outputUri = CompressionCodecFactory.removeSuffix(
                                    fileuri, codec.getDefaultExtension());
                InputStream in = null;
                OutputStream out = null;
                try {
                  in = codec.createInputStream(fs.open(inputPath));
                  out = fs.create(new Path(outputUri));
                  IOUtils.copyBytes(in, out, conf);
                } finally {
                  IOUtils.closeStream(out);
                  IOUtils.closeStream(in);
                }
            }
            public static void main(String[]args) throws Exception {
                String filein = "/hdtest/bigdata.pdf";
                String fileout = "/hdtest/bigdatacom.gz";
                compress("org.apache.hadoop.io.compress.GzipCodec",
                          filein, fileout);
                //uncompress(fileout);
            }
        }

编译并打包运行。压缩操作运行的结果如图3-14所示。

图3-14 进行压缩操作并查看结果

解压缩操作的运行结果如图3-15所示。

图3-15 解压缩的运行结果