2.5 HDFS接口使用详解

我们知道,HDFS是构建在Java环境上的分布式存储框架,那么它能不能像其他Java服务框架一样可以通过使用Java提供的API程序来操作HDFS对文件进行读写操作,从而为其他Java服务提供链接与支持。

答案是肯定的。HDFS的文件解释与执行器本身就是一个运行良好的Java应用,可以对Java代码进行编译,生成可执行的字节码文件。因此,HDFS提供了一整套的API为Java提供对文件相关操作的支持。

小提示:同样,根据Java字节码执行原理,我们可以使用其他语言对HDFS进行操作,只需要在底层最终生成相应的字节码文件即可。

2.5.1 使用FileSystem API操作HDFS中的内容

使用API对HDFS进行操作,首先需要获得当前对象的文件系统实例,即FileSystem。

FileSystem在Hadoop框架源码“org.apache.hadoop.fs”包中,是关于Hadoop文件系统使用 Java 代码实现的相关操作类,主要包括文件系统的建立、文件定义、实现基本的文件操作等。

首先获取指定对象的文件系统实例,如下所示:

Configuration conf = new Configuration(); //获取环境变量

FileSystem fs = FileSystem.get(conf); //创建文件系统实例

FileSystem为我们提供了相应的方法对文件进行操作,源码如下所示:

public abstract URI getUri(); //获取能够唯一标识一个FileSystem的URI

public abstract FSDataInputStream pen(Path f, int bufferSize) throws IOException; //根据给定的Path f,打开一个文件的FSDataInputStream输入流

public abstract FSDataOutputStream create(Path f,FsPermission permission,Boolean verwrite,int ufferSize, short replication,long BlockSize,Progressable progress) throws IOException; //为写入进程打开一个FSDataOutputStream

public abstract FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException; //向一个已经存在的文件中执行追加操作

public abstract boolean rename(Path src, Path dst) throws IOException;//将文件重命名为dst所确定的形参

public abstract boolean delete(Path f) throws IOException; //删除文件

public abstract boolean delete(Path f, boolean recursive) throws IOException;//删除目录

public abstract FileStatus[] listStatus(Path f) throws IOException;//列出该目录中的文件

public abstract void setWorkingDirectory(Path new_dir);//设置当前工作目录

public abstract Path getWorkingDirectory();//获取当前工作目录

public abstract boolean mkdirs(Path f, FsPermission permission) throws IOException; //创建一个目录f

public abstract FileStatus getFileStatus(Path f) throws IOException;//获取对应的信息实例

源码注释上已经给出了对方法的详细说明,这里就不再重复。

小提示:如果读者对此阅读感到困难,请参考 Java IO的相关内容进行学习。有没有发现这些方法的设计和使用基本是一样的?这说明了什么呢?

上面这些方法提供了对一个标准文件系统进行的所有基本操作。根据需求的不同,程序设计人员可以设计和搭建不同的应用程序框架,从而实现不同的应用操作。

小提示:后面的实战部分会有一个云存储模型,如果读者从事的是相关工作,可以为你提供极大的便利哦。

下面我们根据FileSystem提供的API方法创建一个对文件进行查询并删除一个不需要的文件的演示程序,代码如程序2-1所示。

程序2-1

public class FileSystemOption {

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration(); //获取环境变量

FileSystem fs = FileSystem.get(conf); //创建文件系统实例

Path dir = new Path("user/dir"); //设置一个目录路径

Path file = new Path(dir + "/a.txt"); //在设置目录下创建文件

fs.mkdirs(dir); //创建目录

fs.copyFromLocalFile(new Path("c://a.txt"), file); //复制一个文件

FileStatus status = fs.getFileStatus(file); //获取文件状态

System.out.println(status.getPath()); //获取绝对路径

System.out.println(status.getPath().toUri().getPath()); //获取相对路径

System.out.println(status.getBlockSize()); //获取当前Block大小

System.out.println(status.getGroup()); //获取所属组

System.out.println(status.getOwner()); //获取所有者

fs.delete(file, true); //删除文件目录

System.out.println(fs.isFile(file)); //确认删除结果

 }

}

从代码可以看到,此程序首先获得一个当前文件系统实例,之后通过Path定义了一个文件目录,调用mkdir方法去创建此目录。copyFromLocalFile方法从本地磁盘上复制了一个文件到指定目录下,并重命名为“a.txt”。

FileStatus类是定义文件基本信息的一个类。获得的FileSystem实例通过调用getFileStatus方法去获取指定路径下文件的一些基本信息,包装为FileStatus一个实例后返回。

获得FileStatus实例后,可以打印出其中包含的基本信息,例如路径、所属BlockSize大小、所属用户等一些基本信息。

信息打印后,FileSystem实例调用delete方法删除文件。这里读者注意到一个值为“true”的形参,它是用于确认如果此文件是目录并且其中不为空也进行删除。

最后,FileSystem实例通过isFile确认删除结果。

小提示:更具体的解释请参看2.5.2节。不过作者建议读者自己将上述代码运行一遍,以便有所了解。

2.5.2 使用FileSystem API读取数据详解

在介绍HDFS提供的FileSystem API之前,有一个概念需要读者理解。在学习Java的I/O类中,使用File类库为文件的读写建立路径。HDFS也一样,需要使用对应的类库建立相关路径。

HDFS中通常使用Path类定义需要的路径,具体代码见下:

Path path = new Path(“sample.txt”);

下面我们先来看一个例子,见程序2-2。

程序2-2

public class ReadSample {

public static void main(String[] args) throws Exception {

Path path = new Path("sample.txt"); //获取文件路径

Configuration conf = new Configuration(); //获取环境变量

FileSystem fs = FileSystem.get(conf); //获取文件系统实例

FSDataInputStream fsin= fs.open(path); //建立输入流

byte[] buff = new byte[128]; //建立缓存数组

int length = 0; //辅助长度

while( (length = fsin.read(buff, 0, 128)) != -1 ){ //将数据读入缓存数组

System.out.println(new String(buff,0,length)); //打印数据

 }

}

}

这是一个从指定的HDFS文件中读取数据并打印文件内容的程序,将代码生成对应的Jar文件上传到集群后,可以使用命令行程序运行,命令行如下:

$ hadoop jar ReadSample.jar ReadSample

下面我们对程序2-2作一个详细分析。

我们知道,任何文件系统都是与环境变量紧密联系在一起的,对于当前HDFS来说,在创建出当前文件系统实例之前,有必要获得当前的环境变量。代码如下:

Configuration conf = new Configuration();

Configuration 类为用户提供了获取当前环境变量的一个实例,实例封装了当前搭载环境的配置,这配置是由core-site.xml设置的。

而对于使用 HDFS 提供的 FileSystem,需要提供一套加载当前环境并建立读写路径的API,使用的方法如下所示:

public static FileSystem get(Configuration conf) throws IOException

public static FileSystem get(URI uri, Configuration conf) throws IOException

通过方法名可知,此代码为重载的方法,使用传入的环境变量以获取对应的HDFS文件系统。第一个方法是使用默认的URI地址获取当前对象中环境变量加载的文件系统,第二个方法是使用传入的URI获取路径指定的文件系统。

程序 2-2第六行代码使用 fs.open(Path path)方法打开数据的输入流,open方法的源码如下所示:

public FSDataInputStream open(Path path) throws IOException {//打开输入流

return open(path, getConf().getInt("io.file.buffer.size", 4096));//调用输入流创建方法

}

从源码分析可知,调用的open方法根据传进来的path路径,获取环境变量,并设置文件读取的缓冲区大小(如果未设定,则以默认的 4096 大小设定),之后返回一个数据流FSDataInputStream实例。在对FSDataInputStream进行分析之前,请读者将第六行代码替换为如下代码:

InputStream fsih=fs.open(Path);

程序依旧可以正常运行。

通过分析可知,InputStream 是一个标准的I/O类,目的是提供一个标准输入流。而通过替换代码可以了解,FSDataInputStream与DataInputStream互为继承关系,查看源码可知, FSDataInputStream 在继承DataInputStream 后又实现了两个接口,为HDFS提供了建立输入流的功能。部分源码如图所示:

public class FSDataInputStream extends DataInputStream implements Seekable, PositionedReadable { //FSDataInputStream实现类

 ...

  public synchronized void seek(long desired) throws IOException {//seek接口实现方法

  ((Seekable)in).seek(desired);

 }

 ...

public int read(long position, byte[] buffer, int offset, int length) throws IOException {

  return ((PositionedReadable)in).read(position, buffer, offset, length); // read方法

 }

public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {

  ((PositionedReadable)in).readFully(position, buffer, offset, length);//PositionedReadable中的readFully实现方法

 }

  public void readFully(long position, byte[] buffer) throws IOException { ((PositionedReadable)in).readFully(position, buffer, 0, buffer.length);//readFully实现方法

 }

这里额外实现了两个接口。首先,PositionedReadable接口通过实现read方法以及多个重载的readFully方法为文件从指定偏移处读取数据至内存中。read方法中的形参定义说明如下:

● position指的是使用long定义的数据偏移量,用于指定读取的开始位置。

● buffer是设定的缓存byte数组,用以存放读取的数据,默认为4096。

● offset是从指定缓存数组开始计算的偏移量。

● length是每次读取的长度。

小提示:read方法的实质就是从所需要读取文件指定的position处读取长度为length字节的数据至指定的buffer数组中。

而另外一个接口Seekable为我们提供了 seek(long desired)方法,来实现对数据的重定位,seek可以移动到文件的任何一个绝对位置,例如使用seek(0)移动到文件的开始位置。seek与read的使用方法如程序2-3所示。

程序2-3

public class FSDSample {

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration(); //获取环境变量

FileSystem fs = FileSystem.get(conf); //获取文件系统

FSDataInputStream fsin = fs.open(new Path("sample.txt")); //建立输入流

byte[] buff = new byte[128]; //建立辅助字节数组

int length = 0;

while( (length = fsin.read(buff, 0, 128)) != -1 ){ //将数据读入缓存

System.out.println(new String(buff,0,length)); //打印数据

}

System.out.println("length = " + fsin.getPos()); //打印输入流的长度

fsin.seek(0); //返回开始处

while( (length = fsin.read(buff, 0, 128)) != -1 ){ //将数据读入缓存

System.out.println(new String(buff,0,length)); //打印数据

}

fsin.seek(0); //返回开始处

byte[] buff2 = new byte[128]; //建立辅助字节数组

fsin.read(buff2, 0, 128); //将数据读入缓存数组System.out.println("buff2 =" + new String(buff2)); //打印数据

System.out.println(buff2.length); //打印数组长度

 }

}

上述代码的作用是重复读取指定HDFS中文件的内容,第一次读取结束后,调用seek(0)方法将输入流重定向,返回文件开始处重新进行数据读取。

小提示:请读者换个seek位置进行读取,查看结果。

2.5.3 使用FileSystem API写入数据详解

任何文件系统 API,除了提供对文件的读操作支持外,都会提供相应的功能对文件的写操作提供支持。对于 FileSystem API来说也是如此。

首先我们看下面这段源码:

public FSDataOutputStream create(Path f) throws IOException {//使用creat方法创造一个输出流

return create(f, true); //调用重载的creat方法

}

...

public FSDataOutputStream create(Path f, boolean overwrite)//重载的creat方法

throws IOException {

return create(f, overwrite,//创建文件输入流

getConf().getInt("io.file.buffer.size", 4096),//使用默认缓存

getDefaultReplication(),//使用默认复制数

getDefaultBlockSize()); //使用默认Block尺寸

}

与 open方法类似,create(Path f)依次为我们打开了用于创建文件输出流的一条通道。

小提示:FSDataOutputStream 的方法具有多个重载版本,通过依次调用,对是否复写已有文件、文件的缓存、保存时复制的副本数量以及文件块大小等有了明确的规定,若没有指定,则以默认值取代。

请观察 creat 方法的返回值,这里的返回是一个 FSDataOutputStream 对象实例。与FSDataInputStream类似,FSDataOutputStream也是继承自OutputStream的一个子类,用于为FileSystem创建文件的输出流。FSDataOutputStream中的write方法可以对数据文件进行相应的写操作。示例如程序2-4所示。

程序2-4

public class FSWriteSample {

  public static void main(String[] args) throws Exception {

  Path path = new Path("writeSample.txt"); //创建写路径

  Configuration conf = new Configuration(); //获取环境变量

  FileSystem fs = FileSystem.get(conf); //获取文件系统

  FSDataOutputStream fsout = fs.create(path); //创建输出流

  byte[] buff = "hello world".getBytes(); //设置输出字节数组

  fsout.write(buff); //开始写出数组

  IOUtils.closeStream(fsout); //关闭写出流

 }

}

为了将文件写入HDFS系统中,这里使用write方法,将对应的字节数组进行写出。在写出结束后,调用close方法,从而完成写操作。

除此之外,FileSystem实例还为我们提供了一系列的对文件在末尾进行追加操作的方法,源码如下所示:

public FSDataOutputStream append(Path f, int bufferSize) throws IOException { return append(f, bufferSize, null); //调用重载的追加方法

}

...

public abstract FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException; //追加方法

append方式是对已有文件进行数据追加的方法,请读者自行使用。

细心的读者可能已经注意到,源代码中默认的 append 方法传递的参数中有一个是Progressable progress的形参,Progressable 接口源码如下所示:

public interface Progressable {

public void progress(); //调用progress方法

}

其中只有一个progress方法,定义是每次在64K大小的文件被写入既定的输入流以后, progress方法会被调用一次,这可以用于显示输出进度。代码如程序2-5所示。

程序2-5

public class FSWriteSample2 {

static int index = 0;

public static void main(String[] args) throws Exception {

  StringBuffer sb = new StringBuffer(); //创建辅助可变字符串

  Random rand = new Random();

  for(int i = 0 ; i < 9999999;i++){ //随机写入字符

 sb.append((char)rand.nextInt(100));

 }

  byte[] buff = sb.toString().getBytes(); //生成字符数组

  Path path = new Path("writeSample.txt"); //创建路径

  Configuration conf = new Configuration(); //获取环境变量

  FileSystem fs = FileSystem.get(conf); //获取文件系统

  FSDataOutputStream fsout = fs.create(path,new Progressable() {//创建写出流

 @Override

  public void progress() { //默认的实用方法

  System.out.println(++index); //打印出序列号

 }

 });

  fsout.write(buff); //开始写出操作

  IOUtils.closeStream(fsout); //关闭写出流

 }

}

读者可以通过如下命令运行此代码,运行结果请读者自行查阅。

$ hadoop jar FSWriteSample2.jar FSWriteSample2

注意:FSDataOutputStream与FSDataInputStream类似,也有getPos方法,返回值是文件内已读取的长度。但是不同之处在于,FSDataOutputStream不能够使用seek方法对文件重新定位。