protected void submitRequestEntries()

in flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java [161:200]


    /**
     * Sends the given entries to DynamoDB as one asynchronous {@code batchWriteItem} call against
     * {@code tableName} and reports the outcome through {@code requestResultConsumer}.
     *
     * <p>Every entry is first converted with {@code convertToWriteRequest}. When {@code
     * overwriteByPartitionKeys} is non-empty, the converted requests are deduplicated by the key
     * produced by {@link PrimaryKeyBuilder}: a later entry replaces an earlier one with the same
     * key. The deduplicated requests are taken from a {@link HashMap}, so their order in the batch
     * is the map's iteration order rather than the order of {@code requestEntries}.
     *
     * <p>When the call completes:
     *
     * <ul>
     *   <li>with an error, {@code handleFullyFailedRequest} receives the error, the original
     *       entries and the consumer;
     *   <li>with a non-empty {@code unprocessedItems()}, {@code handlePartiallyUnprocessedRequest}
     *       receives the response and the consumer;
     *   <li>otherwise the consumer is given an empty list.
     * </ul>
     *
     * @param requestEntries entries to write in this batch
     * @param requestResultConsumer callback invoked with the result of the batch; it receives an
     *     empty list when nothing was left unprocessed (presumably the list carries entries to
     *     retry; confirm against the base writer's contract)
     */
    protected void submitRequestEntries(
            List<DynamoDbWriteRequest> requestEntries,
            Consumer<List<DynamoDbWriteRequest>> requestResultConsumer) {

        List<WriteRequest> writeRequests = new ArrayList<>();
        boolean deduplicate = !CollectionUtil.isNullOrEmpty(overwriteByPartitionKeys);

        if (deduplicate) {
            // Keep only the last request seen for each primary key.
            Map<String, WriteRequest> latestByKey = new HashMap<>();
            PrimaryKeyBuilder dedupKeys = new PrimaryKeyBuilder(overwriteByPartitionKeys);
            for (DynamoDbWriteRequest entry : requestEntries) {
                WriteRequest converted = convertToWriteRequest(entry);
                String primaryKey = dedupKeys.build(converted);
                latestByKey.put(primaryKey, converted);
            }
            writeRequests.addAll(latestByKey.values());
        } else {
            // No overwrite keys configured: forward every entry as-is.
            for (DynamoDbWriteRequest entry : requestEntries) {
                WriteRequest converted = convertToWriteRequest(entry);
                writeRequests.add(converted);
            }
        }

        CompletableFuture<BatchWriteItemResponse> pendingWrite =
                clientProvider
                        .getClient()
                        .batchWriteItem(
                                BatchWriteItemRequest.builder()
                                        .requestItems(singletonMap(tableName, writeRequests))
                                        .build());

        pendingWrite.whenComplete(
                (response, err) -> {
                    if (err != null) {
                        // The whole call failed; hand over the original entries.
                        handleFullyFailedRequest(err, requestEntries, requestResultConsumer);
                        return;
                    }
                    if (CollectionUtil.isNullOrEmpty(response.unprocessedItems())) {
                        // Nothing left unprocessed.
                        requestResultConsumer.accept(Collections.emptyList());
                        return;
                    }
                    handlePartiallyUnprocessedRequest(response, requestResultConsumer);
                });
    }