private BatchWriteItemResult writeBatch(Reporter reporter, final int roomNeeded)

in emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/DynamoDBClient.java [258:338]


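  /**
   * Submits the buffered requests in writeBatchMap as one BatchWriteItem call via the
   * retry driver, then re-queues any items DynamoDB left unprocessed. roomNeeded is the
   * byte count the caller needs free in the batch buffer after this write.
   */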
  private BatchWriteItemResult writeBatch(Reporter reporter, final int roomNeeded) {
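    // Ask DynamoDB to report per-index consumed capacity so LSI writes are counted below.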
    final BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest()
        .withRequestItems(writeBatchMap)
        .withReturnConsumedCapacity(ReturnConsumedCapacity.INDEXES);

    RetryResult<BatchWriteItemResult> retryResult = getRetryDriver().runWithRetry(
        new Callable<BatchWriteItemResult>() {
          @Override
          public BatchWriteItemResult call() throws
              UnsupportedEncodingException,
              InterruptedException {
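            // Back off before each attempt; batchWriteRetries is 0 on the first try.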
            pauseExponentially(batchWriteRetries);
            BatchWriteItemResult result = dynamoDB.batchWriteItem(batchWriteItemRequest);

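            // Items the service could not process (typically due to throttling) come back
            // in UnprocessedItems; an empty map means the whole batch went through.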
            Map<String, List<WriteRequest>> unprocessedItems = result.getUnprocessedItems();
            if (unprocessedItems == null || unprocessedItems.isEmpty()) {
              batchWriteRetries = 0;
            } else {
              batchWriteRetries++;

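              // Count the leftover items and measure how many bytes they still occupy.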
              int unprocessedItemCount = 0;
              for (List<WriteRequest> unprocessedWriteRequests : unprocessedItems.values()) {
                unprocessedItemCount += unprocessedWriteRequests.size();

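                // Note: sizes are computed from the PutRequest; getPutRequest() would be
                // null for a delete-only WriteRequest.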
                int batchSizeBytes = 0;
                for (WriteRequest request : unprocessedWriteRequests) {
                  batchSizeBytes += DynamoDBUtil.getItemSizeBytes(
                      request.getPutRequest().getItem());
                }

                long maxItemsPerBatch =
                    config.getLong(MAX_ITEMS_PER_BATCH, DEFAULT_MAX_ITEMS_PER_BATCH);
                long maxBatchSize = config.getLong(MAX_BATCH_SIZE, DEFAULT_MAX_BATCH_SIZE);

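                // If the leftovers still fill a whole batch by item count, or leave less
                // than roomNeeded bytes free, throw so the retry driver backs off and
                // resubmits instead of silently re-queueing an oversized batch.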
                if (unprocessedWriteRequests.size() >= maxItemsPerBatch
                    || (maxBatchSize - batchSizeBytes) < roomNeeded) {
                  throw new AmazonClientException("Full list of write requests not processed");
                }
              }

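              // Record write capacity consumed by the table and its local secondary
              // indexes for the debug log below (note: consumed is reassigned per table,
              // so with multiple tables only the last table's base capacity is kept).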
              double consumed = 0.0;
              for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {
                consumed = consumedCapacity.getTable().getCapacityUnits();
                if (consumedCapacity.getLocalSecondaryIndexes() != null) {
                  for (Capacity lsiConsumedCapacity :
                      consumedCapacity.getLocalSecondaryIndexes().values()) {
                    consumed += lsiConsumedCapacity.getCapacityUnits();
                  }
                }
              }

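              // Total number of items attempted in this request, for the log message.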
              int batchSize = 0;
              for (List<WriteRequest> writeRequests :
                  batchWriteItemRequest.getRequestItems().values()) {
                batchSize += writeRequests.size();
              }

              log.debug("BatchWriteItem attempted " + batchSize + " items, consumed "
                  + consumed + " wcu, left unprocessed " + unprocessedItemCount
                  + " items, now at " + batchWriteRetries + " retries");
            }
            return result;
          }
        }, reporter, PrintCounter.DynamoDBWriteThrottle);

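    // The batch was submitted; reset the buffer before re-queueing any leftovers.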
    writeBatchMap.clear();
    writeBatchMapSizeBytes = 0;

    // If some items failed to go through, add them back to the writeBatchMap
    Map<String, List<WriteRequest>> unprocessedItems = retryResult.result.getUnprocessedItems();
    for (Entry<String, List<WriteRequest>> entry : unprocessedItems.entrySet()) {
      String key = entry.getKey();
      List<WriteRequest> requests = entry.getValue();
      for (WriteRequest request : requests) {
        writeBatchMapSizeBytes += DynamoDBUtil.getItemSizeBytes(request.getPutRequest().getItem());
      }
      writeBatchMap.put(key, requests);
    }
    return retryResult.result;
  }
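
The backoff applied by pauseExponentially is not part of this excerpt. The sketch below is a minimal, self-contained illustration of capped exponential backoff with full jitter; the class name, base delay, and cap are illustrative assumptions, not values taken from the connector.

import java.util.concurrent.ThreadLocalRandom;

public class BackoffSketch {

  // Illustrative constants, not the connector's actual values.
  private static final long BASE_DELAY_MS = 25;
  private static final long MAX_DELAY_MS = 10_000;

  // Sleep for roughly BASE_DELAY_MS * 2^retries, capped at MAX_DELAY_MS,
  // with full jitter so concurrent writers do not retry in lockstep.
  static void pauseExponentially(int retries) throws InterruptedException {
    if (retries == 0) {
      return; // first attempt: no pause
    }
    long ceiling = Math.min(MAX_DELAY_MS, BASE_DELAY_MS * (1L << Math.min(retries, 20)));
    Thread.sleep(ThreadLocalRandom.current().nextLong(ceiling + 1));
  }

  public static void main(String[] args) throws InterruptedException {
    for (int retries = 0; retries <= 5; retries++) {
      long start = System.nanoTime();
      pauseExponentially(retries);
      System.out.printf("retries=%d slept ~%d ms%n",
          retries, (System.nanoTime() - start) / 1_000_000);
    }
  }
}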