public OMClientResponse validateAndUpdateCache()

in hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java [186:360]


  /**
   * Processes a CreateKey request against the OM metadata caches.
   *
   * <p>After acquiring a write lock through the configured lock strategy, the
   * method validates the bucket/volume and the request, prepares the open-key
   * info, checks bucket quota, adds cache entries for any missing parent
   * directories and for the open key, and builds the client response.
   *
   * <p>An {@link IOException} or {@link InvalidPathException} raised during
   * processing is not rethrown: it is recorded and turned into an error
   * response. The lock (if acquired) is released in the {@code finally} block;
   * auditing and result logging happen afterwards.
   *
   * @param ozoneManager provides the metadata manager, metrics, configuration
   *                     and audit logger used while handling the request
   * @param context execution context that supplies the transaction log index
   * @return the response built for this request (success or error)
   */
  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) {
    final long txLogIndex = context.getIndex();
    final CreateKeyRequest request = getOmRequest().getCreateKeyRequest();

    final KeyArgs args = request.getKeyArgs();
    final Map<String, String> auditDetails = buildKeyArgsAuditMap(args);

    final String volume = args.getVolumeName();
    final String bucket = args.getBucketName();
    final String key = args.getKeyName();

    final OMMetrics metrics = ozoneManager.getMetrics();
    metrics.incNumKeyAllocates();

    final OMMetadataManager metadataManager = ozoneManager.getMetadataManager();
    final OzoneLockStrategy lockStrategy = getOzoneLockStrategy(ozoneManager);
    // Starts empty; blocks sent by the client are appended further below.
    final List<OmKeyLocationInfo> initialLocations = new ArrayList<>();
    final OMResponse.Builder responseBuilder =
        OmResponseUtil.getOMResponseBuilder(getOmRequest());

    // State that must survive the try block for the finally/audit/log steps.
    boolean lockHeld = false;
    OMClientResponse clientResponse = null;
    Exception failure = null;
    Result outcome = null;
    int missingParentCount = 0;

    try {
      mergeOmLockDetails(
          lockStrategy.acquireWriteLock(metadataManager, volume, bucket, key));
      lockHeld = getOmLockDetails().isLockAcquired();
      validateBucketAndVolume(metadataManager, volume, bucket);
      // TODO: this lookup could be optimized. If getKmsProvider is null the
      // bucket encryptionInfo will not be set; if that assumption holds, the
      // read from the bucket table can be avoided.

      // Look up any key already stored under this name.
      final String dbKey = metadataManager.getOzoneKey(volume, bucket, key);
      final OmKeyInfo existingKeyInfo =
          metadataManager.getKeyTable(getBucketLayout()).getIfExist(dbKey);
      validateAtomicRewrite(existingKeyInfo, args);

      final OmBucketInfo bucketInfo =
          getBucketInfo(metadataManager, volume, bucket);

      // On FILE_EXISTS the key is simply overridden, as Key Create always did.
      if (LOG.isDebugEnabled()) {
        LOG.debug("BucketName: {}, BucketLayout: {}",
            bucketInfo.getBucketName(), bucketInfo.getBucketLayout());
      }

      OMFileRequest.OMPathInfo pathInfo = null;
      List<OmKeyInfo> missingParents = null;

      final boolean normalizePaths = bucketInfo.getBucketLayout()
          .shouldNormalizePaths(ozoneManager.getEnableFileSystemPaths());
      if (normalizePaths) {
        pathInfo = OMFileRequest.verifyFilesInPath(
            metadataManager, volume, bucket, key, Paths.get(key));
        final OMFileRequest.OMDirectoryResult dirResult =
            pathInfo.getDirectoryResult();

        // Reject the request when the name collides with a directory, or
        // when a file already occupies part of the given path.
        if (dirResult == DIRECTORY_EXISTS) {
          throw new OMException("Cannot write to directory. "
              + "createIntermediateDirs behavior is enabled and hence / has "
              + "special interpretation: " + key, NOT_A_FILE);
        }
        if (dirResult == FILE_EXISTS_IN_GIVENPATH) {
          throw new OMException("Can not create file: " + key
              + " as there is already file in the given path", NOT_A_FILE);
        }

        missingParents = getAllParentInfo(ozoneManager, args,
            pathInfo.getMissingParents(), bucketInfo, pathInfo, txLogIndex);
        missingParentCount = missingParents.size();
      }

      final ReplicationConfig replication =
          OzoneConfigUtil.resolveReplicationConfigPreference(
              args.getType(), args.getFactor(), args.getEcReplicationConfig(),
              bucketInfo.getDefaultReplicationConfig(), ozoneManager);

      final OmKeyInfo openKeyInfo = prepareKeyInfo(metadataManager, args,
          existingKeyInfo, args.getDataSize(), initialLocations,
          getFileEncryptionInfo(args), ozoneManager.getPrefixManager(),
          bucketInfo, pathInfo, txLogIndex,
          ozoneManager.getObjectIdFromTxId(txLogIndex),
          replication, ozoneManager.getConfiguration());

      validateEncryptionKeyInfo(bucketInfo, args);

      final long openVersion =
          openKeyInfo.getLatestVersionLocations().getVersion();
      final long clientId = request.getClientID();
      final String dbOpenKey =
          metadataManager.getOpenKey(volume, bucket, key, clientId);

      // Attach the blocks carried in the request to the open key.
      final List<OmKeyLocationInfo> requestedBlocks =
          args.getKeyLocationsList().stream()
              .map(OmKeyLocationInfo::getFromProtobuf)
              .collect(Collectors.toList());
      openKeyInfo.appendNewBlocks(requestedBlocks, false);

      // Quota accounting modelled on HDFS: for a 600MB key there are 3 entries
      // in the location list, each pre-allocated block being 256MB. With a
      // factor of 3 the total pre-allocated size is 256MB * 3 * 3, i.e.
      // 256MB * 3 * 3 - 600MB * 3 = 504MB more than needed is reserved up
      // front; that surplus is meant to be subtracted at commitKey.
      final long preAllocatedBytes = requestedBlocks.size()
          * ozoneManager.getScmBlockSize()
          * replication.getRequiredNodes();
      // Check bucket quota, both in bytes and in namespace entries.
      checkBucketQuotaInBytes(metadataManager, bucketInfo, preAllocatedBytes);
      checkBucketQuotaInNamespace(bucketInfo, missingParentCount + 1L);
      bucketInfo.incrUsedNamespace(missingParentCount);

      if (missingParentCount > 0) {
        // Cache only the missing prefix directories here; the file key
        // itself is passed as null and is left until Key Commit.
        OMFileRequest.addKeyTableCacheEntries(metadataManager, volume, bucket,
            bucketInfo.getBucketLayout(), null, missingParents, txLogIndex);
      }

      // Original design note: this open-key cache update does not strictly
      // need the lock. If the bucket gets deleted meanwhile, commitKey is
      // expected to detect it.
      metadataManager.getOpenKeyTable(getBucketLayout())
          .addCacheEntry(dbOpenKey, openKeyInfo, txLogIndex);

      // Assemble the success response.
      final CreateKeyResponse createKeyResponse = CreateKeyResponse.newBuilder()
          .setKeyInfo(openKeyInfo.getNetworkProtobuf(
              getOmRequest().getVersion(), args.getLatestVersionLocation()))
          .setID(clientId)
          .setOpenVersion(openVersion)
          .build();
      responseBuilder.setCreateKeyResponse(createKeyResponse)
          .setCmdType(Type.CreateKey);
      clientResponse = new OMKeyCreateResponse(responseBuilder.build(),
          openKeyInfo, missingParents, clientId, bucketInfo.copyObject());

      outcome = Result.SUCCESS;
    } catch (IOException | InvalidPathException ex) {
      outcome = Result.FAILURE;
      failure = ex;
      metrics.incNumKeyAllocateFails();
      responseBuilder.setCmdType(Type.CreateKey);
      clientResponse = new OMKeyCreateResponse(
          createErrorOMResponse(responseBuilder, failure), getBucketLayout());
    } finally {
      if (lockHeld) {
        mergeOmLockDetails(
            lockStrategy.releaseWriteLock(metadataManager, volume, bucket, key));
      }
      if (clientResponse != null) {
        clientResponse.setOmLockDetails(getOmLockDetails());
      }
    }

    // Auditing is deliberately performed after the lock has been released.
    markForAudit(ozoneManager.getAuditLogger(), buildAuditMessage(
        OMAction.ALLOCATE_KEY, auditDetails, failure,
        getOmRequest().getUserInfo()));

    logResult(request, metrics, failure, outcome, missingParentCount);

    return clientResponse;
  }