之前有文章介紹過HDFS租約帶來的問題,
導致spark應用無法正常讀取文件,
只能將異常檔找出並且刪除後,
任務才能繼續執行。
但是刪除檔實在是下下策,
而且檔本身其實並未損壞,
只是因為已經close的用戶端沒有及時的釋放租約導致。
按照Hadoop官網的說法,
HDFS會啟動一個單獨的執行緒,
專門處理未及時釋放的租約,
自動釋放超過“硬超時”(默認1小時)仍未釋放的租約,
但是從問題的現象上來看,
這個執行緒並沒有正常的工作,
甚至懷疑這個執行緒是否沒有啟動,
我使用的是CDH集群,
可能與相關的設置有關,
這一點需要確認。
如果Hadoop沒有自動清理租約,
我們有辦法手動的刷新租約嗎?答案是肯定的。
在網上查看資料時,
發現HDFS源碼中的DistributedFileSystem類提供了一個叫做recoverLease的方法,
可以主動的刷新租約。
但是非常奇怪,
既然已經為外界提供了這個介面,
為什麼不提供shell指令給使用者使用呢?為什麼只能通過代碼的方式調用呢?我使用的是hadoop-2.6.0,
也許後期的版本有所更新,
這一點也需要求證。
下面看一下這個方法的源碼:
/** * Start the lease recovery of a file * * @param f a file * @return true if the file is already closed * @throws IOException if an error occurs */ public boolean recoverLease(final Path f) throws IOException { Path absF = fixRelativePart(f); return new FileSystemLinkResolver
{ @Override public Boolean doCall(final Path p) throws IOException, UnresolvedLinkException { return dfs.recoverLease(getPathName(p)); } @Override public Boolean next(final FileSystem fs, final Path p) throws IOException { if (fs instanceof DistributedFileSystem) { DistributedFileSystem myDfs = (DistributedFileSystem)fs; return myDfs.recoverLease(p); } throw new UnsupportedOperationException("Cannot recoverLease through" + " a symlink to a non-DistributedFileSystem: " + f + " -> " + p); } }.resolve(this, absF); }有興趣的朋友可以下載hadoop源碼來仔細推敲一下內部的實現原理,
這裡我們只說如何調用,
解決我們的問題:
public static void recoverLease(String path) throws IOException { DistributedFileSystem fs = new DistributedFileSystem; Configuration conf = new Configuration; fs.initialize(URI.create(path), conf); fs.recoverLease(new Path(path)); fs.close; }這是我編寫的一個調用改介面的簡單的封裝方法,
需要注意的是,
此處傳入的path,
必須是包含檔案系統以及namenode和埠號的全路徑,
比如:
hdfs://namenode1:9000/xxx/xxx.log如果只需要恢復單個檔,
調用上述方法即可,
但是通常情況下,
我們需要對一個目錄進行遞迴的處理,
即恢復指定目錄下所有租約異常的檔。
這個時候,
我們需要先找出指定目錄下所有租約異常的檔,
形成一個Set或者List,
然後再遍歷這個容器,
對每個檔進行恢復。
尋找檔列表的方法如下:
public static Set getOpenforwriteFileList(String dir) throws IOException { /*拼接URL地址,
發送給namenode監聽的dfs.namenode.http-address埠,
獲取所需資料*/ StringBuilder url = new StringBuilder; url.append("/fsck?ugi=").append("dev"); url.append("&openforwrite=1"); /*獲得namenode的主機名稱以及dfs.namenode.http-address監聽埠,
例如:http://hadoopnode1:50070*/ Path dirpath; URI namenodeAddress; dirpath = HDFSUtil.getResolvedPath(dir); namenodeAddress = HDFSUtil.getDFSHttpAddress(dirpath); url.insert(0, namenodeAddress); try { url.append("&path=").append(URLEncoder.encode( Path.getPathWithoutSchemeAndAuthority(new Path(dir)).toString, "UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace; } Configuration conf = new Configuration; URLConnectionFactory connectionFactory = URLConnectionFactory.newDefaultURLConnectionFactory(conf); URL path = null; try { path = new URL(url.toString); } catch (MalformedURLException e) { e.printStackTrace; } URLConnection connection; BufferedReader input = null; try { connection = connectionFactory.openConnection(path, UserGroupInformation.isSecurityEnabled); InputStream stream = connection.getInputStream; input = new BufferedReader(new InputStreamReader(stream, "UTF-8")); } catch (IOException | AuthenticationException e) { e.printStackTrace; } if (input == null) { System.err.println("Cannot get response from namenode, url = " + url); return null; } String line; Set resultSet = new HashSet<>; try { while ((line = input.readLine) != null) { if (line.contains("MISSING") || line.contains("OPENFORWRITE")) { String regEx = "/[^ ]*"; Pattern pattern = Pattern.compile(regEx); Matcher matcher = pattern.matcher(line); while (matcher.find) { resultSet.add(matcher.group.replaceAll(":", "")); } } } } catch (IOException e) { e.printStackTrace; } finally { input.close; } return resultSet; }其實獲取租約異常列表的方法是我從HDFS源碼的org.apache.hadoop.hdfs.tools.DFSck中仿照而來的,
通過向NameNode的dfs.namenode.http-address埠通信,
獲取openforwrite狀態的檔清單,
然後通過正則匹配以及字串切割,
獲取所需的內容。
順便提一句,
由於此代碼是Java代碼,
並且返回的Set類型為java.util.Set,
如果在Scala代碼中調用,
則需要將Set類型轉化為scala.collection.immutable.Set,
具體方法如下:
/*獲取需要被恢復租約的檔列表,
返回類型為java.util.Set*/ val javaFilesSet = HDFSUtil.getOpenforwriteFileList(hdfsPrefix + recoverDirPath) if (null == javaFilesSet || javaFilesSet.isEmpty) { println("No files need to recover lease : " + hdfsPrefix + recoverDirPath) return } /*將java.util.Set轉換成scala.collection.immutable.Set*/ import scala.collection.JavaConverters._ val filesSet = javaFilesSet.asScala.toSet至此,
利用以上兩個方法,
即可獲取指定目錄下的所有租約異常的檔列表,
然後遍歷調用租約恢復介面,
即可實現批量恢復。