in emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/write/AbstractDynamoDBRecordWriter.java [100:139]
/**
 * Hands one record to the DynamoDB batch writer and updates the throughput accounting.
 *
 * <p>The record is converted with {@code convertValueToDynamoDBItem} and passed to
 * {@code client.putBatch} together with the remaining write budget
 * ({@code permissibleWritesPerSecond - writesPerSecond}). When {@code putBatch} returns a
 * result, the capacity it reports is added to {@code totalIOPSConsumed}, the items of the
 * current batch that were not reported as unprocessed are added to {@code writesPerSecond},
 * and the unprocessed count becomes the new {@code batchSize}.
 *
 * <p>A {@code null} result leaves the accounting untouched apart from the batch and item
 * counters (presumably the item was only buffered; confirm against {@code putBatch}).
 *
 * @param key record key, forwarded to {@code convertValueToDynamoDBItem}
 * @param value record value; must not be {@code null}
 * @throws IOException declared by the method signature; not thrown directly by this method
 * @throws RuntimeException if {@code value} is {@code null}
 */
public void write(K key, V value) throws IOException {
  if (value == null) {
    throw new RuntimeException("Null record encountered. At least the key columns must be "
        + "specified.");
  }

  verifyInterval();
  if (progressable != null) {
    progressable.progress();
  }

  DynamoDBItemWritable item = convertValueToDynamoDBItem(key, value);
  BatchWriteItemResult result = client.putBatch(tableName, item.getItem(),
      permissibleWritesPerSecond - writesPerSecond, reporter, deletionMode);

  batchSize++;
  totalItemsWritten++;

  if (result == null) {
    return;
  }

  if (result.getConsumedCapacity() != null) {
    for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
      if (consumedCapacity != null) {
        totalIOPSConsumed += consumedUnitsOf(consumedCapacity);
      }
    }
  }

  // Only the items that actually went through count against the write budget; the
  // unprocessed ones are carried over as the starting size of the next batch.
  int unprocessedItems = countUnprocessedItems(result);
  writesPerSecond += batchSize - unprocessedItems;
  batchSize = unprocessedItems;
}

/**
 * Returns the capacity units one {@link ConsumedCapacity} entry reports for the table and its
 * local secondary indexes.
 *
 * <p>If the entry carries no per-table breakdown, the aggregate capacity of the entry is used
 * instead, so a response without the breakdown no longer fails with a
 * {@link NullPointerException}. Missing (null) unit values count as zero.
 */
private static double consumedUnitsOf(ConsumedCapacity consumedCapacity) {
  Capacity tableCapacity = consumedCapacity.getTable();
  if (tableCapacity == null) {
    // No breakdown available: the aggregate is the only figure the entry provides.
    Double aggregateUnits = consumedCapacity.getCapacityUnits();
    return aggregateUnits == null ? 0.0 : aggregateUnits;
  }

  double consumedUnits = 0.0;
  Double tableUnits = tableCapacity.getCapacityUnits();
  if (tableUnits != null) {
    consumedUnits += tableUnits;
  }

  if (consumedCapacity.getLocalSecondaryIndexes() != null) {
    for (Capacity lsiConsumedCapacity : consumedCapacity.getLocalSecondaryIndexes().values()) {
      if (lsiConsumedCapacity != null && lsiConsumedCapacity.getCapacityUnits() != null) {
        consumedUnits += lsiConsumedCapacity.getCapacityUnits();
      }
    }
  }
  return consumedUnits;
}

/**
 * Counts the write requests the result reports as unprocessed, across all tables.
 * A missing map or a missing per-table list counts as zero.
 */
private static int countUnprocessedItems(BatchWriteItemResult result) {
  if (result.getUnprocessedItems() == null) {
    return 0;
  }

  int unprocessedItems = 0;
  for (List<WriteRequest> requests : result.getUnprocessedItems().values()) {
    if (requests != null) {
      unprocessedItems += requests.size();
    }
  }
  return unprocessedItems;
}