in src/dataServices/dynamoDbBundleService.ts [211:357]
private async lockItems(
requests: BatchReadWriteRequest[],
tenantId?: string,
): Promise<{
successfulLock: boolean;
errorType?: BatchReadWriteErrorType;
errorMessage?: string;
lockedItems: ItemRequest[];
}> {
// We don't need to put a lock on CREATE requests because there are no Documents in the DB for the CREATE
// request yet
const allNonCreateRequests = requests.filter((request) => {
return request.operation !== 'create';
});
const itemsToLock: ItemRequest[] = allNonCreateRequests.map((request) => {
return {
resourceType: request.resourceType,
id: request.id,
operation: request.operation,
};
});
if (itemsToLock.length > DynamoDbBundleService.dynamoDbMaxBatchSize) {
const message = `Cannot lock more than ${DynamoDbBundleService.dynamoDbMaxBatchSize} items`;
logger.error(message);
return Promise.resolve({
successfulLock: false,
errorType: 'SYSTEM_ERROR',
errorMessage: message,
lockedItems: [],
});
}
logger.info('Locking begins');
const lockedItems: ItemRequest[] = [];
// We need to read the items so we can find the versionId of each item
const itemReadPromises = itemsToLock.map(async (itemToLock) => {
const projectionExpression = 'id, resourceType, meta';
try {
return await this.dynamoDbHelper.getMostRecentResource(
itemToLock.resourceType,
itemToLock.id,
projectionExpression,
tenantId,
);
} catch (e) {
if (e instanceof ResourceNotFoundError) {
return e;
}
throw e;
}
});
const itemResponses = await Promise.all(itemReadPromises);
const idItemsFailedToRead: string[] = [];
for (let i = 0; i < itemResponses.length; i += 1) {
const itemResponse = itemResponses[i];
// allow for update as create scenario
if (
itemResponse instanceof ResourceNotFoundError &&
!(itemsToLock[i].operation === 'update' && this.updateCreateSupported)
) {
idItemsFailedToRead.push(`${itemsToLock[i].resourceType}/${itemsToLock[i].id}`);
}
}
if (idItemsFailedToRead.length > 0) {
return Promise.resolve({
successfulLock: false,
errorType: 'USER_ERROR',
errorMessage: `Failed to find resources: ${idItemsFailedToRead}`,
lockedItems: [],
});
}
const addLockRequests = [];
for (let i = 0; i < itemResponses.length; i += 1) {
const itemResponse = itemResponses[i];
if (itemResponse instanceof ResourceNotFoundError) {
// eslint-disable-next-line no-continue
continue;
}
const { resourceType, id, meta } = itemResponse.resource;
const vid = parseInt(meta.versionId, 10);
const lockedItem: ItemRequest = {
resourceType,
id,
vid,
operation: allNonCreateRequests[i].operation,
};
if (lockedItem.operation === 'update') {
lockedItem.isOriginalUpdateItem = true;
}
lockedItems.push(lockedItem);
addLockRequests.push(
DynamoDbParamBuilder.buildUpdateDocumentStatusParam(
DOCUMENT_STATUS.AVAILABLE,
DOCUMENT_STATUS.LOCKED,
id,
vid,
resourceType,
tenantId,
),
);
}
const params = {
TransactItems: addLockRequests,
};
let itemsLockedSuccessfully: ItemRequest[] = [];
try {
if (params.TransactItems.length > 0) {
await this.dynamoDb.transactWriteItems(params).promise();
itemsLockedSuccessfully = itemsLockedSuccessfully.concat(lockedItems);
}
logger.info('Finished locking');
return Promise.resolve({
successfulLock: true,
lockedItems: itemsLockedSuccessfully,
});
} catch (e) {
logger.error('Failed to lock', e);
if ((e as any).code === 'TransactionCanceledException') {
return Promise.resolve({
successfulLock: false,
errorType: 'CONFLICT_ERROR',
errorMessage: `Failed to lock resources for transaction due to conflict. Please try again after ${
DynamoDbParamBuilder.LOCK_DURATION_IN_MS / 1000
} seconds.`,
lockedItems: itemsLockedSuccessfully,
});
}
return Promise.resolve({
successfulLock: false,
errorType: 'SYSTEM_ERROR',
errorMessage: `Failed to lock resources for transaction. Please try again after ${
DynamoDbParamBuilder.LOCK_DURATION_IN_MS / 1000
} seconds.`,
lockedItems: itemsLockedSuccessfully,
});
}
}