最近在重新研读HDFS QJM的细节实现,所谓“温故而知新”,感觉还是收获不少。之前笔者曾简单地翻译过HDFS QJM的设计文档,感兴趣的同学可以点此链接:HDFS QJM的架构设计。本文笔者打算挑选其中的一些细节要点,进程阐述。
在HDFS QJM出现之前,editlog的一种推荐存储方式是基于NAS网络存储设备。这种方式会带来一些局限性:
对于特定硬件的要求。部署操作的复杂性。这么做的一个理由是它可以做到一定的高可用性。但是它的劣势是大于它所带来的好处的,于是社区提出了在软件层面来做这样的事,提出了以下3点基本要求:
没有特定硬件的要求。软件层Fencing机理的实现。(Fencing的操作在这里是为了防止早些的writer对象进行写操作)。无单点问题,所有editlog都是完全高可用的。QJM的全称是Quorum Journal Manager,管理的节点为JournalNode,NameNode往这些JournalNode上读/写editlog信息。在每次的写操作过程中,这些信息会发送到所有的JournalNode节点中,关键的一点是,它并不需要要求所有节点成功的回复信息,只需要多数以上(这里指半数以上)的成功信息即可。这就是Quorum原理的核心所在。
QuorumJournalManager内部控制editlog的写入步骤如下:
1.停止之前的writer对象。当前写editlog的writer对象在写的时候,必须保证之前的writer对象没有在写editlog信息。 2.恢复上一次未写入完全的editlog信息内容。因为存在一种可能性,早期的writer对象存在写失败的可能性,造成各个JournalNode上写入的editlog的内容长度不同,这里需要做一个数据同步恢复。这点会在下文中具体分析到。 3.开始写入一个新的editlog数据片段。 4.写入editlog信息片段。 5.Finalize(确认)editlog信息成功写入。只要步骤4中的执行结果在多数JournalNode中为成功即可。
下面我们来看其中的2个关键步骤的细节实现。
这个步骤的目的,笔者已经阐述过,是为了防止早期的writer对象继续写入editlog信息,造成脏数据的写入。那么问题来了,QJM内部是如何实现这点的呢?答案如下。
每个writer对象变为活跃的时候,QJM为每个writer对象分配一个唯一的epoch数字,这个数字是单调递增的,在QJM的写入过程中,不允许epoch数字小于当前epoch数字的writer对象写入数据。
所以,从上面的内容来看,这里提到了一个重要的变量:epoch数字。我们可以简单理解为“迭代轮次”的概念,比如说第一轮,第二轮,第三轮。。 但是在每次epoch数字的迭代增加中,是有一番讲究的,里面会涉及到协商选择的,细节步骤如下:
a.首先QJM会发送一条请求消息(getJournalState()请求),去获取每个JournalNode上当前的epoch数字值。这个值在JournalNode上被保存在名为lastPromisedEpoch的变量里。 b.QJM收到所有JournalNode返回来的epoch数字后,取出其中的最大值,然后在此值上加1,以此作为新的epoch值。 c.QJM以新的epoch值为内容,向各JournalNode发送newEpoch请求,每个JournalNode收到此请求时,比较自身保存的epoch值(即本地lastPromisedEpoch变量),如果比此值大,则更新本地值。
本地epoch值更新后,会在每次的写操作中被用作其中的一个检测条件,代码如下:
private synchronized void checkRequest(RequestInfo reqInfo) throws IOException { // 判断当前请求信息的epoch值是否小于当前的epoch值,如果是则抛出异常 if (reqInfo.getEpoch() < lastPromisedEpoch.get()) { throw new IOException("IPC's epoch " + reqInfo.getEpoch() + " is less than the last promised epoch " + lastPromisedEpoch.get()); } else if (reqInfo.getEpoch() > lastPromisedEpoch.get()) { // 否则用新的epoch值更新当前epoch值 updateLastPromisedEpoch(reqInfo.getEpoch()); } ... }在这里,我们对照代码再来看看QJM(QuorumJournalManager类)内部新的epoch值的生成过程:
Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch() throws IOException { Preconditions.checkState(!loggers.isEpochEstablished(), "epoch already created"); // 步骤1:向所有JournalNode发送getJournalState请求,获取它们的epoch值 Map<AsyncLogger, GetJournalStateResponseProto> lastPromises = loggers.waitForWriteQuorum(loggers.getJournalState(), getJournalStateTimeoutMs, "getJournalState()"); long maxPromised = Long.MIN_VALUE; // 步骤2:从这些epoch值中选出最大的值 for (GetJournalStateResponseProto resp : lastPromises.values()) { maxPromised = Math.max(maxPromised, resp.getLastPromisedEpoch()); } assert maxPromised >= 0; // 新的epoch值在最大的值基础上递增1 long myEpoch = maxPromised + 1; // 步骤3:向JournalNode节点发送新的epoch值 Map<AsyncLogger, NewEpochResponseProto> resps = loggers.waitForWriteQuorum(loggers.newEpoch(nsInfo, myEpoch), newEpochTimeoutMs, "newEpoch(" + myEpoch + ")"); loggers.setEpoch(myEpoch); return resps; }这部分小节内容即上面的步骤2。社区在这块可是做了相当详细的设计,毕竟数据失败,延时这类的情况在普通硬件中是极有可能发生的,所以我们要在软件层做这样的容错处理。
在QJM中,引入了lastWriterEpoch来保存最近一次writer写入对象的epoch值,定义如下:
/** * The epoch number of the last writer to actually write a transaction. * This is used to differentiate log segments after a crash at the very * beginning of a segment. See the the 'testNewerVersionOfSegmentWins' * test case. */ private PersistentLongFile lastWriterEpoch;此变量在每次开始写入新的数据片段时会被更新,
public synchronized void startLogSegment(RequestInfo reqInfo, long txid, int layoutVersion) throws IOException { assert fjm != null; checkFormatted(); checkRequest(reqInfo); ... long curLastWriterEpoch = lastWriterEpoch.get(); // 更新当前最新writer值epoch值 if (curLastWriterEpoch == reqInfo.getEpoch()) { LOG.info("Updating lastWriterEpoch from " + curLastWriterEpoch + " to " + reqInfo.getEpoch() + " for client " + Server.getRemoteIp()); lastWriterEpoch.set(reqInfo.getEpoch()); } ... }然后在每次的写操作中,会进行epoch值的检查,
private synchronized void checkWriteRequest(RequestInfo reqInfo) throws IOException { checkRequest(reqInfo); if (reqInfo.getEpoch() != lastWriterEpoch.get()) { throw new IOException("IPC's epoch " + reqInfo.getEpoch() + " is not the current writer epoch " + lastWriterEpoch.get()); } }重新回到本小节前面提到的失败情况,举个例子,比如一个JournalNode节点突然crash了,其上的editlog就会出现落后的情况,当它重新启动的时候,就会从其它正常节点上同步好数据。下面我们来看QJM内部是如何执行的,主要分为以下几步:
步骤1.决定哪个数据片段需要去恢复。这个步骤是紧接着newEpoch请求的,在向每个JournalNode节点发送newEpoch请求收到回复后,比较得出其中最大的数据片段id(事务ID),可意为写入的最新的数据段。 步骤2.向JournalNode发送PrepareRecovery RPC请求。PrepareRecovery请求是为了告诉JournalNode准备针对指定事务id,进行数据恢复。PrepareRecovery请求的返回信息为当前各个JournalNode上的给定事务id内容的信息。因为每个JournalNode上对于指定待恢复的数据片段,可能会存在数据内容不一致的情况。 步骤3.在获取针对给定事务id的数据片段信息后,QJM会针对各种情况对此选择一个理想的数据恢复源。 步骤4.QJM将需要同步的数据和数据源地址封装到AcceptRecovery RPC请求中,发送给各个JournalNode用于数据恢复。 步骤5.确认数据段恢复成功。
下面我们针对代码,对照上面的步骤实现:
public void recoverUnfinalizedSegments() throws IOException { Preconditions.checkState(!isActiveWriter, "already active writer"); LOG.info("Starting recovery process for unclosed journal segments..."); // 步骤1-1.发送newEpoch请求 Map<AsyncLogger, NewEpochResponseProto> resps = createNewUniqueEpoch(); LOG.info("Successfully started new epoch " + loggers.getEpoch()); if (LOG.isDebugEnabled()) { LOG.debug("newEpoch(" + loggers.getEpoch() + ") responses:\n" + QuorumCall.mapToString(resps)); } // 步骤1-2.从NewEpoch请求回复内容获取最大的txId,以此作为待恢复的数据片段 long mostRecentSegmentTxId = Long.MIN_VALUE; for (NewEpochResponseProto r : resps.values()) { if (r.hasLastSegmentTxId()) { mostRecentSegmentTxId = Math.max(mostRecentSegmentTxId, r.getLastSegmentTxId()); } } // On a completely fresh system, none of the journals have any // segments, so there's nothing to recover. if (mostRecentSegmentTxId != Long.MIN_VALUE) { // 开始进行数据片段的恢复 recoverUnclosedSegment(mostRecentSegmentTxId); } isActiveWriter = true; }进入recoverUnclosedSegment方法,继续阅读接下来步骤的相关代码:
private void recoverUnclosedSegment(long segmentTxId) throws IOException { Preconditions.checkArgument(segmentTxId > 0); LOG.info("Beginning recovery of unclosed segment starting at txid " + segmentTxId); // 步骤2.发送PrepareRecovery请求 QuorumCall<AsyncLogger,PrepareRecoveryResponseProto> prepare = loggers.prepareRecovery(segmentTxId); Map<AsyncLogger, PrepareRecoveryResponseProto> prepareResponses= loggers.waitForWriteQuorum(prepare, prepareRecoveryTimeoutMs, "prepareRecovery(" + segmentTxId + ")"); LOG.info("Recovery prepare phase complete. Responses:\n" + QuorumCall.mapToString(prepareResponses)); // 根据返回结果,选择其中最优的JournalNode上的数据为数据源,里面会涉及到各种情况的比较, // 具体比较逻辑在SegmentRecoveryComparator比较器中 Entry<AsyncLogger, PrepareRecoveryResponseProto> bestEntry = Collections.max( prepareResponses.entrySet(), SegmentRecoveryComparator.INSTANCE); AsyncLogger bestLogger = bestEntry.getKey(); PrepareRecoveryResponseProto bestResponse = bestEntry.getValue(); ... SegmentStateProto logToSync = bestResponse.getSegmentState(); assert segmentTxId == logToSync.getStartTxId(); // Sanity check: none of the loggers should be aware of a higher // txid than the txid we intend to truncate to for (Map.Entry<AsyncLogger, PrepareRecoveryResponseProto> e : prepareResponses.entrySet()) { AsyncLogger logger = e.getKey(); PrepareRecoveryResponseProto resp = e.getValue(); if (resp.hasLastCommittedTxId() && resp.getLastCommittedTxId() > logToSync.getEndTxId()) { throw new AssertionError("Decided to synchronize log to " + logToSync + " but logger " + logger + " had seen txid " + resp.getLastCommittedTxId() + " committed"); } } URL syncFromUrl = bestLogger.buildURLToFetchLogs(segmentTxId); // 步骤4.给定同步的地址,执行恢复操作 QuorumCall<AsyncLogger,Void> accept = loggers.acceptRecovery(logToSync, syncFromUrl); loggers.waitForWriteQuorum(accept, acceptRecoveryTimeoutMs, "acceptRecovery(" + TextFormat.shortDebugString(logToSync) + ")"); // 步骤5.确认数据的恢复 QuorumCall<AsyncLogger, Void> finalize = loggers.finalizeLogSegment(logToSync.getStartTxId(), logToSync.getEndTxId()); loggers.waitForWriteQuorum(finalize, finalizeSegmentTimeoutMs, String.format("finalizeLogSegment(%s-%s)", logToSync.getStartTxId(), logToSync.getEndTxId())); }以上就是对于HDFS QJM的简单分析,笔者只是选取了个人认为比较重要的部分,细节内容读者朋友可阅读QJM的设计文档。
[1].https://issues.apache.org/jira/browse/HDFS-3077. Quorum-based protocol for reading and writing edit logs
