master.scala中的completeRecovery方法:
/* * 完成Master的主备切换 */ def completeRecovery() { // Ensure "only-once" recovery semantics using a short synchronization period. synchronized { if (state != RecoveryState.RECOVERING) { return } state = RecoveryState.COMPLETING_RECOVERY } /* * 将Application和worker,过滤出来目前的状态还是UNKNOW的 * 然后遍历,分别调用removeWorker和finishApplication方法, * 对可能已经出故障,或者甚至已经死掉的Application和Worker,进行清理 */ // Kill off any workers and apps that didn't respond to us. workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker) apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication) // Reschedule drivers which were not claimed by any workers drivers.filter(_.worker.isEmpty).foreach { d => logWarning(s"Driver ${d.id} was not found after master recovery") if (d.desc.supervise) { logWarning(s"Re-launching ${d.id}") relaunchDriver(d) } else { removeDriver(d.id, DriverState.ERROR, None) logWarning(s"Did not re-launch ${d.id} because it was not supervised") } } state = RecoveryState.ALIVE schedule() logInfo("Recovery complete - resuming operations!") }