mantis-control-plane/mantis-control-plane-dynamodb/src/main/java/io/mantisrx/extensions/dynamodb/DynamoDBStore.java [245:331]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        final QueryRequest request = QueryRequest.builder()
                .tableName(this.mantisTable)
                .keyConditionExpression(String.format("%s = %s and begins_with(%s, %s)", PK_E, PK_V, SK_E, SK_V))
                .expressionAttributeNames(expressionAttributesNames)
                .expressionAttributeValues(expressionAttributeValues)
//                .projectionExpression(String.format("%s,%s", PK_E, SK_E))
                .build();
        log.info("querying for all items in partition {} in table {}", partitionKey, tableName);
        final QueryResponse response = this.client.query(request);
        final List<WriteRequest> deleteRequests = new ArrayList<>();
        log.info("retrieved {} from {} and {}", response.items().size(), tableName, partitionKey);
        response.items()
                .forEach(v -> deleteRequests.add(WriteRequest.builder().deleteRequest(
                        DeleteRequest.builder()
                        .key(ImmutableMap.<String, AttributeValue>of(
                                PK, AttributeValue.builder().s(v.get(PK).s()).build(),
                                SK, AttributeValue.builder().s(v.get(SK).s()).build())).build()).build()
                ));
        doBatchWriteRequest(deleteRequests);
        log.info("deleted {} from {} and {}", deleteRequests.size(), tableName, partitionKey);
        return true;
    }

    /**
     * Builds a put-style {@link WriteRequest} for a single item.
     *
     * <p>The item's {@code PK} attribute holds the table name and its {@code SK} attribute holds
     * {@code partitionKey#secondaryKey}. The payload, partition key, secondary key and table name
     * are additionally stored as their own string attributes.
     *
     * @param tableName value stored under {@code PK} and {@code TABLE_NAME_KEY}
     * @param partitionKey value stored under {@code PARTITION_KEY}; also the prefix of {@code SK}
     * @param secondaryKey value stored under {@code SECONDARY_KEY}; also the suffix of {@code SK}
     * @param data payload stored under {@code DATA_KEY}
     * @param ttl when non-zero, a numeric {@code TTL_KEY} attribute is added whose value is the
     *            current epoch second plus {@code ttl.getSeconds()}
     * @return a write request wrapping the put of the assembled item
     */
    private WriteRequest writeRequestFrom(String tableName, String partitionKey, String secondaryKey, String data, Duration ttl) {
        final Map<String, AttributeValue> attributes = new HashMap<>();
        final String sortKey = String.format("%s#%s", partitionKey, secondaryKey);

        attributes.put(PK, AttributeValue.builder().s(tableName).build());
        attributes.put(SK, AttributeValue.builder().s(sortKey).build());
        attributes.put(DATA_KEY, AttributeValue.builder().s(data).build());
        attributes.put(PARTITION_KEY, AttributeValue.builder().s(partitionKey).build());
        attributes.put(SECONDARY_KEY, AttributeValue.builder().s(secondaryKey).build());
        attributes.put(TABLE_NAME_KEY, AttributeValue.builder().s(tableName).build());

        // A zero TTL means "no expiry": the TTL attribute is simply left off the item.
        if (!ttl.isZero()) {
            final long nowSeconds = System.currentTimeMillis() / 1000L;
            final long expiresAtSeconds = nowSeconds + ttl.getSeconds();
            attributes.put(TTL_KEY, AttributeValue.builder().n(String.valueOf(expiresAtSeconds)).build());
        }

        final PutRequest put = PutRequest.builder().item(attributes).build();
        return WriteRequest.builder().putRequest(put).build();
    }

    /**
     * Builds one put-style {@link WriteRequest} per entry of {@code mapSKToData}.
     *
     * @param tableName table name passed through to each request
     * @param partitionKey partition key passed through to each request
     * @param mapSKToData secondary key to data payload; each entry becomes one request
     * @param ttl time to live passed through to each request
     * @return the requests, in the iteration order of {@code mapSKToData}
     */
    private List<WriteRequest> writeRequestsFrom(String tableName, String partitionKey, Map<String,String> mapSKToData, Duration ttl) {
        final List<WriteRequest> puts = new ArrayList<>();
        for (Map.Entry<String, String> entry : mapSKToData.entrySet()) {
            final String secondaryKey = entry.getKey();
            final String data = entry.getValue();
            puts.add(writeRequestFrom(tableName, partitionKey, secondaryKey, data, ttl));
        }
        return puts;
    }
    /**
     * Builds a delete-style {@link WriteRequest} from the logical key parts.
     *
     * <p>The key uses the table name as {@code PK} and {@code partitionKey#secondaryKey} as
     * {@code SK}.
     *
     * @param tableName value of the {@code PK} key attribute
     * @param partitionKey prefix of the {@code SK} key attribute
     * @param secondaryKey suffix of the {@code SK} key attribute
     * @return a write request wrapping the delete of that key
     */
    private WriteRequest deleteRequestFrom(String tableName, String partitionKey, String secondaryKey) {
        final String sortKey = String.format("%s#%s", partitionKey, secondaryKey);

        final Map<String, AttributeValue> key = new HashMap<>();
        key.put(PK, AttributeValue.builder().s(tableName).build());
        key.put(SK, AttributeValue.builder().s(sortKey).build());

        final DeleteRequest delete = DeleteRequest.builder().key(key).build();
        return WriteRequest.builder().deleteRequest(delete).build();
    }
    /**
     * Builds a delete-style {@link WriteRequest} from already-composed key values, logging the
     * key pair at info level.
     *
     * @param dyanmoPK value used verbatim for the {@code PK} key attribute
     * @param dynamoSK value used verbatim for the {@code SK} key attribute
     * @return a write request wrapping the delete of that key
     */
    private WriteRequest deleteRequestFrom(String dyanmoPK, String dynamoSK) {
        log.info("preparing to delete pk {} sk {}", dyanmoPK, dynamoSK);

        final Map<String, AttributeValue> key = new HashMap<>();
        key.put(PK, AttributeValue.builder().s(dyanmoPK).build());
        key.put(SK, AttributeValue.builder().s(dynamoSK).build());

        final DeleteRequest delete = DeleteRequest.builder().key(key).build();
        return WriteRequest.builder().deleteRequest(delete).build();
    }
    /**
     * Builds one delete-style {@link WriteRequest} per entry of {@code mapPKSK}.
     *
     * @param mapPKSK map whose keys are used as {@code PK} values and whose values are used as
     *                {@code SK} values
     * @return the requests, in the iteration order of {@code mapPKSK}
     */
    private List<WriteRequest> deleteRequestsFrom(Map<String, String> mapPKSK) {
        final List<WriteRequest> deletes = new ArrayList<>();
        for (Map.Entry<String, String> entry : mapPKSK.entrySet()) {
            deletes.add(deleteRequestFrom(entry.getKey(), entry.getValue()));
        }
        return deletes;
    }

    private boolean doBatchWriteRequest(List<WriteRequest> writeRequests) throws IOException {
        for(int i = 0; i < writeRequests.size(); i +=MAX_ITEMS) {
            final List<WriteRequest> writes = writeRequests.subList(i, Integer.min(i+MAX_ITEMS,writeRequests.size()));
            log.info("processing {} items to {}", writes.size(), this.mantisTable);
            BatchWriteItemRequest batchWriteItemRequest = BatchWriteItemRequest.builder()
                    .requestItems(ImmutableMap.of(this.mantisTable, writes))
                    .build();

            BatchWriteItemResponse batchWriteItemResponse = this.client.batchWriteItem(batchWriteItemRequest);

            while (!batchWriteItemResponse.hasUnprocessedItems()) {
                Map<String, List<WriteRequest>> unprocessedItems = batchWriteItemResponse.unprocessedItems();
                log.warn("handling {} unprocessed items", unprocessedItems.size());

                batchWriteItemResponse = this.client.batchWriteItem(batchWriteItemRequest);
            }
        }
        return true;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



mantis-control-plane/mantis-control-plane-store/mantis-control-plane-store-dynamodb/src/main/java/io/mantisrx/server/master/store/DynamoStore.java [195:281]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        final QueryRequest request = QueryRequest.builder()
                .tableName(this.mantisTable)
                .keyConditionExpression(String.format("%s = %s and begins_with(%s, %s)", PK_E, PK_V, SK_E, SK_V))
                .expressionAttributeNames(expressionAttributesNames)
                .expressionAttributeValues(expressionAttributeValues)
//                .projectionExpression(String.format("%s,%s", PK_E, SK_E))
                .build();
        log.info("querying for all items in partition {} in table {}", partitionKey, tableName);
        final QueryResponse response = this.client.query(request);
        final List<WriteRequest> deleteRequests = new ArrayList<>();
        log.info("retrieved {} from {} and {}", response.items().size(), tableName, partitionKey);
        response.items()
                .forEach(v -> deleteRequests.add(WriteRequest.builder().deleteRequest(
                        DeleteRequest.builder()
                        .key(ImmutableMap.<String, AttributeValue>of(
                                PK, AttributeValue.builder().s(v.get(PK).s()).build(),
                                SK, AttributeValue.builder().s(v.get(SK).s()).build())).build()).build()
                ));
        doBatchWriteRequest(deleteRequests);
        log.info("deleted {} from {} and {}", deleteRequests.size(), tableName, partitionKey);
        return true;
    }

    /**
     * Builds a put-style {@link WriteRequest} for a single item.
     *
     * <p>The item's {@code PK} attribute holds the table name and its {@code SK} attribute holds
     * {@code partitionKey#secondaryKey}. The payload, partition key, secondary key and table name
     * are additionally stored as their own string attributes.
     *
     * @param tableName value stored under {@code PK} and {@code TABLE_NAME_KEY}
     * @param partitionKey value stored under {@code PARTITION_KEY}; also the prefix of {@code SK}
     * @param secondaryKey value stored under {@code SECONDARY_KEY}; also the suffix of {@code SK}
     * @param data payload stored under {@code DATA_KEY}
     * @param ttl when non-zero, a numeric {@code TTL_KEY} attribute is added whose value is the
     *            current epoch second plus {@code ttl.getSeconds()}
     * @return a write request wrapping the put of the assembled item
     */
    private WriteRequest writeRequestFrom(String tableName, String partitionKey, String secondaryKey, String data, Duration ttl) {
        final Map<String, AttributeValue> attributes = new HashMap<>();
        final String sortKey = String.format("%s#%s", partitionKey, secondaryKey);

        attributes.put(PK, AttributeValue.builder().s(tableName).build());
        attributes.put(SK, AttributeValue.builder().s(sortKey).build());
        attributes.put(DATA_KEY, AttributeValue.builder().s(data).build());
        attributes.put(PARTITION_KEY, AttributeValue.builder().s(partitionKey).build());
        attributes.put(SECONDARY_KEY, AttributeValue.builder().s(secondaryKey).build());
        attributes.put(TABLE_NAME_KEY, AttributeValue.builder().s(tableName).build());

        // A zero TTL means "no expiry": the TTL attribute is simply left off the item.
        if (!ttl.isZero()) {
            final long nowSeconds = System.currentTimeMillis() / 1000L;
            final long expiresAtSeconds = nowSeconds + ttl.getSeconds();
            attributes.put(TTL_KEY, AttributeValue.builder().n(String.valueOf(expiresAtSeconds)).build());
        }

        final PutRequest put = PutRequest.builder().item(attributes).build();
        return WriteRequest.builder().putRequest(put).build();
    }

    /**
     * Builds one put-style {@link WriteRequest} per entry of {@code mapSKToData}.
     *
     * @param tableName table name passed through to each request
     * @param partitionKey partition key passed through to each request
     * @param mapSKToData secondary key to data payload; each entry becomes one request
     * @param ttl time to live passed through to each request
     * @return the requests, in the iteration order of {@code mapSKToData}
     */
    private List<WriteRequest> writeRequestsFrom(String tableName, String partitionKey, Map<String,String> mapSKToData, Duration ttl) {
        final List<WriteRequest> puts = new ArrayList<>();
        for (Map.Entry<String, String> entry : mapSKToData.entrySet()) {
            final String secondaryKey = entry.getKey();
            final String data = entry.getValue();
            puts.add(writeRequestFrom(tableName, partitionKey, secondaryKey, data, ttl));
        }
        return puts;
    }
    /**
     * Builds a delete-style {@link WriteRequest} from the logical key parts.
     *
     * <p>The key uses the table name as {@code PK} and {@code partitionKey#secondaryKey} as
     * {@code SK}.
     *
     * @param tableName value of the {@code PK} key attribute
     * @param partitionKey prefix of the {@code SK} key attribute
     * @param secondaryKey suffix of the {@code SK} key attribute
     * @return a write request wrapping the delete of that key
     */
    private WriteRequest deleteRequestFrom(String tableName, String partitionKey, String secondaryKey) {
        final String sortKey = String.format("%s#%s", partitionKey, secondaryKey);

        final Map<String, AttributeValue> key = new HashMap<>();
        key.put(PK, AttributeValue.builder().s(tableName).build());
        key.put(SK, AttributeValue.builder().s(sortKey).build());

        final DeleteRequest delete = DeleteRequest.builder().key(key).build();
        return WriteRequest.builder().deleteRequest(delete).build();
    }
    /**
     * Builds a delete-style {@link WriteRequest} from already-composed key values, logging the
     * key pair at info level.
     *
     * @param dyanmoPK value used verbatim for the {@code PK} key attribute
     * @param dynamoSK value used verbatim for the {@code SK} key attribute
     * @return a write request wrapping the delete of that key
     */
    private WriteRequest deleteRequestFrom(String dyanmoPK, String dynamoSK) {
        log.info("preparing to delete pk {} sk {}", dyanmoPK, dynamoSK);

        final Map<String, AttributeValue> key = new HashMap<>();
        key.put(PK, AttributeValue.builder().s(dyanmoPK).build());
        key.put(SK, AttributeValue.builder().s(dynamoSK).build());

        final DeleteRequest delete = DeleteRequest.builder().key(key).build();
        return WriteRequest.builder().deleteRequest(delete).build();
    }
    /**
     * Builds one delete-style {@link WriteRequest} per entry of {@code mapPKSK}.
     *
     * @param mapPKSK map whose keys are used as {@code PK} values and whose values are used as
     *                {@code SK} values
     * @return the requests, in the iteration order of {@code mapPKSK}
     */
    private List<WriteRequest> deleteRequestsFrom(Map<String, String> mapPKSK) {
        final List<WriteRequest> deletes = new ArrayList<>();
        for (Map.Entry<String, String> entry : mapPKSK.entrySet()) {
            deletes.add(deleteRequestFrom(entry.getKey(), entry.getValue()));
        }
        return deletes;
    }

    private boolean doBatchWriteRequest(List<WriteRequest> writeRequests) throws IOException {
        for(int i = 0; i < writeRequests.size(); i +=MAX_ITEMS) {
            final List<WriteRequest> writes = writeRequests.subList(i, Integer.min(i+MAX_ITEMS,writeRequests.size()));
            log.info("processing {} items to {}", writes.size(), this.mantisTable);
            BatchWriteItemRequest batchWriteItemRequest = BatchWriteItemRequest.builder()
                    .requestItems(ImmutableMap.of(this.mantisTable, writes))
                    .build();

            BatchWriteItemResponse batchWriteItemResponse = this.client.batchWriteItem(batchWriteItemRequest);

            while (!batchWriteItemResponse.hasUnprocessedItems()) {
                Map<String, List<WriteRequest>> unprocessedItems = batchWriteItemResponse.unprocessedItems();
                log.warn("handling {} unprocessed items", unprocessedItems.size());

                batchWriteItemResponse = this.client.batchWriteItem(batchWriteItemRequest);
            }
        }
        return true;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



