- 大数据:从基础理论到最佳实践
- 祁伟
- 2179字
- 2021-01-07 18:48:01
3.2 操作实践
前面主要介绍了HDFS系统接口和编程方式,本节介绍HDFS中Java编程的操作实例。
3.2.1 文件操作
使用命令行编写HDFS程序,通常有三个步骤。
首先,编写HDFS程序源码,并通过java编译器编译成字节码。
然后,将字节码打包成JAR文件。
最后,通过Hadoop加载JAR文件,并运行。
下面,我们以一个完整的文件操作为例来说明。程序的主要功能如下。
(1)在HDFS文件系统中创建一个名为“hdtest”的目录。
(2)将本地名为“hfile.txt”的文件上传到HDFS中的hdtest目录下面。
(3)遍历hdtest目录。
(4)将HDFS中的hdtest/hfile.txt文件下载到本地,并另存为“hfile2.txt”。
程序的源代码如下:
import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; public class HdfsTest { private static final String HADOOP_URL = "hdfs://test.hadoop:9000"; private Configuration conf; /** * 构造函数 */ public HdfsTest() { this.conf = new Configuration(); } /** * 测试入口函数 */ public static void main(String[]args) throws IOException { HdfsTest hdfs = new HdfsTest(); hdfs.createDir("/hdtest"); //创建目录 hdfs.copyFile("file/hfile.txt", "/hdtest/hfile.txt"); //拷贝文件 hdfs.ls("/hdtest"); //遍历目录 hdfs.cat("/hdtest/hfile.txt"); //查看文件内容 //下载文件并另存 hdfs.download("/hdtest/hfile.txt", "file/hfile2.txt"); } /** * 创建目录 * @param folder * @throws IOException */ public void createDir(String folder) throws IOException { Path path = new Path(folder); FileSystem fs = FileSystem.get(conf); if (! fs.exists(path)) { fs.mkdirs(path); System.out.println("Create: " + folder); } fs.close(); } /** * 上传文件到HDFS * @param local * @param remote * @throws IOException */ public void copyFile(String local, String remote) throws IOException { FileSystem fs = FileSystem.get(conf); fs.copyFromLocalFile(new Path(local), new Path(remote)); System.out.println("copy from: " + local + " to " + remote); fs.close(); } /** * 遍历文件 * @param folder * @throws IOException */ public void ls(String folder) throws IOException { Path path = new Path(folder); FileSystem fs = FileSystem.get(conf); FileStatus[]list = fs.listStatus(path); System.out.println("ls: " + folder); System.out.println("**********list begin*************"); for (FileStatus f : list) { System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDir(), f.getLen()); } System.out.println("**********list end*************"); fs.close(); } /** * 查看文件中的内容 * @param remoteFile * @return * @throws IOException */ public String cat(String remoteFile) throws IOException { Path path = new Path(remoteFile); FileSystem fs = FileSystem.get(conf); FSDataInputStream fsdis = null; System.out.println("Content: " + remoteFile); OutputStream baos = new ByteArrayOutputStream(); String str = null; try { fsdis = fs.open(path); IOUtils.copyBytes(fsdis, baos, 4096, false); str = baos.toString(); } finally { IOUtils.closeStream(fsdis); fs.close(); } System.out.println(str); return str; } /** * 从HDFS中下载文件到本地 * @param remote * @param local * @throws IOException */ public void download(String remote, String local) throws IOException { Path path = new Path(remote); FileSystem fs = FileSystem.get(conf); fs.copyToLocalFile(path, new Path(local)); System.out.println( "download file from'" + remote + "' to '" + local + "'"); fs.close(); } /** * 重命名文件 * @param src * @param dst * @throws IOException */ public void rename(String src, String dst) throws IOException { FileSystem fs = FileSystem.get(conf); fs.rename(new Path(src), new Path(dst)); System.out.println("Rename: " + src + " to " + dst); fs.close(); } /** * 删除文件或目录 * @param folder * @throws IOException */ public void delete(String folder) throws IOException { Path path = new Path(folder); FileSystem fs = FileSystem.get(conf); fs.deleteOnExit(path); System.out.println("Delete: " + folder); fs.close(); } }
编译HdfsTest.java源文件。Hadoop 2.x版本中JAR不再集中在一个hadoop-core*.jar中,而是分成多个JAR(如$HADOOP_HOME/share/hadoop/common/hadoop-common-2.6.0.jar、$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar、$HADOO P_HOME/share/hadoop/common/lib/commons-cli-1.2.jar等),通过“hadoop classpath”命令,可以得到运行Hadoop程序所需的全部classpath信息。
我们将Hadoop的classhpath信息添加到CLASSPATH变量中,然后直接编译:
$ javac HdfsTest.java
编译时会有警告,可以忽略。编译后,可以看到生成的.class文件,如图3-10所示。
图3-10 编译并查看生成的.class文件
打包.class文件,如图3-11所示。
图3-11 打包.class文件并查看
运行测试,结果如图3-12所示。
图3-12 运行测试结果
由上面的运行结果可以看到,我们在HDFS文件系统中成功地创建了目录并上传/下载了一个文件。通过Fs Shell命令,可以验证查看已上传的文件,如图3-13所示。
图3-13 验证查看已上传的文件
此外,上述实例代码中,还提供了重命名(rename)和删除(delete)函数,感兴趣的读者可以自己测试一下。
使用命令行编译运行Java程序有些麻烦,每修改一次就需要手动编译、打包一次。对于较大规模的应用,可以使用Eclipse等集成环境进行开发,以提高开发效率。
3.2.2 压缩与解压缩
我们在HDFS中对数据进行压缩处理来优化磁盘使用率,提高数据在磁盘和网络中的传输速度,从而提高系统处理数据的效率。
Hadoop应对压缩格式的技术是自动识别。如果我们压缩的文件有相应压缩格式的扩展名(比如lzo、gz、bzip2等),Hadoop会根据压缩格式的扩展名,自动选择相对应的解码器来解压数据,此过程完全是Hadoop自动处理的,我们只须确保输入的压缩文件有扩展名。
Hadoop在Codec类(org.apache.hadoop.io.compress)中,实现了压缩和解压缩的接口CompressionCodec。可用的Codec实现类见表3-12。
表3-12 可用的Codec实现类
CompressionCodec有两个方法,可以帮助我们方便地压缩或解压数据。压缩数据时使用createOutputStream(OutputStream out)获取压缩输出流对象CompressionOutputStream,我们将未压缩的数据写入该流,它会帮我们压缩数据后,写出至底层的数据流out。
相反地,在解析数据的时候,使用createInputStream(InputStream in)获取解压缩输入流对象CompressionInputstream,通过它,我们可以从底层的数据流中读取解压后的数据。
CompressionOutputStream、CompressionInputStream与java.util.zip.DeflaterOutputStream、java.util.zip.DeflaterInputStream类似,但是,前者支持重置内部的压缩器(Compressor)与解压缩器(Decompressor)状态。
CompressionCodecFactory是Hadoop压缩框架中的另一个类,主要功能是负责根据不同的文件扩展名,来自动地获取相对应的压缩解压器,使用者可以通过它提供的方法,获得CompressionCodec,极大地增强了应用程序在处理压缩文件时的通用性。
除了前面介绍的createInputStream()和createInputStream()方法外,Hadoop中还有其他两种压缩模式。
一是压缩机Compressor和解压机Decompressor。在Hadoop的实现中,数据编码器和解码器被抽象成了两个接口:org.apache.hadoop.io.compress.Compressor和org.apache.hadoop.io.compress.Decompressor。它们规定了一系列的方法,所以,在Hadoop内部的编码/解码算法实现中都需要实现对应的接口。在实际的数据压缩与解压缩过程中,Hadoop为用户提供了统一的I/O流处理模式。
二是压缩流CompressionOutputStream和解压缩流CompressionInputStream。这两个类分别继承自java.io.OutputStream和java.io.InputStream,作用也类似。
下面,我们编码实现文件的压缩和解压缩操作。源程序如下:
import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.CompressionInputStream; import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.util.ReflectionUtils; public class CompressTest { /** * 压缩文件 * @param codecClassName * @param filein, fileout * @throws IOException */ public static void compress(String codecClassName, String filein, String fileout) throws Exception { Class<? > codecClass = Class.forName(codecClassName); Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); CompressionCodec codec = (CompressionCodec)ReflectionUtils .newInstance(codecClass, conf); //指定压缩文件路径 FSDataOutputStream outputStream = fs.create(new Path(fileout)); //指定要被压缩的文件路径 FSDataInputStream in = fs.open(new Path(filein)); //创建压缩输出流 CompressionOutputStream out = codec.createOutputStream(outputStream); IOUtils.copyBytes(in, out, conf); IOUtils.closeStream(in); IOUtils.closeStream(out); } /** * 解压缩:使用文件扩展名来推断codec * @param fileuri * @throws IOException */ public static void uncompress(String fileuri) throws IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(fileuri), conf); Path inputPath = new Path(fileuri); CompressionCodecFactory factory = new CompressionCodecFactory(conf); CompressionCodec codec = factory.getCodec(inputPath); if(codec == null) { System.out.println("no codec for " + fileuri); System.exit(1); } String outputUri = CompressionCodecFactory.removeSuffix( fileuri, codec.getDefaultExtension()); InputStream in = null; OutputStream out = null; try { in = codec.createInputStream(fs.open(inputPath)); out = fs.create(new Path(outputUri)); IOUtils.copyBytes(in, out, conf); } finally { IOUtils.closeStream(out); IOUtils.closeStream(in); } } public static void main(String[]args) throws Exception { String filein = "/hdtest/bigdata.pdf"; String fileout = "/hdtest/bigdatacom.gz"; compress("org.apache.hadoop.io.compress.GzipCodec", filein, fileout); //uncompress(fileout); } }
编译并打包运行。压缩操作运行的结果如图3-14所示。
图3-14 进行压缩操作并查看结果
解压缩操作的运行结果如图3-15所示。
图3-15 解压缩的运行结果