in src/main/java/com/amazonaws/services/dynamodbv2/transactions/Transaction.java [1090:1209]
/**
 * Applies {@code request} to an item that already carries this transaction's lock, keeps the
 * lock in place, and works out what to hand back for the request's return-values setting.
 *
 * <p>The write step is skipped when {@code lockedItem} already contains the APPLIED attribute.
 * Otherwise:
 * <ul>
 *   <li>{@code PutItem}: TXID, APPLIED and DATE (plus TRANSIENT when the locked item has it)
 *       are added to the put's item and the put is sent;</li>
 *   <li>{@code UpdateItem}: caller-supplied updates to TXID, TRANSIENT and DATE are dropped,
 *       a PUT of APPLIED is added, and the update is sent;</li>
 *   <li>{@code DeleteItem} / {@code GetItem}: nothing is written here.</li>
 * </ul>
 * Both writes are conditional on TXID equalling this transaction's id and APPLIED being absent;
 * a {@code ConditionalCheckFailedException} is taken to mean the apply already happened and is
 * ignored.
 *
 * <p>Side effects: the {@code PutItemRequest} / {@code UpdateItemRequest} wrapped by
 * {@code request} is modified in place (item attributes or attribute updates, expected
 * conditions, return values). For a {@code GetItem} that lists attributes to get, every other
 * entry is removed from {@code lockedItem} itself, and that same map instance is returned.
 *
 * @param request the request to apply; must not be null and must be a {@code PutItem},
 *        {@code UpdateItem}, {@code DeleteItem} or {@code GetItem} when a write is attempted
 * @param lockedItem the item as read while holding the lock; may be modified (see above)
 * @return {@code null} when return values are NONE (or unset), when ALL_OLD is requested for an
 *         item that {@code isTransient} reports as transient, for a get of an item this
 *         transaction deletes or only read-locked while transient, and for a delete that did
 *         not ask for ALL_OLD; otherwise the old or new image as requested
 * @throws TransactionAssertionException if {@code request} is null, is of an unsupported type,
 *         or carries an unsupported return-values setting
 * @throws UnknownCompletedTransactionException if the image needed for the return value can no
 *         longer be found
 * @throws ItemNotLockedException if the item re-read for ALL_NEW is not owned by this
 *         transaction
 */
protected Map<String, AttributeValue> applyAndKeepLock(Request request, Map<String, AttributeValue> lockedItem) {
    // Raise the intended assertion up front; without this guard a null request dies with a bare
    // NullPointerException on getReturnValues() and never reaches the type check further down.
    if(request == null) {
        throw new TransactionAssertionException(txId, "Request may not be null");
    }

    Map<String, AttributeValue> returnItem = null;

    // 1. Remember what return values the caller wanted.
    String returnValues = request.getReturnValues(); // save the returnValues because we will mutate it
    if(returnValues == null) {
        returnValues = "NONE";
    }

    // 2. Apply the write; no-op if the locked item shows it was already applied.
    if(! lockedItem.containsKey(AttributeName.APPLIED.toString())) {
        try {
            // The write only goes through if we still own the item and have not applied yet.
            Map<String, ExpectedAttributeValue> expected = new HashMap<String, ExpectedAttributeValue>();
            expected.put(AttributeName.TXID.toString(), new ExpectedAttributeValue().withValue(new AttributeValue(txId)));
            expected.put(AttributeName.APPLIED.toString(), new ExpectedAttributeValue().withExists(false));

            // TODO assert if the caller request contains any of our internally defined fields?
            //      but we aren't copying the request object, so our retries might trigger the assertion.
            //      at least could assert that they have the values that we want.
            if(request instanceof PutItem) {
                PutItemRequest put = ((PutItem)request).getRequest();
                // Add the lock id and "is transient" flags to the put request (put replaces)
                put.getItem().put(AttributeName.TXID.toString(), new AttributeValue(txId));
                put.getItem().put(AttributeName.APPLIED.toString(), new AttributeValue(BOOLEAN_TRUE_ATTR_VAL));
                if(lockedItem.containsKey(AttributeName.TRANSIENT.toString())) {
                    put.getItem().put(AttributeName.TRANSIENT.toString(), lockedItem.get(AttributeName.TRANSIENT.toString()));
                }
                put.getItem().put(AttributeName.DATE.toString(), lockedItem.get(AttributeName.DATE.toString()));
                put.setExpected(expected);
                put.setReturnValues(returnValues);
                returnItem = txManager.getClient().putItem(put).getAttributes();
            } else if(request instanceof UpdateItem) {
                UpdateItemRequest update = ((UpdateItem)request).getRequest();
                update.setExpected(expected);
                update.setReturnValues(returnValues);

                if(update.getAttributeUpdates() != null) {
                    // Defensively delete the attributes in the request that could interfere with the transaction
                    update.getAttributeUpdates().remove(AttributeName.TXID.toString());
                    update.getAttributeUpdates().remove(AttributeName.TRANSIENT.toString());
                    update.getAttributeUpdates().remove(AttributeName.DATE.toString());
                } else {
                    update.setAttributeUpdates(new HashMap<String, AttributeValueUpdate>(1));
                }

                // Any caller-supplied update to APPLIED is overwritten by this put.
                update.getAttributeUpdates().put(AttributeName.APPLIED.toString(), new AttributeValueUpdate()
                    .withAction(AttributeAction.PUT)
                    .withValue(new AttributeValue(BOOLEAN_TRUE_ATTR_VAL)));

                returnItem = txManager.getClient().updateItem(update).getAttributes();
            } else if(request instanceof DeleteItem) {
                // no-op - delete doesn't change the item until unlock post-commit
            } else if(request instanceof GetItem) {
                // no-op
            } else {
                // request is known to be non-null here, so the only way to land in this branch
                // is a request type this method does not know how to apply.
                throw new TransactionAssertionException(txId, "Unsupported request type: " + request.getClass().getName());
            }
        } catch (ConditionalCheckFailedException e) {
            // ignore - apply already happened
        }
    }

    // 3. Work out what to return.
    // If it is a redrive, don't return an item.
    // TODO propagate a flag for whether this is a caller request or if it's being redriven by another transaction manager picking it up.
    //      In that case it doesn't matter what we do here.
    //      Also change the returnValues in the write requests based on this.
    if("ALL_OLD".equals(returnValues) && isTransient(lockedItem)) {
        return null;
    }

    if(request instanceof GetItem) {
        GetItemRequest getRequest = ((GetItem)request).getRequest();
        Request lockingRequest = txItem.getRequestForKey(request.getTableName(), request.getKey(txManager));
        if(lockingRequest instanceof DeleteItem) {
            return null; // If the item we're getting is deleted in this transaction
        }
        if(lockingRequest instanceof GetItem && isTransient(lockedItem)) {
            return null; // If the item has only a read lock and is transient
        }
        if(getRequest.getAttributesToGet() != null) {
            // Remove attributes that weren't asked for in the request.
            // NOTE: this edits the caller's lockedItem map in place.
            Set<String> attributesToGet = new HashSet<String>(getRequest.getAttributesToGet());
            Iterator<Map.Entry<String, AttributeValue>> it = lockedItem.entrySet().iterator();
            while(it.hasNext()) {
                Map.Entry<String, AttributeValue> attr = it.next();
                if(! attributesToGet.contains(attr.getKey())) {
                    it.remove(); // TODO does this need to keep the tx attributes?
                }
            }
        }
        return lockedItem;
    }

    if(request instanceof DeleteItem) {
        if("ALL_OLD".equals(returnValues)) {
            return lockedItem; // Deletes are left alone in apply, so return the locked item
        }
        return null; // In the case of NONE or ALL_NEW, it doesn't matter - item is (being) deleted.
    }

    if("ALL_OLD".equals(returnValues)) {
        if(returnItem != null) {
            return returnItem; // If the apply write succeeded, we have the ALL_OLD from the request
        }
        // The write did not hand back an image, so fall back to the one saved under this request's id.
        returnItem = txItem.loadItemImage(request.getRid());
        if(returnItem == null) {
            throw new UnknownCompletedTransactionException(txId, "Transaction must have completed since the old copy of the image is missing");
        }
        return returnItem;
    }

    if("ALL_NEW".equals(returnValues)) {
        if(returnItem != null) {
            return returnItem; // If the apply write succeeded, we have the ALL_NEW from the request
        }
        // The write did not hand back an image, so re-read the item and make sure we still own it.
        returnItem = getItem(request.getTableName(), request.getKey(txManager));
        if(returnItem == null) {
            throw new UnknownCompletedTransactionException(txId, "Transaction must have completed since the item no longer exists");
        }
        String owner = getOwner(returnItem);
        if(! txId.equals(owner)) {
            throw new ItemNotLockedException(txId, owner, request.getTableName(), returnItem);
        }
        return returnItem;
    }

    if("NONE".equals(returnValues)) {
        return null;
    }

    throw new TransactionAssertionException(txId, "Unsupported return values: " + returnValues);
}