- MapReduce 2.0源码分析与编程实战
- 王晓华
- 1128字
- 2020-06-26 13:50:12
2.5 HDFS接口使用详解
我们知道,HDFS是构建在Java环境上的分布式存储框架,那么它能不能像其他Java服务框架一样可以通过使用Java提供的API程序来操作HDFS对文件进行读写操作,从而为其他Java服务提供链接与支持。
答案是肯定的。HDFS的文件解释与执行器本身就是一个运行良好的Java应用,可以对Java代码进行编译,生成可执行的字节码文件。因此,HDFS提供了一整套的API为Java提供对文件相关操作的支持。
小提示:同样,根据Java字节码执行原理,我们可以使用其他语言对HDFS进行操作,只需要在底层最终生成相应的字节码文件即可。
2.5.1 使用FileSystem API操作HDFS中的内容
使用API对HDFS进行操作,首先需要获得当前对象的文件系统实例,即FileSystem。
FileSystem在Hadoop框架源码“org.apache.hadoop.fs”包中,是关于Hadoop文件系统使用 Java 代码实现的相关操作类,主要包括文件系统的建立、文件定义、实现基本的文件操作等。
首先获取指定对象的文件系统实例,如下所示:
Configuration conf = new Configuration(); //获取环境变量
FileSystem fs = FileSystem.get(conf); //创建文件系统实例
FileSystem为我们提供了相应的方法对文件进行操作,源码如下所示:
public abstract URI getUri(); //获取能够唯一标识一个FileSystem的URI
public abstract FSDataInputStream pen(Path f, int bufferSize) throws IOException; //根据给定的Path f,打开一个文件的FSDataInputStream输入流
public abstract FSDataOutputStream create(Path f,FsPermission permission,Boolean verwrite,int ufferSize, short replication,long BlockSize,Progressable progress) throws IOException; //为写入进程打开一个FSDataOutputStream
public abstract FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException; //向一个已经存在的文件中执行追加操作
public abstract boolean rename(Path src, Path dst) throws IOException;//将文件重命名为dst所确定的形参
public abstract boolean delete(Path f) throws IOException; //删除文件
public abstract boolean delete(Path f, boolean recursive) throws IOException;//删除目录
public abstract FileStatus[] listStatus(Path f) throws IOException;//列出该目录中的文件
public abstract void setWorkingDirectory(Path new_dir);//设置当前工作目录
public abstract Path getWorkingDirectory();//获取当前工作目录
public abstract boolean mkdirs(Path f, FsPermission permission) throws IOException; //创建一个目录f
public abstract FileStatus getFileStatus(Path f) throws IOException;//获取对应的信息实例
源码注释上已经给出了对方法的详细说明,这里就不再重复。
小提示:如果读者对此阅读感到困难,请参考 Java IO的相关内容进行学习。有没有发现这些方法的设计和使用基本是一样的?这说明了什么呢?
上面这些方法提供了对一个标准文件系统进行的所有基本操作。根据需求的不同,程序设计人员可以设计和搭建不同的应用程序框架,从而实现不同的应用操作。
小提示:后面的实战部分会有一个云存储模型,如果读者从事的是相关工作,可以为你提供极大的便利哦。
下面我们根据FileSystem提供的API方法创建一个对文件进行查询并删除一个不需要的文件的演示程序,代码如程序2-1所示。
程序2-1
public class FileSystemOption {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(); //获取环境变量
FileSystem fs = FileSystem.get(conf); //创建文件系统实例
Path dir = new Path("user/dir"); //设置一个目录路径
Path file = new Path(dir + "/a.txt"); //在设置目录下创建文件
fs.mkdirs(dir); //创建目录
fs.copyFromLocalFile(new Path("c://a.txt"), file); //复制一个文件
FileStatus status = fs.getFileStatus(file); //获取文件状态
System.out.println(status.getPath()); //获取绝对路径
System.out.println(status.getPath().toUri().getPath()); //获取相对路径
System.out.println(status.getBlockSize()); //获取当前Block大小
System.out.println(status.getGroup()); //获取所属组
System.out.println(status.getOwner()); //获取所有者
fs.delete(file, true); //删除文件目录
System.out.println(fs.isFile(file)); //确认删除结果
}
}
从代码可以看到,此程序首先获得一个当前文件系统实例,之后通过Path定义了一个文件目录,调用mkdir方法去创建此目录。copyFromLocalFile方法从本地磁盘上复制了一个文件到指定目录下,并重命名为“a.txt”。
FileStatus类是定义文件基本信息的一个类。获得的FileSystem实例通过调用getFileStatus方法去获取指定路径下文件的一些基本信息,包装为FileStatus一个实例后返回。
获得FileStatus实例后,可以打印出其中包含的基本信息,例如路径、所属BlockSize大小、所属用户等一些基本信息。
信息打印后,FileSystem实例调用delete方法删除文件。这里读者注意到一个值为“true”的形参,它是用于确认如果此文件是目录并且其中不为空也进行删除。
最后,FileSystem实例通过isFile确认删除结果。
小提示:更具体的解释请参看2.5.2节。不过作者建议读者自己将上述代码运行一遍,以便有所了解。
2.5.2 使用FileSystem API读取数据详解
在介绍HDFS提供的FileSystem API之前,有一个概念需要读者理解。在学习Java的I/O类中,使用File类库为文件的读写建立路径。HDFS也一样,需要使用对应的类库建立相关路径。
HDFS中通常使用Path类定义需要的路径,具体代码见下:
Path path = new Path(“sample.txt”);
下面我们先来看一个例子,见程序2-2。
程序2-2
public class ReadSample {
public static void main(String[] args) throws Exception {
Path path = new Path("sample.txt"); //获取文件路径
Configuration conf = new Configuration(); //获取环境变量
FileSystem fs = FileSystem.get(conf); //获取文件系统实例
FSDataInputStream fsin= fs.open(path); //建立输入流
byte[] buff = new byte[128]; //建立缓存数组
int length = 0; //辅助长度
while( (length = fsin.read(buff, 0, 128)) != -1 ){ //将数据读入缓存数组
System.out.println(new String(buff,0,length)); //打印数据
}
}
}
这是一个从指定的HDFS文件中读取数据并打印文件内容的程序,将代码生成对应的Jar文件上传到集群后,可以使用命令行程序运行,命令行如下:
$ hadoop jar ReadSample.jar ReadSample
下面我们对程序2-2作一个详细分析。
我们知道,任何文件系统都是与环境变量紧密联系在一起的,对于当前HDFS来说,在创建出当前文件系统实例之前,有必要获得当前的环境变量。代码如下:
Configuration conf = new Configuration();
Configuration 类为用户提供了获取当前环境变量的一个实例,实例封装了当前搭载环境的配置,这配置是由core-site.xml设置的。
而对于使用 HDFS 提供的 FileSystem,需要提供一套加载当前环境并建立读写路径的API,使用的方法如下所示:
public static FileSystem get(Configuration conf) throws IOException
…
public static FileSystem get(URI uri, Configuration conf) throws IOException
通过方法名可知,此代码为重载的方法,使用传入的环境变量以获取对应的HDFS文件系统。第一个方法是使用默认的URI地址获取当前对象中环境变量加载的文件系统,第二个方法是使用传入的URI获取路径指定的文件系统。
程序 2-2第六行代码使用 fs.open(Path path)方法打开数据的输入流,open方法的源码如下所示:
public FSDataInputStream open(Path path) throws IOException {//打开输入流
return open(path, getConf().getInt("io.file.buffer.size", 4096));//调用输入流创建方法
}
从源码分析可知,调用的open方法根据传进来的path路径,获取环境变量,并设置文件读取的缓冲区大小(如果未设定,则以默认的 4096 大小设定),之后返回一个数据流FSDataInputStream实例。在对FSDataInputStream进行分析之前,请读者将第六行代码替换为如下代码:
InputStream fsih=fs.open(Path);
程序依旧可以正常运行。
通过分析可知,InputStream 是一个标准的I/O类,目的是提供一个标准输入流。而通过替换代码可以了解,FSDataInputStream与DataInputStream互为继承关系,查看源码可知, FSDataInputStream 在继承DataInputStream 后又实现了两个接口,为HDFS提供了建立输入流的功能。部分源码如图所示:
public class FSDataInputStream extends DataInputStream implements Seekable, PositionedReadable { //FSDataInputStream实现类
...
public synchronized void seek(long desired) throws IOException {//seek接口实现方法
((Seekable)in).seek(desired);
}
...
public int read(long position, byte[] buffer, int offset, int length) throws IOException {
return ((PositionedReadable)in).read(position, buffer, offset, length); // read方法
}
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
((PositionedReadable)in).readFully(position, buffer, offset, length);//PositionedReadable中的readFully实现方法
}
public void readFully(long position, byte[] buffer) throws IOException { ((PositionedReadable)in).readFully(position, buffer, 0, buffer.length);//readFully实现方法
}
这里额外实现了两个接口。首先,PositionedReadable接口通过实现read方法以及多个重载的readFully方法为文件从指定偏移处读取数据至内存中。read方法中的形参定义说明如下:
● position指的是使用long定义的数据偏移量,用于指定读取的开始位置。
● buffer是设定的缓存byte数组,用以存放读取的数据,默认为4096。
● offset是从指定缓存数组开始计算的偏移量。
● length是每次读取的长度。
小提示:read方法的实质就是从所需要读取文件指定的position处读取长度为length字节的数据至指定的buffer数组中。
而另外一个接口Seekable为我们提供了 seek(long desired)方法,来实现对数据的重定位,seek可以移动到文件的任何一个绝对位置,例如使用seek(0)移动到文件的开始位置。seek与read的使用方法如程序2-3所示。
程序2-3
public class FSDSample {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(); //获取环境变量
FileSystem fs = FileSystem.get(conf); //获取文件系统
FSDataInputStream fsin = fs.open(new Path("sample.txt")); //建立输入流
byte[] buff = new byte[128]; //建立辅助字节数组
int length = 0;
while( (length = fsin.read(buff, 0, 128)) != -1 ){ //将数据读入缓存
System.out.println(new String(buff,0,length)); //打印数据
}
System.out.println("length = " + fsin.getPos()); //打印输入流的长度
fsin.seek(0); //返回开始处
while( (length = fsin.read(buff, 0, 128)) != -1 ){ //将数据读入缓存
System.out.println(new String(buff,0,length)); //打印数据
}
fsin.seek(0); //返回开始处
byte[] buff2 = new byte[128]; //建立辅助字节数组
fsin.read(buff2, 0, 128); //将数据读入缓存数组System.out.println("buff2 =" + new String(buff2)); //打印数据
System.out.println(buff2.length); //打印数组长度
}
}
上述代码的作用是重复读取指定HDFS中文件的内容,第一次读取结束后,调用seek(0)方法将输入流重定向,返回文件开始处重新进行数据读取。
小提示:请读者换个seek位置进行读取,查看结果。
2.5.3 使用FileSystem API写入数据详解
任何文件系统 API,除了提供对文件的读操作支持外,都会提供相应的功能对文件的写操作提供支持。对于 FileSystem API来说也是如此。
首先我们看下面这段源码:
public FSDataOutputStream create(Path f) throws IOException {//使用creat方法创造一个输出流
return create(f, true); //调用重载的creat方法
}
...
public FSDataOutputStream create(Path f, boolean overwrite)//重载的creat方法
throws IOException {
return create(f, overwrite,//创建文件输入流
getConf().getInt("io.file.buffer.size", 4096),//使用默认缓存
getDefaultReplication(),//使用默认复制数
getDefaultBlockSize()); //使用默认Block尺寸
}
与 open方法类似,create(Path f)依次为我们打开了用于创建文件输出流的一条通道。
小提示:FSDataOutputStream 的方法具有多个重载版本,通过依次调用,对是否复写已有文件、文件的缓存、保存时复制的副本数量以及文件块大小等有了明确的规定,若没有指定,则以默认值取代。
请观察 creat 方法的返回值,这里的返回是一个 FSDataOutputStream 对象实例。与FSDataInputStream类似,FSDataOutputStream也是继承自OutputStream的一个子类,用于为FileSystem创建文件的输出流。FSDataOutputStream中的write方法可以对数据文件进行相应的写操作。示例如程序2-4所示。
程序2-4
public class FSWriteSample {
public static void main(String[] args) throws Exception {
Path path = new Path("writeSample.txt"); //创建写路径
Configuration conf = new Configuration(); //获取环境变量
FileSystem fs = FileSystem.get(conf); //获取文件系统
FSDataOutputStream fsout = fs.create(path); //创建输出流
byte[] buff = "hello world".getBytes(); //设置输出字节数组
fsout.write(buff); //开始写出数组
IOUtils.closeStream(fsout); //关闭写出流
}
}
为了将文件写入HDFS系统中,这里使用write方法,将对应的字节数组进行写出。在写出结束后,调用close方法,从而完成写操作。
除此之外,FileSystem实例还为我们提供了一系列的对文件在末尾进行追加操作的方法,源码如下所示:
public FSDataOutputStream append(Path f, int bufferSize) throws IOException { return append(f, bufferSize, null); //调用重载的追加方法
}
...
public abstract FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException; //追加方法
append方式是对已有文件进行数据追加的方法,请读者自行使用。
细心的读者可能已经注意到,源代码中默认的 append 方法传递的参数中有一个是Progressable progress的形参,Progressable 接口源码如下所示:
public interface Progressable {
public void progress(); //调用progress方法
}
其中只有一个progress方法,定义是每次在64K大小的文件被写入既定的输入流以后, progress方法会被调用一次,这可以用于显示输出进度。代码如程序2-5所示。
程序2-5
public class FSWriteSample2 {
static int index = 0;
public static void main(String[] args) throws Exception {
StringBuffer sb = new StringBuffer(); //创建辅助可变字符串
Random rand = new Random();
for(int i = 0 ; i < 9999999;i++){ //随机写入字符
sb.append((char)rand.nextInt(100));
}
byte[] buff = sb.toString().getBytes(); //生成字符数组
Path path = new Path("writeSample.txt"); //创建路径
Configuration conf = new Configuration(); //获取环境变量
FileSystem fs = FileSystem.get(conf); //获取文件系统
FSDataOutputStream fsout = fs.create(path,new Progressable() {//创建写出流
@Override
public void progress() { //默认的实用方法
System.out.println(++index); //打印出序列号
}
});
fsout.write(buff); //开始写出操作
IOUtils.closeStream(fsout); //关闭写出流
}
}
读者可以通过如下命令运行此代码,运行结果请读者自行查阅。
$ hadoop jar FSWriteSample2.jar FSWriteSample2
注意:FSDataOutputStream与FSDataInputStream类似,也有getPos方法,返回值是文件内已读取的长度。但是不同之处在于,FSDataOutputStream不能够使用seek方法对文件重新定位。