in hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java [133:422]
public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) {
final long trxnLogIndex = context.getIndex();
CommitKeyRequest commitKeyRequest = getOmRequest().getCommitKeyRequest();
KeyArgs commitKeyArgs = commitKeyRequest.getKeyArgs();
String volumeName = commitKeyArgs.getVolumeName();
String bucketName = commitKeyArgs.getBucketName();
String keyName = commitKeyArgs.getKeyName();
OMMetrics omMetrics = ozoneManager.getMetrics();
AuditLogger auditLogger = ozoneManager.getAuditLogger();
Map<String, String> auditMap = buildKeyArgsAuditMap(commitKeyArgs);
OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
getOmRequest());
Exception exception = null;
OmKeyInfo omKeyInfo = null;
OmBucketInfo omBucketInfo = null;
OMClientResponse omClientResponse = null;
boolean bucketLockAcquired = false;
Result result;
OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
boolean isHSync = commitKeyRequest.hasHsync() && commitKeyRequest.getHsync();
boolean isRecovery = commitKeyRequest.hasRecovery() && commitKeyRequest.getRecovery();
// isHSync == true: a commit request resulting from a client-side hsync call.
// isRecovery == true: a commit request resulting from a client-side recoverLease call.
// Neither isHSync nor isRecovery is true: a commit request resulting from a normal
// client-side outputStream#close call.
if (isHSync) {
omMetrics.incNumKeyHSyncs();
} else {
omMetrics.incNumKeyCommits();
}
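// An hsync commit persists the key's current state while keeping it open for further
// writes (its open key entry is retained below); a normal commit or a recovery closes
// the key, removing its open key entry.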
LOG.debug("isHSync = {}, isRecovery = {}, volumeName = {}, bucketName = {}, keyName = {}",
isHSync, isRecovery, volumeName, bucketName, keyName);
try {
String dbOzoneKey =
omMetadataManager.getOzoneKey(volumeName, bucketName, keyName);
List<OmKeyLocationInfo>
locationInfoList = getOmKeyLocationInfos(ozoneManager, commitKeyArgs);
mergeOmLockDetails(
omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName,
bucketName));
bucketLockAcquired = getOmLockDetails().isLockAcquired();
validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
omBucketInfo = getBucketInfo(omMetadataManager, volumeName, bucketName);
// Check whether a directory already exists with the same name; if so, throw an error.
if (LOG.isDebugEnabled()) {
LOG.debug("BucketName: {}, BucketLayout: {}",
omBucketInfo.getBucketName(), omBucketInfo.getBucketLayout());
}
if (omBucketInfo.getBucketLayout()
.shouldNormalizePaths(ozoneManager.getEnableFileSystemPaths())) {
if (checkDirectoryAlreadyExists(volumeName, bucketName, keyName,
omMetadataManager)) {
throw new OMException("Can not create file: " + keyName +
" as there is already directory in the given path", NOT_A_FILE);
}
// Ensure the parent directory exists.
if (!"".equals(OzoneFSUtils.getParent(keyName))
&& !checkDirectoryAlreadyExists(volumeName, bucketName,
OzoneFSUtils.getParent(keyName), omMetadataManager)) {
throw new OMException("Cannot create file : " + keyName
+ " as parent directory doesn't exist",
OMException.ResultCodes.DIRECTORY_NOT_FOUND);
}
}
// If bucket versioning is turned on between key creation and key commit,
// old versions are simply overwritten and not kept. Bucket versioning takes
// effect from the first key created after the knob is turned on.
OmKeyInfo keyToDelete =
omMetadataManager.getKeyTable(getBucketLayout()).get(dbOzoneKey);
long writerClientId = commitKeyRequest.getClientID();
boolean isSameHsyncKey = false;
boolean isOverwrittenHsyncKey = false;
final String clientIdString = String.valueOf(writerClientId);
if (null != keyToDelete) {
isSameHsyncKey = java.util.Optional.of(keyToDelete)
.map(WithMetadata::getMetadata)
.map(meta -> meta.get(OzoneConsts.HSYNC_CLIENT_ID))
.filter(id -> id.equals(clientIdString))
.isPresent();
if (!isSameHsyncKey) {
isOverwrittenHsyncKey = java.util.Optional.of(keyToDelete)
.map(WithMetadata::getMetadata)
.map(meta -> meta.get(OzoneConsts.HSYNC_CLIENT_ID))
.filter(id -> !id.equals(clientIdString))
.isPresent() && !isRecovery;
}
}
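// The two flags derived above are based on the HSYNC_CLIENT_ID stored in the
// committed key's metadata:
//   isSameHsyncKey        - the existing key table entry was last hsync'ed by this same
//                           client, so this commit continues that writer's own data.
//   isOverwrittenHsyncKey - a different client hsync'ed the key and this (non-recovery)
//                           commit overwrites it; the stale writer's open key entry is
//                           flagged further below.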
if (isRecovery && keyToDelete != null) {
String clientId = keyToDelete.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID);
if (clientId == null) {
throw new OMException("Failed to recovery key, as " +
dbOzoneKey + " is already closed", KEY_ALREADY_CLOSED);
}
writerClientId = Long.parseLong(clientId);
}
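// For recovery, commit the open key entry created by the original writer: the open key
// is looked up with that writer's client ID (taken from the hsync metadata), not with
// the recovering client's ID.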
String dbOpenKey = omMetadataManager.getOpenKey(volumeName, bucketName,
keyName, writerClientId);
omKeyInfo =
omMetadataManager.getOpenKeyTable(getBucketLayout()).get(dbOpenKey);
if (omKeyInfo == null) {
String action = isRecovery ? "recover" : isHSync ? "hsync" : "commit";
throw new OMException("Failed to " + action + " key, as " + dbOpenKey +
" entry is not found in the OpenKey table", KEY_NOT_FOUND);
} else if (omKeyInfo.getMetadata().containsKey(OzoneConsts.DELETED_HSYNC_KEY) ||
omKeyInfo.getMetadata().containsKey(OzoneConsts.OVERWRITTEN_HSYNC_KEY)) {
throw new OMException("Open Key " + keyName + " is already deleted/overwritten",
KEY_NOT_FOUND);
}
if (omKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY) &&
omKeyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID)) {
if (!isRecovery) {
throw new OMException("Cannot commit key " + dbOpenKey + " with " + OzoneConsts.LEASE_RECOVERY +
" metadata while recovery flag is not set in request", KEY_UNDER_LEASE_RECOVERY);
}
}
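// A key that has been hsync'ed and marked for lease recovery can only be finalized by a
// commit carrying the recovery flag; an ordinary commit (e.g. from the original writer)
// is rejected with KEY_UNDER_LEASE_RECOVERY.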
OmKeyInfo openKeyToDelete = null;
String dbOpenKeyToDeleteKey = null;
if (isOverwrittenHsyncKey) {
// find the overwritten openKey and add OVERWRITTEN_HSYNC_KEY to it.
dbOpenKeyToDeleteKey = omMetadataManager.getOpenKey(volumeName, bucketName,
keyName, Long.parseLong(keyToDelete.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID)));
openKeyToDelete = omMetadataManager.getOpenKeyTable(getBucketLayout()).get(dbOpenKeyToDeleteKey);
openKeyToDelete.getMetadata().put(OzoneConsts.OVERWRITTEN_HSYNC_KEY, "true");
openKeyToDelete.setModificationTime(Time.now());
openKeyToDelete.setUpdateID(trxnLogIndex);
omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
dbOpenKeyToDeleteKey, openKeyToDelete, trxnLogIndex);
}
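// Flagging the stale open key with OVERWRITTEN_HSYNC_KEY means any later commit attempt
// through that entry fails the "already deleted/overwritten" check above, so the previous
// hsync writer cannot resurrect data that this commit overwrites.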
omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());
// non-null indicates it is necessary to update the open key
OmKeyInfo newOpenKeyInfo = null;
if (isHSync) {
if (!OmKeyHSyncUtil.isHSyncedPreviously(omKeyInfo, clientIdString, dbOpenKey)) {
// Update open key as well if it is the first hsync of this key
omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID, clientIdString);
newOpenKeyInfo = omKeyInfo.copyObject();
}
}
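// newOpenKeyInfo is only set on the first hsync of this open key by this client; later
// hsyncs find HSYNC_CLIENT_ID already recorded and leave the open key entry untouched
// (see the OpenKeyTable cache update further below).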
validateAtomicRewrite(keyToDelete, omKeyInfo, auditMap);
// Optimistic locking validation has passed. Now set the rewrite fields to null so they are
// not persisted in the key table.
omKeyInfo.setExpectedDataGeneration(null);
omKeyInfo.getMetadata().putAll(KeyValueUtil.getFromProtobuf(
commitKeyArgs.getMetadataList()));
omKeyInfo.setDataSize(commitKeyArgs.getDataSize());
// Update the block length for each block, return the allocated but
// uncommitted blocks
List<OmKeyLocationInfo> uncommitted =
omKeyInfo.updateLocationInfoList(locationInfoList, false);
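// "uncommitted" holds blocks that were allocated for this open key but are not part of
// the committed location list (e.g. a pre-allocated block the client never wrote to);
// for a non-hsync commit they are queued for deletion below as a pseudo key.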
// Set the UpdateID to current transactionLogIndex
omKeyInfo.setUpdateID(trxnLogIndex);
Map<String, RepeatedOmKeyInfo> oldKeyVersionsToDeleteMap = null;
long correctedSpace = omKeyInfo.getReplicatedSize();
// If keyToDelete isn't null, usedNamespace doesn't need to be
// checked or increased.
if (keyToDelete != null && isSameHsyncKey) {
correctedSpace -= keyToDelete.getReplicatedSize();
checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
correctedSpace);
} else if (keyToDelete != null && !omBucketInfo.getIsVersionEnabled()) {
// Subtract the size of blocks to be overwritten.
correctedSpace -= keyToDelete.getReplicatedSize();
RepeatedOmKeyInfo oldVerKeyInfo = getOldVersionsToCleanUp(
keyToDelete, trxnLogIndex);
checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
correctedSpace);
// Use a pseudo object ID, since the real object ID can be the same when a key is overwritten.
long pseudoObjId = ozoneManager.getObjectIdFromTxId(trxnLogIndex);
String delKeyName = omMetadataManager.getOzoneDeletePathKey(
pseudoObjId, dbOzoneKey);
if (null == oldKeyVersionsToDeleteMap) {
oldKeyVersionsToDeleteMap = new HashMap<>();
}
// Remove from oldVerKeyInfo any block that shares the same container ID
// and local ID with a block in omKeyInfo.
// Otherwise data loss could occur once those shared blocks are added
// to the deletedTable and processed by KeyDeletingService for deletion.
filterOutBlocksStillInUse(omKeyInfo, oldVerKeyInfo);
if (!oldVerKeyInfo.getOmKeyInfoList().isEmpty()) {
oldKeyVersionsToDeleteMap.put(delKeyName, oldVerKeyInfo);
}
} else {
checkBucketQuotaInNamespace(omBucketInfo, 1L);
checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
correctedSpace);
omBucketInfo.incrUsedNamespace(1L);
}
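// correctedSpace is the net change in replicated bytes charged to the bucket:
//   - same-client hsync continuation, or a non-versioned overwrite: the new replicated
//     size minus the old key's replicated size (may be negative when the new key is
//     smaller, e.g. 384 MB - 768 MB = -384 MB);
//   - a new key, or an overwrite under bucket versioning: the full replicated size,
//     and one namespace unit is also charged.
// It is applied to the bucket via incrUsedBytes() after the key table cache update.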
// Treat the uncommitted blocks as the key's old-version blocks,
// which will be deleted via RepeatedOmKeyInfo.
final OmKeyInfo pseudoKeyInfo = isHSync ? null
: wrapUncommittedBlocksAsPseudoKey(uncommitted, omKeyInfo);
if (pseudoKeyInfo != null) {
long pseudoObjId = ozoneManager.getObjectIdFromTxId(trxnLogIndex);
String delKeyName = omMetadataManager.getOzoneDeletePathKey(
pseudoObjId, dbOzoneKey);
if (null == oldKeyVersionsToDeleteMap) {
oldKeyVersionsToDeleteMap = new HashMap<>();
}
oldKeyVersionsToDeleteMap.computeIfAbsent(delKeyName,
key -> new RepeatedOmKeyInfo()).addOmKeyInfo(pseudoKeyInfo);
}
// Add to cache of open key table and key table.
if (!isHSync) {
// If this is not an hsync commit, put a tombstone in the OpenKeyTable cache,
// indicating the key has been removed from the OpenKeyTable
// so that it cannot be committed again.
omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
dbOpenKey, trxnLogIndex);
// Prevent hsync metadata from getting committed to the final key
omKeyInfo.getMetadata().remove(OzoneConsts.HSYNC_CLIENT_ID);
if (isRecovery) {
omKeyInfo.getMetadata().remove(OzoneConsts.LEASE_RECOVERY);
}
} else if (newOpenKeyInfo != null) {
// isHSync is true and newOpenKeyInfo is set; update the OpenKeyTable entry.
omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
dbOpenKey, newOpenKeyInfo, trxnLogIndex);
}
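// OpenKeyTable outcomes at this point:
//   - normal commit / recovery: tombstone cached, the open key entry is removed;
//   - first hsync from this client: entry refreshed with HSYNC_CLIENT_ID;
//   - subsequent hsyncs: entry left unchanged.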
omMetadataManager.getKeyTable(getBucketLayout()).addCacheEntry(
dbOzoneKey, omKeyInfo, trxnLogIndex);
omBucketInfo.incrUsedBytes(correctedSpace);
omClientResponse = new OMKeyCommitResponse(omResponse.build(),
omKeyInfo, dbOzoneKey, dbOpenKey, omBucketInfo.copyObject(),
oldKeyVersionsToDeleteMap, isHSync, newOpenKeyInfo, dbOpenKeyToDeleteKey, openKeyToDelete);
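// The response carries every entity this request touched: the committed key, the affected
// open key entries (removed, refreshed, or flagged as overwritten), the updated bucket
// usage snapshot, and the old versions / uncommitted blocks queued for deletion.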
result = Result.SUCCESS;
} catch (IOException | InvalidPathException ex) {
result = Result.FAILURE;
exception = ex;
omClientResponse = new OMKeyCommitResponse(createErrorOMResponse(
omResponse, exception), getBucketLayout());
} finally {
if (bucketLockAcquired) {
mergeOmLockDetails(omMetadataManager.getLock()
.releaseWriteLock(BUCKET_LOCK, volumeName, bucketName));
}
if (omClientResponse != null) {
omClientResponse.setOmLockDetails(getOmLockDetails());
}
}
// Debug logging for any key commit operation, successful or not
LOG.debug("Key commit {} with isHSync = {}, isRecovery = {}, omKeyInfo = {}",
result == Result.SUCCESS ? "succeeded" : "failed", isHSync, isRecovery, omKeyInfo);
if (!isHSync) {
markForAudit(auditLogger, buildAuditMessage(OMAction.COMMIT_KEY, auditMap,
exception, getOmRequest().getUserInfo()));
processResult(commitKeyRequest, volumeName, bucketName, keyName,
omMetrics, exception, omKeyInfo, result);
}
return omClientResponse;
}