in emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java [200:243]
public BatchWriteItemResult putBatch(String tableName, Map<String, AttributeValue> item,
long maxItemsPerBatch, Reporter reporter, boolean deletionMode)
throws UnsupportedEncodingException {
int itemSizeBytes = DynamoDBUtil.getItemSizeBytes(item);
if (itemSizeBytes > maxItemByteSize) {
throw new RuntimeException("Cannot pass items with size greater than " + maxItemByteSize
+ ". Item with size of " + itemSizeBytes + " was given.");
}
maxItemsPerBatch = DynamoDBUtil.getBoundedBatchLimit(config, maxItemsPerBatch);
BatchWriteItemResult result = null;
if (writeBatchMap.containsKey(tableName)) {
boolean writeRequestsForTableAtLimit =
writeBatchMap.get(tableName).size() >= maxItemsPerBatch;
boolean totalSizeOfWriteBatchesOverLimit =
writeBatchMapSizeBytes + itemSizeBytes > maxBatchSize;
if (writeRequestsForTableAtLimit || totalSizeOfWriteBatchesOverLimit) {
result = writeBatch(reporter, itemSizeBytes);
}
}
// writeBatchMap could be cleared from writeBatch()
List<WriteRequest> writeBatchList;
if (!writeBatchMap.containsKey(tableName)) {
writeBatchList = new ArrayList<>((int) maxItemsPerBatch);
writeBatchMap.put(tableName, writeBatchList);
} else {
writeBatchList = writeBatchMap.get(tableName);
}
log.debug("BatchWriteItem deletionMode " + deletionMode);
if (deletionMode) {
writeBatchList.add(new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(item)));
} else {
writeBatchList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(item)));
}
writeBatchMapSizeBytes += itemSizeBytes;
return result;
}