

之前有文章介紹過HDFS租約帶來的問題, 導致spark應用無法正常讀取文件, 只能將異常檔找出並且刪除後, 任務才能繼續執行。

但是刪除檔實在是下下策, 而且檔本身其實並未損壞, 只是因為已經close的用戶端沒有及時的釋放租約導致。

按照Hadoop官網的說法, HDFS會啟動一個單獨的執行緒, 專門處理未及時釋放的租約, 自動釋放超過“硬超時”(默認1小時)仍未釋放的租約, 但是從問題的現象上來看, 這個執行緒並沒有正常的工作, 甚至懷疑這個執行緒是否沒有啟動, 我使用的是CDH集群, 可能與相關的設置有關, 這一點需要確認。

如果Hadoop沒有自動清理租約, 我們有辦法手動的刷新租約嗎?答案是肯定的。

在網上查看資料時, 發現HDFS源碼中的DistributedFileSystem類提供了一個叫做recoverLease的方法, 可以主動的刷新租約。 但是非常奇怪, 既然已經為外界提供了這個介面, 為什麼不提供shell指令給使用者使用呢?為什麼只能通過代碼的方式調用呢?我使用的是hadoop-2.6.0, 也許後期的版本有所更新, 這一點也需要求證。


/** * Start the lease recovery of a file * * @param f a file * @return true if the file is already closed * @throws IOException if an error occurs */ public boolean recoverLease(final Path f) throws IOException { Path absF = fixRelativePart(f); return new FileSystemLinkResolver { @Override public Boolean doCall(final Path p) throws IOException, UnresolvedLinkException { return dfs.recoverLease(getPathName(p)); } @Override public Boolean next(final FileSystem fs, final Path p) throws IOException { if (fs instanceof DistributedFileSystem) { DistributedFileSystem myDfs = (DistributedFileSystem)fs; return myDfs.recoverLease(p); } throw new UnsupportedOperationException("Cannot recoverLease through" + " a symlink to a non-DistributedFileSystem: " + f + " -> " + p); } }.resolve(this, absF); }

有興趣的朋友可以下載hadoop源碼來仔細推敲一下內部的實現原理, 這裡我們只說如何調用, 解決我們的問題:

public static void recoverLease(String path) throws IOException { DistributedFileSystem fs = new DistributedFileSystem; Configuration conf = new Configuration; fs.initialize(URI.create(path), conf); fs.recoverLease(new Path(path)); fs.close; }

這是我編寫的一個調用改介面的簡單的封裝方法, 需要注意的是, 此處傳入的path, 必須是包含檔案系統以及namenode和埠號的全路徑, 比如:



調用上述方法即可, 但是通常情況下, 我們需要對一個目錄進行遞迴的處理, 即恢復指定目錄下所有租約異常的檔。

這個時候, 我們需要先找出指定目錄下所有租約異常的檔, 形成一個Set或者List, 然後再遍歷這個容器, 對每個檔進行恢復。


public static Set getOpenforwriteFileList(String dir) throws IOException { /*拼接URL地址, 發送給namenode監聽的dfs.namenode.http-address埠, 獲取所需資料*/ StringBuilder url = new StringBuilder; url.append("/fsck?ugi=").append("dev"); url.append("&openforwrite=1"); /*獲得namenode的主機名稱以及dfs.namenode.http-address監聽埠, 例如:http://hadoopnode1:50070*/ Path dirpath; URI namenodeAddress; dirpath = HDFSUtil.getResolvedPath(dir); namenodeAddress = HDFSUtil.getDFSHttpAddress(dirpath); url.insert(0, namenodeAddress); try { url.append("&path=").append(URLEncoder.encode( Path.getPathWithoutSchemeAndAuthority(new Path(dir)).toString, "UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace; } Configuration conf = new Configuration; URLConnectionFactory connectionFactory = URLConnectionFactory.newDefaultURLConnectionFactory(conf); URL path = null; try { path = new URL(url.toString); } catch (MalformedURLException e) { e.printStackTrace; } URLConnection connection; BufferedReader input = null; try { connection = connectionFactory.openConnection(path, UserGroupInformation.isSecurityEnabled); InputStream stream = connection.getInputStream; input = new BufferedReader(new InputStreamReader(stream, "UTF-8")); } catch (IOException | AuthenticationException e) { e.printStackTrace; } if (input == null) { System.err.println("Cannot get response from namenode, url = " + url); return null; } String line; Set resultSet = new HashSet<>; try { while ((line = input.readLine) != null) { if (line.contains("MISSING") || line.contains("OPENFORWRITE")) { String regEx = "/[^ ]*"; Pattern pattern = Pattern.compile(regEx); Matcher matcher = pattern.matcher(line); while (matcher.find) { resultSet.add(matcher.group.replaceAll(":", "")); } } } } catch (IOException e) { e.printStackTrace; } finally { input.close; } return resultSet; }

其實獲取租約異常列表的方法是我從HDFS源碼的org.apache.hadoop.hdfs.tools.DFSck中仿照而來的, 通過向NameNode的dfs.namenode.http-address埠通信, 獲取openforwrite狀態的檔清單, 然後通過正則匹配以及字串切割, 獲取所需的內容。

順便提一句, 由於此代碼是Java代碼, 並且返回的Set類型為java.util.Set, 如果在Scala代碼中調用, 則需要將Set類型轉化為scala.collection.immutable.Set, 具體方法如下:

返回類型為java.util.Set*/ val javaFilesSet = HDFSUtil.getOpenforwriteFileList(hdfsPrefix + recoverDirPath) if (null == javaFilesSet || javaFilesSet.isEmpty) { println("No files need to recover lease : " + hdfsPrefix + recoverDirPath) return } /*將java.util.Set轉換成scala.collection.immutable.Set*/ import scala.collection.JavaConverters._ val filesSet = javaFilesSet.asScala.toSet

至此, 利用以上兩個方法, 即可獲取指定目錄下的所有租約異常的檔列表, 然後遍歷調用租約恢復介面, 即可實現批量恢復。

