in src/main/java/com/amazonaws/services/kinesis/aggregators/datastore/DynamoDataStore.java [327:391]
/**
 * Conditionally writes a numeric summary value to a single attribute of an
 * item, using a two step protocol:
 * <ol>
 * <li>PUT the value on the condition that the attribute does not exist
 * yet. If this succeeds, this caller was the first writer for the key.</li>
 * <li>If that condition fails, PUT the value again on the condition that
 * the stored value satisfies the comparison operator supplied by the
 * update's summary calculation, evaluated against the new value.</li>
 * </ol>
 * A {@link ConditionalCheckFailedException} from the second step is not
 * treated as an error: it means the stored value already wins (for example
 * the new value is greater than the current minimum for MIN, or less than
 * the current maximum for MAX) and nothing was written.
 *
 * @param dynamoClient the client used to issue the update requests
 * @param tableName the table holding the item to be updated
 * @param key the update key, converted to a table key via
 *        {@code StreamAggregatorUtils.getTableKey}
 * @param attribute the attribute name, converted to a column name via
 *        {@code StreamAggregatorUtils.methodToColumn}
 * @param update the modification carrying the final value to write and the
 *        summary calculation whose comparison operator guards the write
 * @return the result of whichever update was applied (requested with
 *         {@code ReturnValue.UPDATED_NEW}), or {@code null} if neither
 *         condition held and the item was left unchanged
 * @throws Exception if an update fails for any reason other than the
 *         conditional check
 */
public UpdateItemResult updateConditionalValue(final AmazonDynamoDB dynamoClient,
        final String tableName, final UpdateKey key, final String attribute,
        final AggregateAttributeModification update) throws Exception {
    final Map<String, AttributeValue> updateKey = StreamAggregatorUtils.getTableKey(key);
    final String setAttribute = StreamAggregatorUtils.methodToColumn(attribute);

    // DynamoDB transports numbers as strings; render the value once and
    // reuse it for both the write and the comparison
    final String finalValue = String.valueOf(update.getFinalValue());
    final SummaryCalculation calc = update.getCalculationApplied();

    // create the update that we want to write. A plain HashMap is used
    // rather than double-brace initialization so that no anonymous subclass
    // (with its hidden reference to this instance) ends up in the request
    final Map<String, AttributeValueUpdate> thisCalcUpdate = new HashMap<>();
    thisCalcUpdate.put(setAttribute,
            new AttributeValueUpdate().withAction(AttributeAction.PUT).withValue(
                    new AttributeValue().withN(finalValue)));

    // create the request
    final UpdateItemRequest req = new UpdateItemRequest().withTableName(tableName)
            .withKey(updateKey).withReturnValues(ReturnValue.UPDATED_NEW)
            .withAttributeUpdates(thisCalcUpdate);

    // try an update to PUT the value if NOT EXISTS, to establish if we
    // are the first writer for this key
    final Map<String, ExpectedAttributeValue> expectNotExists = new HashMap<>();
    expectNotExists.put(setAttribute, new ExpectedAttributeValue().withExists(false));
    req.setExpected(expectNotExists);

    try {
        // yay - if this succeeds we were the first writer, so our value
        // was written
        return DynamoUtils.updateWithRetries(dynamoClient, req);
    } catch (ConditionalCheckFailedException attributeAlreadyExists) {
        // a value is already stored: fall through to the comparison based
        // update below
    }

    // set the expected to the comparison contained in the update
    // calculation. A fresh map is built instead of clearing the one that
    // was already handed to the request
    final Map<String, ExpectedAttributeValue> expectComparison = new HashMap<>();
    expectComparison.put(setAttribute,
            new ExpectedAttributeValue().withComparisonOperator(
                    calc.getDynamoComparisonOperator()).withValue(
                    new AttributeValue().withN(finalValue)));
    req.setExpected(expectComparison);

    // do the conditional update on the summary calculation. this may
    // result in no update being applied because the new value is greater
    // than the current minimum for MIN, or less than the current maximum
    // for MAX.
    try {
        return DynamoUtils.updateWithRetries(dynamoClient, req);
    } catch (ConditionalCheckFailedException ignored) {
        // no worries - we just weren't the min or max!
        return null;
    }
}