java实现对HDFS增删改查(CRUD)等操作

jopen 10年前

实现对HDFS增删改查CRUD等操作

1 查找

列出某个目录下的文件名称,hdfs命令如下所示:

hdfs dfs –ls/usr/app

java代码片段:

public void list(String srcPath) {               Configuration conf = new Configuration();               LOG.info("[Defaultfs] :" +conf.get("fs.default.name"));             conf.set("hadoop.job.ugi","app,app");   //It is not necessary for the default user.               FileSystem fs;              try {                       fs= FileSystem.get(conf);                       RemoteIterator<LocatedFileStatus>rmIterator = fs.listLocatedStatus(new Path(srcPath));                       while (rmIterator.hasNext()) {                                 Path path = rmIterator.next().getPath();                                 if(fs.isDirectory(path)){                                           LOG.info("-----------DirectoryName: "+path.getName());                                 }                                 else if(fs.isFile(path)){                                           LOG.info("-----------FileName: "+path.getName());                                 }                       }              }catch (IOException e) {                       LOG.error("list fileSysetm object stream.:" , e);                       new RuntimeException(e);              }    }  

</div> </div>

输出结果:

2014-03-11 22:38:15,329 INFO  (com.hdfs.client.SyncDFS:48) ------------File Name: README.txt

2014-03-11 22:38:15,331 INFO  (com.hdfs.client.SyncDFS:45) ------------Directory Name: blog_blogpost

2014-03-11 22:38:15,333 INFO  (com.hdfs.client.SyncDFS:45) ------------Directory Name: test

读取文件中的内容,hdfs命令如下:

hdfs dfs –cat /input

java 代码:

    public void readFile(String file){                   Configurationconf = new Configuration();                   FileSystemfs;                   try {                            fs= FileSystem.get(conf);                            Pathpath = new Path(file);                            if(!fs.exists(path)){                                     LOG.warn("file'"+ file+"' doesn't exist!");                                     return ;                            }                            FSDataInputStreamin = fs.open(path);                            Stringfilename = file.substring(file.lastIndexOf('/') + 1, file.length());                            OutputStreamout = new BufferedOutputStream(new FileOutputStream(                                                                           new File(filename)));                                    byte[] b = new byte[1024];                            int numBytes = 0;                            while ((numBytes = in.read(b)) > 0) {                                     out.write(b,0, numBytes);                            }                            in.close();                            out.close();                            fs.close();                   }catch (IOException e) {                            LOG.error("ifExists fs Exception caught! :" , e);                            new RuntimeException(e);                   }         }  
</div> </div>

获取文件的修改时间,java代码:

</div> </div>
    /**                * Gets the information about the file modifiedtime.                * @param source                * @throws IOException                */               public void getModificationTime(String source) throws IOException{                                                   Configurationconf = new Configuration();                                                   FileSystemfs = FileSystem.get(conf);                         PathsrcPath = new Path(source);                                                   // Check if the file alreadyexists                         if (!(fs.exists(srcPath))) {                         System.out.println("No such destination " + srcPath);                         return;                         }                         // Get the filename out of thefile path                         Stringfilename = source.substring(source.lastIndexOf('/') + 1, source.length());                                                   FileStatusfileStatus = fs.getFileStatus(srcPath);                         long modificationTime =fileStatus.getModificationTime();                                                   LOG.info("modified datetime: " + System.out.format("File %s; Modification time : %0.2f%n",filename,modificationTime));                                         }  

获取文件块定位信息,java代码:

    /**                  * Gets the file block location info                  * @param source                  * @throws IOException                  */                 public void getBlockLocations(String source) throws IOException{                           Configurationconf = new Configuration();                           FileSystemfs = FileSystem.get(conf);                           PathsrcPath = new Path(source);                                                       // Check if the file alreadyexists                           if (!(ifExists(source))) {                                    System.out.println("No such destination " + srcPath);                                    return;                           }                           // Get the filename out of thefile path                           Stringfilename = source.substring(source.lastIndexOf('/') + 1, source.length());                                                       FileStatusfileStatus = fs.getFileStatus(srcPath);                                                       BlockLocation[]blkLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());                           int blkCount = blkLocations.length;                                                       System.out.println("File :" + filename + "stored at:");                           for (int i=0; i < blkCount; i++) {                                    String[]hosts = blkLocations[i].getHosts();                                    LOG.info("host ip:" +System.out.format("Host %d: %s %n", i, hosts));                           }                 }  
</div> </div>

获取Hadoop集群中data node的DNS主机名,java代码:

    public void getHostnames () throwsIOException{                           Configurationconfig = new Configuration();                                                     FileSystemfs = FileSystem.get(config);                           DistributedFileSystemhdfs = (DistributedFileSystem) fs;                           DatanodeInfo[]dataNodeStats = hdfs.getDataNodeStats();                                                       String[]names = new String[dataNodeStats.length];                           for (int i = 0; i < dataNodeStats.length; i++) {                                    names[i]= dataNodeStats[i].getHostName();                                    LOG.info("datenode hostname:"+(dataNodeStats[i].getHostName()));                           }                 }  
</div> </div>

 

2 创建

创建一个目录,指定具体的文件路径。hdfs命令如下:

hdfs dfs –mkdir/usr/app/tmp  


java代码:

       

    public void mkdir(String dir){                         Configurationconf = new Configuration();                         FileSystemfs = null;                         try {                                  fs= FileSystem.get(conf);                                  Pathpath = new Path(dir);                                  if(!fs.exists(path)){                                           fs.mkdirs(path);                                           LOG.debug("create directory '"+dir+"' successfully!");                                  }else{                                           LOG.debug("directory '"+dir+"' exits!");                                  }                         }catch (IOException e) {                                  LOG.error("FileSystem get configuration with anerror");                                  e.printStackTrace();                         }finally{                                  if(fs!= null){                                           try {                                                     fs.close();                                           }catch (IOException e) {                                                     LOG.error("close fs object stream. :" , e);                                                     new RuntimeException(e);                                           }                                  }                         }               }  
</div> </div>

将本地文件上传到hdfs上去,java代码如下:

</div> </div>
    public void copyFromLocal (String source, String dest) {                                                       Configurationconf = new Configuration();                           FileSystemfs;                           try {                                    fs= FileSystem.get(conf);                                    PathsrcPath = new Path(source);                                                                         PathdstPath = new Path(dest);                                    // Check if the file alreadyexists                                    if (!(fs.exists(dstPath))) {                                             LOG.warn("dstPathpath doesn't exist" );                                             LOG.error("No such destination " + dstPath);                                             return;                                    }                                                                         // Get the filename out of thefile path                                    Stringfilename = source.substring(source.lastIndexOf('/') + 1, source.length());                                                                         try{                                             //if the file exists in thedestination path, it will throw exception.        //                                   fs.copyFromLocalFile(srcPath,dstPath);                                             //remove and overwrite files withthe method                                             //copyFromLocalFile(booleandelSrc, boolean overwrite, Path src, Path dst)                                             fs.copyFromLocalFile(false, true, srcPath, dstPath);                                             LOG.info("File " + filename + "copied to " + dest);                                    }catch(Exception e){                                             LOG.error("copyFromLocalFile exception caught!:" , e);                                             new RuntimeException(e);                                    }finally{                                             fs.close();                                    }                           }catch (IOException e1) {                                    LOG.error("copyFromLocal IOException objectstream. :" ,e1);                                    new RuntimeException(e1);                           }                 }  


        

添加一个文件到指定的目录下,java代码如下:

      

    public void addFile(String source, String dest)  {                                 // Conf object will readthe HDFS configuration parameters                                 Configurationconf = new Configuration();                                 FileSystemfs;                                 try {                                          fs= FileSystem.get(conf);                                          // Get the filename out of thefile path                                          Stringfilename = source.substring(source.lastIndexOf('/') + 1, source.length());                                                                                   // Create the destination pathincluding the filename.                                          if (dest.charAt(dest.length() - 1) != '/') {                                                    dest= dest + "/" + filename;                                          }else {                                                    dest= dest + filename;                                          }                                                                                   // Check if the file alreadyexists                                          Pathpath = new Path(dest);                                          if (fs.exists(path)) {                                                    LOG.error("File " + dest + " already exists");                                                    return;                                          }                                                                                   // Create a new file and writedata to it.                                          FSDataOutputStreamout = fs.create(path);                                          InputStreamin = new BufferedInputStream(new FileInputStream(                                                                                                                                                          new File(source)));                                                                                   byte[] b = new byte[1024];                                          int numBytes = 0;                                          //In this way read and write datato destination file.                                          while ((numBytes = in.read(b)) > 0) {                                                    out.write(b,0, numBytes);                                          }                                          in.close();                                          out.close();                                          fs.close();                                 }catch (IOException e) {                                          LOG.error("addFile Exception caught! :" , e);                                          new RuntimeException(e);                                 }              }  
</div> </div>

 

3 修改

重新命名hdfs中的文件名称,java代码如下:

    

    public void renameFile (String fromthis, String tothis){                      Configurationconf = new Configuration();                                             FileSystemfs;                      try {                               fs= FileSystem.get(conf);                               PathfromPath = new Path(fromthis);                               PathtoPath = new Path(tothis);                                                               if (!(fs.exists(fromPath))) {                                        LOG.info("No such destination " + fromPath);                                   return;                               }                                                               if (fs.exists(toPath)) {                                        LOG.info("Already exists! " + toPath);                                   return;                               }                                                               try{                                        boolean isRenamed = fs.rename(fromPath,toPath);     //renames file name indeed.                                        if(isRenamed){                                                  LOG.info("Renamed from " + fromthis + " to " + tothis);                                        }                               }catch(Exception e){                                        LOG.error("renameFile Exception caught! :" , e);                                        new RuntimeException(e);                               }finally{                                        fs.close();                               }                      }catch (IOException e1) {                               LOG.error("fs Exception caught! :" , e1);                               new RuntimeException(e1);                      }            }  
</div> </div>

 

4 删除

在hdfs上,删除指定的一个文件。Java代码:

    public void deleteFile(String file)  {                                    Configurationconf = new Configuration();                                    FileSystemfs;                                    try {                                             fs= FileSystem.get(conf);                                                             Pathpath = new Path(file);                                             if (!fs.exists(path)) {                                                       LOG.info("File " + file + " does not exists");                                                       return;                                             }                                             /*                                             * recursively delete the file(s) if it is adirectory.                                             * If you want to mark the path that will bedeleted as                                             * a result of closing the FileSystem.                                             *  deleteOnExit(Path f)                                             */                                             fs.delete(new Path(file), true);                                             fs.close();                                    }catch (IOException e) {                                             LOG.error("deleteFile Exception caught! :" , e);                                             new RuntimeException(e);                                    }                                    }  
</div> </div>

 

Appendix 完整代码

import java.io.BufferedInputStream;    import java.io.BufferedOutputStream;    import java.io.File;    import java.io.FileInputStream;    import java.io.FileOutputStream;    import java.io.IOException;    import java.io.InputStream;    import java.io.OutputStream;         import org.apache.commons.logging.Log;    import org.apache.commons.logging.LogFactory;    import org.apache.hadoop.conf.Configuration;    import org.apache.hadoop.fs.BlockLocation;    import org.apache.hadoop.fs.FSDataInputStream;    import org.apache.hadoop.fs.FSDataOutputStream;    import org.apache.hadoop.fs.FileStatus;    import org.apache.hadoop.fs.FileSystem;    import org.apache.hadoop.fs.LocatedFileStatus;    import org.apache.hadoop.fs.Path;    import org.apache.hadoop.fs.RemoteIterator;    importorg.apache.hadoop.hdfs.DistributedFileSystem;    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;         public class SyncDFS {                         private static final Log LOG = LogFactory.getLog(SyncDFS.class);                         /**             * Reads the directory name(s) and file name(s)from the specified parameter "srcPath"             * @param srcPath             */             public void list(String srcPath) {                        Configuration conf = new Configuration();                        LOG.info("[Defaultfs] :" +conf.get("fs.default.name"));    //                conf.set("hadoop.job.ugi","app,app");   //It is not necessary for the default user.                        FileSystem fs;                       try {                                fs= FileSystem.get(conf);                                RemoteIterator<LocatedFileStatus>rmIterator = fs.listLocatedStatus(new Path(srcPath));                                while (rmIterator.hasNext()) {                                          Path path = rmIterator.next().getPath();                                          if(fs.isDirectory(path)){                                                    LOG.info("-----------DirectoryName: "+path.getName());                                          }                                          else if(fs.isFile(path)){                                                    LOG.info("-----------FileName: "+path.getName());                                          }                                }                       }catch (IOException e) {                                LOG.error("list fileSysetm object stream.:" , e);                                new RuntimeException(e);                       }             }                  /**             * Makes the specified directory if it doesn'texist.             * @param dir             */             public void mkdir(String dir){                       Configurationconf = new Configuration();                       FileSystemfs = null;                       try {                                fs= FileSystem.get(conf);                                Pathpath = new Path(dir);                                if(!fs.exists(path)){                                         fs.mkdirs(path);                                         LOG.debug("create directory '"+dir+"' successfully!");                                }else{                                         LOG.debug("directory '"+dir+"' exits!");                                }                       }catch (IOException e) {                                LOG.error("FileSystem get configuration with anerror");                                e.printStackTrace();                       }finally{                                if(fs!= null){                                         try {                                                   fs.close();                                         }catch (IOException e) {                                                   LOG.error("close fs object stream. :" , e);                                                   new RuntimeException(e);                                         }                                }                       }             }                         /**             * Reads the file content in console.             * @param file             */             public void readFile(String file){                       Configurationconf = new Configuration();                       FileSystemfs;                       try {                                fs= FileSystem.get(conf);                                Pathpath = new Path(file);                                if(!fs.exists(path)){                                         LOG.warn("file'"+ file+"' doesn't exist!");                                         return ;                                }                                FSDataInputStreamin = fs.open(path);                                Stringfilename = file.substring(file.lastIndexOf('/') + 1, file.length());                                OutputStreamout = new BufferedOutputStream(new FileOutputStream(                                                                                new File(filename)));                                     byte[] b = new byte[1024];                                int numBytes = 0;                                while ((numBytes = in.read(b)) > 0) {                                         out.write(b,0, numBytes);                                }                                in.close();                                out.close();                                fs.close();                       }catch (IOException e) {                                LOG.error("ifExists fs Exception caught! :" , e);                                new RuntimeException(e);                       }             }                         public boolean ifExists(String source){                       if(source == null || source.length() ==0){                                return false;                       }                       Configurationconf = new Configuration();                       FileSystemfs = null;                       try {                                fs= FileSystem.get(conf);                                LOG.debug("judge file '"+source +  "'");                                return fs.exists(new Path(source));                       }catch (IOException e) {                                LOG.error("ifExists fs Exception caught! :" , e);                                new RuntimeException(e);                                return false;                       }finally{                                if(fs != null){                                         try {                                                   fs.close();                                         }catch (IOException e) {                                                   LOG.error("fs.close Exception caught! :" , e);                                                   new RuntimeException(e);                                         }                                }                                                      }                                   }                         /**             * Recursively copies the source pathdirectories or files to the destination path of DFS.             * It is the same functionality as thefollowing comand:             *      hadoopfs -copyFromLocal <local fs><hadoop fs>             * @param source             * @param dest             */             public void copyFromLocal (String source, String dest) {                                               Configurationconf = new Configuration();                       FileSystemfs;                       try {                                fs= FileSystem.get(conf);                                PathsrcPath = new Path(source);                                                                 PathdstPath = new Path(dest);                                // Check if the file alreadyexists                                if (!(fs.exists(dstPath))) {                                         LOG.warn("dstPathpath doesn't exist" );                                         LOG.error("No such destination " + dstPath);                                         return;                                }                                                                 // Get the filename out of thefile path                                Stringfilename = source.substring(source.lastIndexOf('/') + 1, source.length());                                                                 try{                                         //if the file exists in thedestination path, it will throw exception.    //                                   fs.copyFromLocalFile(srcPath,dstPath);                                         //remove and overwrite files withthe method                                         //copyFromLocalFile(booleandelSrc, boolean overwrite, Path src, Path dst)                                         fs.copyFromLocalFile(false, true, srcPath, dstPath);                                         LOG.info("File " + filename + "copied to " + dest);                                }catch(Exception e){                                         LOG.error("copyFromLocalFile exception caught!:" , e);                                         new RuntimeException(e);                                }finally{                                         fs.close();                                }                       }catch (IOException e1) {                                LOG.error("copyFromLocal IOException objectstream. :" ,e1);                                new RuntimeException(e1);                       }                       }                                     public void renameFile (String fromthis, String tothis){                       Configurationconf = new Configuration();                                               FileSystemfs;                       try {                                fs= FileSystem.get(conf);                                PathfromPath = new Path(fromthis);                                PathtoPath = new Path(tothis);                                                                 if (!(fs.exists(fromPath))) {                                         LOG.info("No such destination " + fromPath);                                    return;                                }                                                                 if (fs.exists(toPath)) {                                         LOG.info("Already exists! " + toPath);                                    return;                                }                                                                 try{                                         boolean isRenamed = fs.rename(fromPath,toPath);     //renames file name indeed.                                         if(isRenamed){                                                   LOG.info("Renamed from " + fromthis + " to " + tothis);                                         }                                }catch(Exception e){                                         LOG.error("renameFile Exception caught! :" , e);                                         new RuntimeException(e);                                }finally{                                         fs.close();                                }                       }catch (IOException e1) {                                LOG.error("fs Exception caught! :" , e1);                                new RuntimeException(e1);                       }             }                         /**             * Uploads or adds a file to HDFS             * @param source             * @param dest             */             public void addFile(String source, String dest)  {                                // Conf object will readthe HDFS configuration parameters                                Configurationconf = new Configuration();                                FileSystemfs;                                try {                                         fs= FileSystem.get(conf);                                         // Get the filename out of thefile path                                         Stringfilename = source.substring(source.lastIndexOf('/') + 1, source.length());                                                                                 // Create the destination pathincluding the filename.                                         if (dest.charAt(dest.length() - 1) != '/') {                                                   dest= dest + "/" + filename;                                         }else {                                                   dest= dest + filename;                                         }                                                                                 // Check if the file alreadyexists                                         Pathpath = new Path(dest);                                         if (fs.exists(path)) {                                                   LOG.error("File " + dest + " already exists");                                                   return;                                         }                                                                                 // Create a new file and writedata to it.                                         FSDataOutputStreamout = fs.create(path);                                         InputStreamin = new BufferedInputStream(new FileInputStream(                                                                                                                                                         new File(source)));                                                                                 byte[] b = new byte[1024];                                         int numBytes = 0;                                         //In this way read and write datato destination file.                                         while ((numBytes = in.read(b)) > 0) {                                                   out.write(b,0, numBytes);                                         }                                         in.close();                                         out.close();                                         fs.close();                                }catch (IOException e) {                                         LOG.error("addFile Exception caught! :" , e);                                         new RuntimeException(e);                                }             }                         /**             *Deletes the files if it is a directory.             * @param file             */             public void deleteFile(String file)  {                                Configurationconf = new Configuration();                                FileSystemfs;                                try {                                         fs= FileSystem.get(conf);                                                     Pathpath = new Path(file);                                         if (!fs.exists(path)) {                                                   LOG.info("File " + file + " does not exists");                                                   return;                                         }                                         /*                                         * recursively delete the file(s) if it is adirectory.                                         * If you want to mark the path that will bedeleted as                                         * a result of closing the FileSystem.                                         *  deleteOnExit(Path f)                                         */                                         fs.delete(new Path(file), true);                                         fs.close();                                }catch (IOException e) {                                         LOG.error("deleteFile Exception caught! :" , e);                                         new RuntimeException(e);                                }                            }             /**             * Gets the information about the file modifiedtime.             * @param source             * @throws IOException             */             public void getModificationTime(String source) throws IOException{                                               Configurationconf = new Configuration();                                               FileSystemfs = FileSystem.get(conf);                       PathsrcPath = new Path(source);                                               // Check if the file alreadyexists                       if (!(fs.exists(srcPath))) {                       System.out.println("No such destination " + srcPath);                       return;                       }                       // Get the filename out of thefile path                       Stringfilename = source.substring(source.lastIndexOf('/') + 1, source.length());                                               FileStatusfileStatus = fs.getFileStatus(srcPath);                       long modificationTime =fileStatus.getModificationTime();                                               LOG.info("modified datetime: " + System.out.format("File %s; Modification time : %0.2f%n",filename,modificationTime));                                     }                         /**             * Gets the file block location info             * @param source             * @throws IOException             */             public void getBlockLocations(String source) throws IOException{                       Configurationconf = new Configuration();                       FileSystemfs = FileSystem.get(conf);                       PathsrcPath = new Path(source);                                               // Check if the file alreadyexists                       if (!(ifExists(source))) {                                System.out.println("No such destination " + srcPath);                                return;                       }                       // Get the filename out of thefile path                       Stringfilename = source.substring(source.lastIndexOf('/') + 1, source.length());                                               FileStatusfileStatus = fs.getFileStatus(srcPath);                                               BlockLocation[]blkLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());                       int blkCount = blkLocations.length;                                               System.out.println("File :" + filename + "stored at:");                       for (int i=0; i < blkCount; i++) {                                String[]hosts = blkLocations[i].getHosts();                                LOG.info("host ip:" +System.out.format("Host %d: %s %n", i, hosts));                       }             }                         public void getHostnames () throws IOException{                       Configurationconfig = new Configuration();                                             FileSystemfs = FileSystem.get(config);                       DistributedFileSystemhdfs = (DistributedFileSystem) fs;                       DatanodeInfo[]dataNodeStats = hdfs.getDataNodeStats();                                               String[]names = new String[dataNodeStats.length];                       for (int i = 0; i < dataNodeStats.length; i++) {                                names[i]= dataNodeStats[i].getHostName();                                LOG.info("datenode hostname:"+(dataNodeStats[i].getHostName()));                       }             }             /**             * @param args             */             public static void main(String[] args) {                       SyncDFSdfs = new SyncDFS();                       dfs.list("/user/app");                                             dfs.mkdir("/user/app");                          //                dfs.readFile("/user/app/README.txt");                       LOG.info("--------------" +                                 dfs.ifExists("/user/warehouse/hbase.db/u_data/u.data")); //false                       LOG.info("--------------" + dfs.ifExists("/user/app/README.txt")); //true                                             //copied the local file(s) to thedfs.    //                dfs.copyFromLocal("/opt/test","/user/app");                                             //delete the file(s) from the dfs    //                dfs.deleteFile("/user/app/test");                       //rename diretory in dfs    //                dfs.renameFile("/user/app/test","/user/app/log");                       //rename file in dfs    //                dfs.renameFile("/user/app/log/derby.log","/user/app/log/derby_info.log");                                   }                     }  
</div> </div>