in emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java [258:338]
/**
 * Sends the buffered write requests in {@code writeBatchMap} to DynamoDB as one
 * BatchWriteItem call, executed through the retry driver.
 *
 * <p>Each attempt first pauses via {@code pauseExponentially(batchWriteRetries)}. If the
 * response reports unprocessed items, {@code batchWriteRetries} is incremented; otherwise it is
 * reset to zero. When, for any table, the unprocessed requests already reach the configured
 * per-batch item limit, or leave less than {@code roomNeeded} of the configured batch byte
 * budget, the attempt throws an {@link AmazonClientException} from inside the callable so the
 * retry driver sees it as a failure.
 *
 * <p>After the call completes, {@code writeBatchMap} and {@code writeBatchMapSizeBytes} are
 * reset and then re-populated with whatever the final response still lists as unprocessed, so
 * those requests go out with the next batch.
 *
 * @param reporter passed through to the retry driver together with
 *     {@code PrintCounter.DynamoDBWriteThrottle}
 * @param roomNeeded free space, in the same unit as {@code DynamoDBUtil.getItemSizeBytes},
 *     that must remain in the batch after re-queuing the unprocessed requests
 * @return the result of the last BatchWriteItem attempt
 */
private BatchWriteItemResult writeBatch(Reporter reporter, final int roomNeeded) {
  final BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest()
      .withRequestItems(writeBatchMap)
      .withReturnConsumedCapacity(ReturnConsumedCapacity.INDEXES);

  RetryResult<BatchWriteItemResult> retryResult = getRetryDriver().runWithRetry(
      new Callable<BatchWriteItemResult>() {
        @Override
        public BatchWriteItemResult call() throws
            UnsupportedEncodingException,
            InterruptedException {
          pauseExponentially(batchWriteRetries);
          BatchWriteItemResult result = dynamoDB.batchWriteItem(batchWriteItemRequest);

          Map<String, List<WriteRequest>> unprocessedItems = result.getUnprocessedItems();
          if (unprocessedItems == null || unprocessedItems.isEmpty()) {
            // Everything was accepted: reset the backoff counter.
            batchWriteRetries = 0;
            return result;
          }

          batchWriteRetries++;

          // The limits do not change while iterating, so read them once.
          long maxItemsPerBatch =
              config.getLong(MAX_ITEMS_PER_BATCH, DEFAULT_MAX_ITEMS_PER_BATCH);
          long maxBatchSize = config.getLong(MAX_BATCH_SIZE, DEFAULT_MAX_BATCH_SIZE);

          int unprocessedItemCount = 0;
          for (List<WriteRequest> unprocessedWriteRequests : unprocessedItems.values()) {
            unprocessedItemCount += unprocessedWriteRequests.size();

            int batchSizeBytes = 0;
            for (WriteRequest request : unprocessedWriteRequests) {
              // NOTE(review): assumes every buffered request is a put request - confirm
              // against the code that fills writeBatchMap.
              batchSizeBytes += DynamoDBUtil.getItemSizeBytes(
                  request.getPutRequest().getItem());
            }

            // If the leftovers alone would fill the next batch (by count) or would not leave
            // roomNeeded free (by size), fail this attempt instead of carrying them over.
            if (unprocessedWriteRequests.size() >= maxItemsPerBatch
                || (maxBatchSize - batchSizeBytes) < roomNeeded) {
              throw new AmazonClientException("Full list of write requests not processed");
            }
          }

          // The remainder only feeds the debug message. It is null-guarded so that a missing
          // capacity section cannot turn an accepted write into a failed attempt.
          double consumed = 0.0;
          if (result.getConsumedCapacity() != null) {
            for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
              // Accumulate across all entries; plain assignment here would discard the
              // capacity already summed for earlier entries.
              if (consumedCapacity.getTable() != null) {
                consumed += consumedCapacity.getTable().getCapacityUnits();
              }
              if (consumedCapacity.getLocalSecondaryIndexes() != null) {
                for (Capacity lsiConsumedCapacity :
                    consumedCapacity.getLocalSecondaryIndexes().values()) {
                  consumed += lsiConsumedCapacity.getCapacityUnits();
                }
              }
            }
          }

          int batchSize = 0;
          for (List<WriteRequest> writeRequests :
              batchWriteItemRequest.getRequestItems().values()) {
            batchSize += writeRequests.size();
          }

          log.debug(
              "BatchWriteItem attempted " + batchSize + " items, consumed " + consumed
                  + " wcu, left unprocessed " + unprocessedItemCount + " items, now at "
                  + batchWriteRetries + " retries");
          return result;
        }
      }, reporter, PrintCounter.DynamoDBWriteThrottle);

  writeBatchMap.clear();
  writeBatchMapSizeBytes = 0;

  // If some items failed to go through, add them back to the writeBatchMap. The callable
  // above accepts a null map as "nothing unprocessed", so the same must hold here.
  Map<String, List<WriteRequest>> unprocessedItems = retryResult.result.getUnprocessedItems();
  if (unprocessedItems != null) {
    for (Entry<String, List<WriteRequest>> entry : unprocessedItems.entrySet()) {
      List<WriteRequest> requests = entry.getValue();
      for (WriteRequest request : requests) {
        writeBatchMapSizeBytes +=
            DynamoDBUtil.getItemSizeBytes(request.getPutRequest().getItem());
      }
      writeBatchMap.put(entry.getKey(), requests);
    }
  }
  return retryResult.result;
}