private long applyEditLogOp()

in hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java [386:1059]


  /**
   * Replays a single edit log operation against the in-memory namesystem
   * state, dispatching on {@code op.opCode}.
   *
   * <p>Most path-carrying cases first pass their path(s) through
   * {@code renameReservedPathsOnUpgrade} together with {@code logVersion};
   * the ACL, xattr and truncate cases use {@code src} exactly as logged.
   * Cases that record a retry-cache entry do so only when the namesystem has
   * a retry cache and the op carries RPC ids.
   *
   * @param op the operation to apply; it is cast to its concrete op type
   *     according to its opcode
   * @param fsDir the directory tree the operation is applied to
   * @param startOpt the startup option; only consulted by
   *     {@code OP_ROLLING_UPGRADE_START}
   * @param logVersion version of the edit log being replayed; passed to
   *     {@code renameReservedPathsOnUpgrade} and
   *     {@code getAndUpdateLastInodeId}, and compared against
   *     {@code LayoutVersion.BUGFIX_HDFS_2991_VERSION} in {@code OP_CLOSE}
   * @param lastInodeId passed through to {@code getAndUpdateLastInodeId}
   * @return the inode id obtained from {@code getAndUpdateLastInodeId} for
   *     {@code OP_ADD} of a new file, {@code OP_MKDIR} and
   *     {@code OP_SYMLINK}; {@code HdfsConstants.GRANDFATHER_INODE_ID} for
   *     every other case
   * @throws IOException if the opcode is not handled by this switch, if
   *     {@code OP_CLOSE} targets a file that is not under construction and
   *     the version check in that case applies, if {@code OP_SYMLINK} is
   *     seen while symlinks are disabled, or if any callee fails
   */
  private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
      StartupOption startOpt, int logVersion, long lastInodeId) throws IOException {
    // Default result; only replaced by the cases that call
    // getAndUpdateLastInodeId (OP_ADD of a new file, OP_MKDIR, OP_SYMLINK).
    long inodeId = HdfsConstants.GRANDFATHER_INODE_ID;
    if (LOG.isTraceEnabled()) {
      LOG.trace("replaying edit log: " + op);
    }
    // Retry-cache entries are recorded only when the namesystem has a retry
    // cache and this op carries RPC client/call ids.
    final boolean toAddRetryCache = fsNamesys.hasRetryCache() && op.hasRpcIds();

    switch (op.opCode) {
    case OP_ADD: {
      AddCloseOp addCloseOp = (AddCloseOp)op;
      final String path =
          renameReservedPathsOnUpgrade(addCloseOp.path, logVersion);
      if (FSNamesystem.LOG.isDebugEnabled()) {
        FSNamesystem.LOG.debug(op.opCode + ": " + path +
            " numblocks : " + addCloseOp.blocks.length +
            " clientHolder " + addCloseOp.clientName +
            " clientMachine " + addCloseOp.clientMachine);
      }
      // There are 3 cases here:
      // 1. OP_ADD to create a new file
      // 2. OP_ADD to update file blocks
      // 3. OP_ADD to open file for append (old append)

      // See if the file already exists (persistBlocks call)
      INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE);
      INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path, true);
      if (oldFile != null && addCloseOp.overwrite) {
        // This is OP_ADD with overwrite
        FSDirDeleteOp.deleteForEditLog(fsDir, iip, addCloseOp.mtime);
        // Clear the last inode of the resolved path and forget the old file
        // so that the "new file" branch below is taken.
        iip = INodesInPath.replace(iip, iip.length() - 1, null);
        oldFile = null;
      }
      INodeFile newFile = oldFile;
      if (oldFile == null) { // this is OP_ADD on a new file (case 1)
        // versions > 0 support per file replication
        // get name and replication
        final short replication = fsNamesys.getBlockManager()
            .adjustReplication(addCloseOp.replication);
        // A newly created file is expected to be logged without blocks.
        assert addCloseOp.blocks.length == 0;

        // add to the file tree
        inodeId = getAndUpdateLastInodeId(addCloseOp.inodeId, logVersion, lastInodeId);
        newFile = FSDirWriteFileOp.addFileForEditLog(fsDir, inodeId,
            iip.getExistingINodes(), iip.getLastLocalName(),
            addCloseOp.permissions, addCloseOp.aclEntries,
            addCloseOp.xAttrs, replication, addCloseOp.mtime,
            addCloseOp.atime, addCloseOp.blockSize, true,
            addCloseOp.clientName, addCloseOp.clientMachine,
            addCloseOp.storagePolicyId, addCloseOp.erasureCodingPolicyId);
        assert newFile != null;
        // Make the resolved path end in the newly added inode; it is used
        // below for the file status, EC policy lookup and block update.
        iip = INodesInPath.replace(iip, iip.length() - 1, newFile);
        fsNamesys.leaseManager.addLease(addCloseOp.clientName, newFile.getId());

        // add the op into retry cache if necessary
        if (toAddRetryCache) {
          HdfsFileStatus stat =
              FSDirStatAndListingOp.createFileStatusForEditLog(fsDir, iip);
          fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
              addCloseOp.rpcCallId, stat);
        }
      } else { // This is OP_ADD on an existing file (old append)
        if (!oldFile.isUnderConstruction()) {
          // This is case 3: a call to append() on an already-closed file.
          if (FSNamesystem.LOG.isDebugEnabled()) {
            FSNamesystem.LOG.debug("Reopening an already-closed file " +
                "for append");
          }
          LocatedBlock lb = FSDirAppendOp.prepareFileForAppend(fsNamesys, iip,
              addCloseOp.clientName, addCloseOp.clientMachine, false, false,
              false);
          // add the op into retry cache if necessary
          if (toAddRetryCache) {
            HdfsFileStatus stat =
                FSDirStatAndListingOp.createFileStatusForEditLog(fsDir, iip);
            fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
                addCloseOp.rpcCallId, new LastBlockWithStatus(lb, stat));
          }
        }
      }
      // Fall-through for case 2.
      // Regardless of whether it's a new file or an updated file,
      // update the block list.
      
      // Update the salient file attributes.
      newFile.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID, false);
      newFile.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
      ErasureCodingPolicy ecPolicy =
          FSDirErasureCodingOp.unprotectedGetErasureCodingPolicy(
              fsDir.getFSNamesystem(), iip);
      updateBlocks(fsDir, addCloseOp, iip, newFile, ecPolicy);
      break;
    }
    case OP_CLOSE: {
      AddCloseOp addCloseOp = (AddCloseOp)op;
      final String path =
          renameReservedPathsOnUpgrade(addCloseOp.path, logVersion);
      if (FSNamesystem.LOG.isDebugEnabled()) {
        FSNamesystem.LOG.debug(op.opCode + ": " + path +
            " numblocks : " + addCloseOp.blocks.length +
            " clientHolder " + addCloseOp.clientName +
            " clientMachine " + addCloseOp.clientMachine);
      }

      final INodesInPath iip = fsDir.getINodesInPath(path, DirOp.READ);
      final INodeFile file = INodeFile.valueOf(iip.getLastINode(), path);

      // Update the salient file attributes.
      file.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID, false);
      file.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
      ErasureCodingPolicy ecPolicy =
          FSDirErasureCodingOp.unprotectedGetErasureCodingPolicy(
              fsDir.getFSNamesystem(), iip);
      updateBlocks(fsDir, addCloseOp, iip, file, ecPolicy);

      // Now close the file
      // NOTE(review): the "<=" presumably selects logs written at or after
      // the HDFS-2991 fix (assumes layout versions decrease as they get
      // newer) -- confirm against LayoutVersion.
      if (!file.isUnderConstruction() &&
          logVersion <= LayoutVersion.BUGFIX_HDFS_2991_VERSION) {
        // There was a bug (HDFS-2991) in hadoop < 0.23.1 where OP_CLOSE
        // could show up twice in a row. But after that version, this
        // should be fixed, so we should treat it as an error.
        throw new IOException(
            "File is not under construction: " + path);
      }
      // One might expect that you could use removeLease(holder, path) here,
      // but OP_CLOSE doesn't serialize the holder. So, remove the lease by
      // inode id instead. The guard also makes a repeated OP_CLOSE that was
      // tolerated by the check above a no-op.
      if (file.isUnderConstruction()) {
        fsNamesys.getLeaseManager().removeLease(file.getId());
        file.toCompleteFile(file.getModificationTime(), 0,
            fsNamesys.getBlockManager().getMinReplication());
      }
      break;
    }
    case OP_APPEND: {
      AppendOp appendOp = (AppendOp) op;
      final String path = renameReservedPathsOnUpgrade(appendOp.path,
          logVersion);
      if (FSNamesystem.LOG.isDebugEnabled()) {
        FSNamesystem.LOG.debug(op.opCode + ": " + path +
            " clientName " + appendOp.clientName +
            " clientMachine " + appendOp.clientMachine +
            " newBlock " + appendOp.newBlock);
      }
      INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE);
      INodeFile file = INodeFile.valueOf(iip.getLastINode(), path);
      // Nothing is done when the file is already under construction.
      if (!file.isUnderConstruction()) {
        LocatedBlock lb = FSDirAppendOp.prepareFileForAppend(fsNamesys, iip,
            appendOp.clientName, appendOp.clientMachine, appendOp.newBlock,
            false, false);
        // add the op into retry cache if necessary
        if (toAddRetryCache) {
          HdfsFileStatus stat =
              FSDirStatAndListingOp.createFileStatusForEditLog(fsDir, iip);
          fsNamesys.addCacheEntryWithPayload(appendOp.rpcClientId,
              appendOp.rpcCallId, new LastBlockWithStatus(lb, stat));
        }
      }
      break;
    }
    case OP_UPDATE_BLOCKS: {
      UpdateBlocksOp updateOp = (UpdateBlocksOp)op;
      final String path =
          renameReservedPathsOnUpgrade(updateOp.path, logVersion);
      if (FSNamesystem.LOG.isDebugEnabled()) {
        FSNamesystem.LOG.debug(op.opCode + ": " + path +
            " numblocks : " + updateOp.blocks.length);
      }
      INodesInPath iip = fsDir.getINodesInPath(path, DirOp.READ);
      INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
      // Update in-memory data structures
      ErasureCodingPolicy ecPolicy =
          FSDirErasureCodingOp.unprotectedGetErasureCodingPolicy(
              fsDir.getFSNamesystem(), iip);
      updateBlocks(fsDir, updateOp, iip, oldFile, ecPolicy);

      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId);
      }
      break;
    }
    case OP_ADD_BLOCK: {
      AddBlockOp addBlockOp = (AddBlockOp) op;
      String path = renameReservedPathsOnUpgrade(addBlockOp.getPath(), logVersion);
      if (FSNamesystem.LOG.isDebugEnabled()) {
        FSNamesystem.LOG.debug(op.opCode + ": " + path +
            " new block id : " + addBlockOp.getLastBlock().getBlockId());
      }
      INodesInPath iip = fsDir.getINodesInPath(path, DirOp.READ);
      INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
      // add the new block to the INodeFile
      ErasureCodingPolicy ecPolicy =
          FSDirErasureCodingOp.unprotectedGetErasureCodingPolicy(
              fsDir.getFSNamesystem(), iip);
      addNewBlock(addBlockOp, oldFile, ecPolicy);
      break;
    }
    case OP_SET_REPLICATION: {
      SetReplicationOp setReplicationOp = (SetReplicationOp)op;
      String src = renameReservedPathsOnUpgrade(
          setReplicationOp.path, logVersion);
      INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      // The logged replication is passed through the block manager's
      // adjustReplication before being applied.
      short replication = fsNamesys.getBlockManager().adjustReplication(
          setReplicationOp.replication);
      FSDirAttrOp.unprotectedSetReplication(fsDir, iip, replication);
      break;
    }
    case OP_CONCAT_DELETE: {
      ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp)op;
      String trg = renameReservedPathsOnUpgrade(concatDeleteOp.trg, logVersion);
      // Apply the reserved-path renaming to every source path as well.
      String[] srcs = new String[concatDeleteOp.srcs.length];
      for (int i=0; i<srcs.length; i++) {
        srcs[i] =
            renameReservedPathsOnUpgrade(concatDeleteOp.srcs[i], logVersion);
      }
      INodesInPath targetIIP = fsDir.getINodesInPath(trg, DirOp.WRITE);
      // Resolve each source path to the INodeFile at its end.
      INodeFile[] srcFiles = new INodeFile[srcs.length];
      for (int i = 0; i < srcs.length; i++) {
        INodesInPath srcIIP = fsDir.getINodesInPath(srcs[i], DirOp.WRITE);
        srcFiles[i] = srcIIP.getLastINode().asFile();
      }
      FSDirConcatOp.unprotectedConcat(fsDir, targetIIP, srcFiles,
          concatDeleteOp.timestamp);
      
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(concatDeleteOp.rpcClientId,
            concatDeleteOp.rpcCallId);
      }
      break;
    }
    case OP_RENAME_OLD: {
      RenameOldOp renameOp = (RenameOldOp)op;
      final String src = renameReservedPathsOnUpgrade(renameOp.src, logVersion);
      final String dst = renameReservedPathsOnUpgrade(renameOp.dst, logVersion);
      FSDirRenameOp.renameForEditLog(fsDir, src, dst, renameOp.timestamp);
      
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(renameOp.rpcClientId, renameOp.rpcCallId);
      }
      break;
    }
    case OP_DELETE: {
      DeleteOp deleteOp = (DeleteOp)op;
      final String src = renameReservedPathsOnUpgrade(
          deleteOp.path, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE_LINK);
      FSDirDeleteOp.deleteForEditLog(fsDir, iip, deleteOp.timestamp);

      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(deleteOp.rpcClientId, deleteOp.rpcCallId);
      }
      break;
    }
    case OP_MKDIR: {
      MkdirOp mkdirOp = (MkdirOp)op;
      // The id obtained here is also this method's return value.
      inodeId = getAndUpdateLastInodeId(mkdirOp.inodeId, logVersion,
          lastInodeId);
      FSDirMkdirOp.mkdirForEditLog(fsDir, inodeId,
          renameReservedPathsOnUpgrade(mkdirOp.path, logVersion),
          mkdirOp.permissions, mkdirOp.aclEntries, mkdirOp.timestamp);
      break;
    }
    case OP_SET_GENSTAMP_V1: {
      SetGenstampV1Op setGenstampV1Op = (SetGenstampV1Op)op;
      blockManager.getBlockIdManager().setLegacyGenerationStamp(
          setGenstampV1Op.genStampV1);
      break;
    }
    case OP_SET_PERMISSIONS: {
      SetPermissionsOp setPermissionsOp = (SetPermissionsOp)op;
      final String src =
          renameReservedPathsOnUpgrade(setPermissionsOp.src, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetPermission(fsDir, iip,
          setPermissionsOp.permissions);
      break;
    }
    case OP_SET_OWNER: {
      SetOwnerOp setOwnerOp = (SetOwnerOp)op;
      final String src = renameReservedPathsOnUpgrade(
          setOwnerOp.src, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetOwner(fsDir, iip,
          setOwnerOp.username, setOwnerOp.groupname);
      break;
    }
    // The four quota cases below all go through unprotectedSetQuota and
    // differ only in which of the (nsQuota, dsQuota, type) arguments come
    // from the op versus the QUOTA_DONT_SET / QUOTA_RESET constants.
    case OP_SET_NS_QUOTA: {
      SetNSQuotaOp setNSQuotaOp = (SetNSQuotaOp)op;
      final String src = renameReservedPathsOnUpgrade(
          setNSQuotaOp.src, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetQuota(fsDir, iip,
          setNSQuotaOp.nsQuota, HdfsConstants.QUOTA_DONT_SET, null);
      break;
    }
    case OP_CLEAR_NS_QUOTA: {
      ClearNSQuotaOp clearNSQuotaOp = (ClearNSQuotaOp)op;
      final String src = renameReservedPathsOnUpgrade(
          clearNSQuotaOp.src, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetQuota(fsDir, iip,
          HdfsConstants.QUOTA_RESET, HdfsConstants.QUOTA_DONT_SET, null);
      break;
    }
    case OP_SET_QUOTA: {
      SetQuotaOp setQuotaOp = (SetQuotaOp) op;
      final String src = renameReservedPathsOnUpgrade(
          setQuotaOp.src, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetQuota(fsDir, iip,
          setQuotaOp.nsQuota, setQuotaOp.dsQuota, null);
      break;
    }
    case OP_SET_QUOTA_BY_STORAGETYPE: {
      FSEditLogOp.SetQuotaByStorageTypeOp setQuotaByStorageTypeOp =
          (FSEditLogOp.SetQuotaByStorageTypeOp) op;
      final String src = renameReservedPathsOnUpgrade(
          setQuotaByStorageTypeOp.src, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetQuota(fsDir, iip,
          HdfsConstants.QUOTA_DONT_SET, setQuotaByStorageTypeOp.dsQuota,
          setQuotaByStorageTypeOp.type);
      break;
    }
    case OP_TIMES: {
      TimesOp timesOp = (TimesOp)op;
      final String src = renameReservedPathsOnUpgrade(
          timesOp.path, logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(src, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetTimes(fsDir, iip,
          timesOp.mtime, timesOp.atime, true);
      break;
    }
    case OP_SYMLINK: {
      // Refuse to replay a symlink op when symlinks are disabled.
      if (!FileSystem.areSymlinksEnabled()) {
        throw new IOException("Symlinks not supported - please remove symlink before upgrading to this version of HDFS");
      }
      SymlinkOp symlinkOp = (SymlinkOp)op;
      // The id obtained here is also this method's return value.
      inodeId = getAndUpdateLastInodeId(symlinkOp.inodeId, logVersion,
          lastInodeId);
      final String path = renameReservedPathsOnUpgrade(symlinkOp.path,
          logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE_LINK);
      FSDirSymlinkOp.unprotectedAddSymlink(fsDir, iip.getExistingINodes(),
          iip.getLastLocalName(), inodeId, symlinkOp.value, symlinkOp.mtime,
          symlinkOp.atime, symlinkOp.permissionStatus);
      
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(symlinkOp.rpcClientId, symlinkOp.rpcCallId);
      }
      break;
    }
    case OP_RENAME: {
      RenameOp renameOp = (RenameOp)op;
      // Same as OP_RENAME_OLD except that the op's rename options are
      // forwarded as well.
      FSDirRenameOp.renameForEditLog(fsDir,
          renameReservedPathsOnUpgrade(renameOp.src, logVersion),
          renameReservedPathsOnUpgrade(renameOp.dst, logVersion),
          renameOp.timestamp, renameOp.options);
      
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(renameOp.rpcClientId, renameOp.rpcCallId);
      }
      break;
    }
    // The next four cases forward the logged token / key data to the
    // namesystem's delegation token secret manager.
    case OP_GET_DELEGATION_TOKEN: {
      GetDelegationTokenOp getDelegationTokenOp
        = (GetDelegationTokenOp)op;

      fsNamesys.getDelegationTokenSecretManager()
        .addPersistedDelegationToken(getDelegationTokenOp.token,
                                     getDelegationTokenOp.expiryTime);
      break;
    }
    case OP_RENEW_DELEGATION_TOKEN: {
      RenewDelegationTokenOp renewDelegationTokenOp
        = (RenewDelegationTokenOp)op;
      fsNamesys.getDelegationTokenSecretManager()
        .updatePersistedTokenRenewal(renewDelegationTokenOp.token,
                                     renewDelegationTokenOp.expiryTime);
      break;
    }
    case OP_CANCEL_DELEGATION_TOKEN: {
      CancelDelegationTokenOp cancelDelegationTokenOp
        = (CancelDelegationTokenOp)op;
      fsNamesys.getDelegationTokenSecretManager()
          .updatePersistedTokenCancellation(
              cancelDelegationTokenOp.token);
      break;
    }
    case OP_UPDATE_MASTER_KEY: {
      UpdateMasterKeyOp updateMasterKeyOp = (UpdateMasterKeyOp)op;
      fsNamesys.getDelegationTokenSecretManager()
        .updatePersistedMasterKey(updateMasterKeyOp.key);
      break;
    }
    case OP_REASSIGN_LEASE: {
      ReassignLeaseOp reassignLeaseOp = (ReassignLeaseOp)op;

      // Look up the lease of the current holder recorded in the op.
      Lease lease = fsNamesys.leaseManager.getLease(
          reassignLeaseOp.leaseHolder);
      final String path =
          renameReservedPathsOnUpgrade(reassignLeaseOp.path, logVersion);
      INodeFile pendingFile = fsDir.getINode(path, DirOp.READ).asFile();
      // The file is required to still be under construction at this point.
      Preconditions.checkState(pendingFile.isUnderConstruction());
      fsNamesys.reassignLeaseInternal(lease, reassignLeaseOp.newHolder,
              pendingFile);
      break;
    }
    case OP_START_LOG_SEGMENT:
    case OP_END_LOG_SEGMENT: {
      // no data in here currently.
      break;
    }
    case OP_CREATE_SNAPSHOT: {
      CreateSnapshotOp createSnapshotOp = (CreateSnapshotOp) op;
      final String snapshotRoot =
          renameReservedPathsOnUpgrade(createSnapshotOp.snapshotRoot,
              logVersion);
      INodesInPath iip = fsDir.unprotectedResolvePath(snapshotRoot);
      String path = fsNamesys.getSnapshotManager().createSnapshot(
          fsDir.getFSNamesystem().getLeaseManager(),
          iip, snapshotRoot, createSnapshotOp.snapshotName,
          createSnapshotOp.mtime);
      // The string returned by createSnapshot is stored as the retry-cache
      // payload.
      if (toAddRetryCache) {
        fsNamesys.addCacheEntryWithPayload(createSnapshotOp.rpcClientId,
            createSnapshotOp.rpcCallId, path);
      }
      break;
    }
    case OP_DELETE_SNAPSHOT: {
      DeleteSnapshotOp deleteSnapshotOp = (DeleteSnapshotOp) op;
      // These collections are handed to the ReclaimContext below; after
      // deleteSnapshot returns they are passed to the block manager and the
      // inode map for removal, then cleared.
      BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
      List<INode> removedINodes = new ChunkedArrayList<INode>();
      final String snapshotRoot =
          renameReservedPathsOnUpgrade(deleteSnapshotOp.snapshotRoot,
              logVersion);
      INodesInPath iip = fsDir.unprotectedResolvePath(snapshotRoot);
      fsNamesys.getSnapshotManager().deleteSnapshot(iip,
          deleteSnapshotOp.snapshotName,
          new INode.ReclaimContext(fsNamesys.dir.getBlockStoragePolicySuite(),
              collectedBlocks, removedINodes, null), deleteSnapshotOp.mtime);
      fsNamesys.getBlockManager().removeBlocksAndUpdateSafemodeTotal(
          collectedBlocks);
      collectedBlocks.clear();
      fsNamesys.dir.removeFromInodeMap(removedINodes);
      removedINodes.clear();

      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(deleteSnapshotOp.rpcClientId,
            deleteSnapshotOp.rpcCallId);
      }
      break;
    }
    case OP_RENAME_SNAPSHOT: {
      RenameSnapshotOp renameSnapshotOp = (RenameSnapshotOp) op;
      final String snapshotRoot =
          renameReservedPathsOnUpgrade(renameSnapshotOp.snapshotRoot,
              logVersion);
      INodesInPath iip = fsDir.unprotectedResolvePath(snapshotRoot);
      fsNamesys.getSnapshotManager().renameSnapshot(iip,
          snapshotRoot, renameSnapshotOp.snapshotOldName,
          renameSnapshotOp.snapshotNewName, renameSnapshotOp.mtime);
      
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(renameSnapshotOp.rpcClientId,
            renameSnapshotOp.rpcCallId);
      }
      break;
    }
    case OP_ALLOW_SNAPSHOT: {
      AllowSnapshotOp allowSnapshotOp = (AllowSnapshotOp) op;
      final String snapshotRoot =
          renameReservedPathsOnUpgrade(allowSnapshotOp.snapshotRoot, logVersion);
      fsNamesys.getSnapshotManager().setSnapshottable(
          snapshotRoot, false);
      break;
    }
    case OP_DISALLOW_SNAPSHOT: {
      DisallowSnapshotOp disallowSnapshotOp = (DisallowSnapshotOp) op;
      final String snapshotRoot =
          renameReservedPathsOnUpgrade(disallowSnapshotOp.snapshotRoot,
              logVersion);
      fsNamesys.getSnapshotManager().resetSnapshottable(
          snapshotRoot);
      break;
    }
    case OP_SET_GENSTAMP_V2: {
      SetGenstampV2Op setGenstampV2Op = (SetGenstampV2Op) op;
      // update the impending gen stamp, but not the actual genstamp,
      // see HDFS-14941
      blockManager.getBlockIdManager()
          .setImpendingGenerationStamp(setGenstampV2Op.genStampV2);
      break;
    }
    case OP_ALLOCATE_BLOCK_ID: {
      AllocateBlockIdOp allocateBlockIdOp = (AllocateBlockIdOp) op;
      if (BlockIdManager.isStripedBlockID(allocateBlockIdOp.blockId)) {
        // ALLOCATE_BLOCK_ID is added for sequential block id, thus if the id
        // is negative, it must belong to striped blocks
        blockManager.getBlockIdManager().setLastAllocatedStripedBlockId(
            allocateBlockIdOp.blockId);
      } else {
        blockManager.getBlockIdManager().setLastAllocatedContiguousBlockId(
            allocateBlockIdOp.blockId);
      }
      break;
    }
    case OP_ROLLING_UPGRADE_START: {
      // When started with the ROLLINGUPGRADE option in ROLLBACK mode, stop
      // at this op by throwing instead of applying it.
      if (startOpt == StartupOption.ROLLINGUPGRADE) {
        final RollingUpgradeStartupOption rollingUpgradeOpt
            = startOpt.getRollingUpgradeStartupOption(); 
        if (rollingUpgradeOpt == RollingUpgradeStartupOption.ROLLBACK) {
          throw new RollingUpgradeOp.RollbackException();
        }
      }
      // start rolling upgrade
      final long startTime = ((RollingUpgradeOp) op).getTime();
      fsNamesys.startRollingUpgradeInternal(startTime);
      fsNamesys.triggerRollbackCheckpoint();
      break;
    }
    case OP_ROLLING_UPGRADE_FINALIZE: {
      final long finalizeTime = ((RollingUpgradeOp) op).getTime();
      if (fsNamesys.isRollingUpgrade()) {
        // Only do it when NN is actually doing rolling upgrade.
        // We can get FINALIZE without corresponding START, if NN is restarted
        // before this op is consumed and a new checkpoint is created.
        fsNamesys.finalizeRollingUpgradeInternal(finalizeTime);
      }
      // The remaining steps run whether or not a rolling upgrade was in
      // progress.
      fsNamesys.getFSImage().updateStorageVersion();
      fsNamesys.getFSImage().renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK,
          NameNodeFile.IMAGE);
      fsNamesys.setNeedRollbackFsImage(false);
      break;
    }
    case OP_ADD_CACHE_DIRECTIVE: {
      AddCacheDirectiveInfoOp addOp = (AddCacheDirectiveInfoOp) op;
      CacheDirectiveInfo result = fsNamesys.
          getCacheManager().addDirectiveFromEditLog(addOp.directive);
      // The id of the resulting directive is the retry-cache payload.
      if (toAddRetryCache) {
        Long id = result.getId();
        fsNamesys.addCacheEntryWithPayload(op.rpcClientId, op.rpcCallId, id);
      }
      break;
    }
    case OP_MODIFY_CACHE_DIRECTIVE: {
      ModifyCacheDirectiveInfoOp modifyOp =
          (ModifyCacheDirectiveInfoOp) op;
      fsNamesys.getCacheManager().modifyDirectiveFromEditLog(
          modifyOp.directive);
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
      }
      break;
    }
    case OP_REMOVE_CACHE_DIRECTIVE: {
      RemoveCacheDirectiveInfoOp removeOp =
          (RemoveCacheDirectiveInfoOp) op;
      fsNamesys.getCacheManager().removeDirective(removeOp.id, null);
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
      }
      break;
    }
    case OP_ADD_CACHE_POOL: {
      AddCachePoolOp addOp = (AddCachePoolOp) op;
      fsNamesys.getCacheManager().addCachePool(addOp.info);
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
      }
      break;
    }
    case OP_MODIFY_CACHE_POOL: {
      ModifyCachePoolOp modifyOp = (ModifyCachePoolOp) op;
      fsNamesys.getCacheManager().modifyCachePool(modifyOp.info);
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
      }
      break;
    }
    case OP_REMOVE_CACHE_POOL: {
      RemoveCachePoolOp removeOp = (RemoveCachePoolOp) op;
      fsNamesys.getCacheManager().removeCachePool(removeOp.poolName);
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
      }
      break;
    }
    // NOTE: the ACL, xattr and truncate cases below use the logged src
    // directly; unlike most path-based cases above they do not call
    // renameReservedPathsOnUpgrade. The reason is not visible here.
    case OP_SET_ACL: {
      SetAclOp setAclOp = (SetAclOp) op;
      INodesInPath iip = fsDir.getINodesInPath(setAclOp.src, DirOp.WRITE);
      FSDirAclOp.unprotectedSetAcl(fsDir, iip, setAclOp.aclEntries, true);
      break;
    }
    case OP_SET_XATTR: {
      SetXAttrOp setXAttrOp = (SetXAttrOp) op;
      INodesInPath iip = fsDir.getINodesInPath(setXAttrOp.src, DirOp.WRITE);
      // Both CREATE and REPLACE flags are passed when replaying.
      FSDirXAttrOp.unprotectedSetXAttrs(fsDir, iip,
                                        setXAttrOp.xAttrs,
                                        EnumSet.of(XAttrSetFlag.CREATE,
                                                   XAttrSetFlag.REPLACE));
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(setXAttrOp.rpcClientId, setXAttrOp.rpcCallId);
      }
      break;
    }
    case OP_REMOVE_XATTR: {
      RemoveXAttrOp removeXAttrOp = (RemoveXAttrOp) op;
      INodesInPath iip = fsDir.getINodesInPath(removeXAttrOp.src, DirOp.WRITE);
      FSDirXAttrOp.unprotectedRemoveXAttrs(fsDir, iip,
                                           removeXAttrOp.xAttrs);
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(removeXAttrOp.rpcClientId,
            removeXAttrOp.rpcCallId);
      }
      break;
    }
    case OP_TRUNCATE: {
      TruncateOp truncateOp = (TruncateOp) op;
      INodesInPath iip = fsDir.getINodesInPath(truncateOp.src, DirOp.WRITE);
      FSDirTruncateOp.unprotectedTruncate(fsNamesys, iip,
          truncateOp.clientName, truncateOp.clientMachine,
          truncateOp.newLength, truncateOp.timestamp, truncateOp.truncateBlock);
      break;
    }
    case OP_SET_STORAGE_POLICY: {
      SetStoragePolicyOp setStoragePolicyOp = (SetStoragePolicyOp) op;
      final String path = renameReservedPathsOnUpgrade(setStoragePolicyOp.path,
          logVersion);
      final INodesInPath iip = fsDir.getINodesInPath(path, DirOp.WRITE);
      FSDirAttrOp.unprotectedSetStoragePolicy(
          fsDir, fsNamesys.getBlockManager(), iip,
          setStoragePolicyOp.policyId);
      break;
    }
    // NOTE: the erasure coding policy cases below are not wrapped in braces,
    // so their locals (addOp, enableOp, disableOp, removeOp) live in the
    // scope of the switch body; any local added after them must not reuse
    // those names.
    case OP_ADD_ERASURE_CODING_POLICY:
      AddErasureCodingPolicyOp addOp = (AddErasureCodingPolicyOp) op;
      fsNamesys.getErasureCodingPolicyManager().addPolicy(
          addOp.getEcPolicy());

      // The policy carried by the op is stored as the retry-cache payload.
      if (toAddRetryCache) {
        fsNamesys.addCacheEntryWithPayload(op.rpcClientId, op.rpcCallId,
            addOp.getEcPolicy());
      }
      break;
    case OP_ENABLE_ERASURE_CODING_POLICY:
      EnableErasureCodingPolicyOp enableOp = (EnableErasureCodingPolicyOp) op;
      fsNamesys.getErasureCodingPolicyManager().enablePolicy(
          enableOp.getEcPolicy());
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
      }
      break;
    case OP_DISABLE_ERASURE_CODING_POLICY:
      DisableErasureCodingPolicyOp disableOp =
          (DisableErasureCodingPolicyOp) op;
      fsNamesys.getErasureCodingPolicyManager().disablePolicy(
          disableOp.getEcPolicy());
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
      }
      break;
    case OP_REMOVE_ERASURE_CODING_POLICY:
      RemoveErasureCodingPolicyOp removeOp = (RemoveErasureCodingPolicyOp) op;
      fsNamesys.getErasureCodingPolicyManager().removePolicy(
          removeOp.getEcPolicy());
      if (toAddRetryCache) {
        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
      }
      break;
    default:
      // Any opcode without a case above is rejected.
      throw new IOException("Invalid operation read " + op.opCode);
    }
    return inodeId;
  }