in flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java [161:200]
protected void submitRequestEntries(
List<DynamoDbWriteRequest> requestEntries,
Consumer<List<DynamoDbWriteRequest>> requestResultConsumer) {
List<WriteRequest> items = new ArrayList<>();
if (CollectionUtil.isNullOrEmpty(overwriteByPartitionKeys)) {
for (DynamoDbWriteRequest request : requestEntries) {
items.add(convertToWriteRequest(request));
}
} else {
// deduplication needed
Map<String, WriteRequest> container = new HashMap<>();
PrimaryKeyBuilder keyBuilder = new PrimaryKeyBuilder(overwriteByPartitionKeys);
for (DynamoDbWriteRequest request : requestEntries) {
WriteRequest req = convertToWriteRequest(request);
container.put(keyBuilder.build(req), req);
}
items.addAll(container.values());
}
CompletableFuture<BatchWriteItemResponse> future =
clientProvider
.getClient()
.batchWriteItem(
BatchWriteItemRequest.builder()
.requestItems(singletonMap(tableName, items))
.build());
future.whenComplete(
(response, err) -> {
if (err != null) {
handleFullyFailedRequest(err, requestEntries, requestResultConsumer);
} else if (!CollectionUtil.isNullOrEmpty(response.unprocessedItems())) {
handlePartiallyUnprocessedRequest(response, requestResultConsumer);
} else {
requestResultConsumer.accept(Collections.emptyList());
}
});
}