private List getAggregationEntries()

in src/main/java/com/amazonaws/partners/saasfactory/metering/aggregation/StripeBillingPublish.java [107:171]


    private List<AggregationEntry> getAggregationEntries(String tenantID) {
        HashMap<String,String> expressionNames = new HashMap<>();
        expressionNames.put(PRIMARY_KEY_EXPRESSION_NAME, PRIMARY_KEY_NAME);
        expressionNames.put(SORT_KEY_EXPRESSION_NAME, SORT_KEY_NAME);
        expressionNames.put(SUBMITTED_KEY_EXPRESSION_NAME, SUBMITTED_KEY_ATTRIBUTE_NAME);

        HashMap<String, AttributeValue> expressionValues = new HashMap<>();
        AttributeValue tenantIDValue = AttributeValue.builder()
                .s(tenantID)
                .build();
        expressionValues.put(TENANT_ID_EXPRESSION_VALUE, tenantIDValue);

        AttributeValue aggregationEntryPrefixValue = AttributeValue.builder()
                .s(AGGREGATION_ENTRY_PREFIX)
                .build();
        expressionValues.put(AGGREGATION_EXPRESSION_VALUE, aggregationEntryPrefixValue);

        // Filter for those entries that have not yet been submitted to the billing provider
        AttributeValue keySubmittedValue = AttributeValue.builder()
                .bool(false)
                .build();
        expressionValues.put(KEY_SUBMITTED_EXPRESSION_VALUE, keySubmittedValue);

        QueryResponse result = null;
        List<AggregationEntry> aggregationEntries = new ArrayList<>();
        do {
            QueryRequest request = QueryRequest.builder()
                    .tableName(this.tableConfig.getTableName())
                    .keyConditionExpression(String.format("%s = %s and begins_with(%s, %s)",
                                                PRIMARY_KEY_EXPRESSION_NAME,
                                                TENANT_ID_EXPRESSION_VALUE,
                                                SORT_KEY_EXPRESSION_NAME,
                                                AGGREGATION_EXPRESSION_VALUE))
                    .filterExpression(String.format("%s = %s", SUBMITTED_KEY_EXPRESSION_NAME, KEY_SUBMITTED_EXPRESSION_VALUE))
                    .expressionAttributeNames(expressionNames)
                    .expressionAttributeValues(expressionValues)
                    .build();
            if (result != null && !result.lastEvaluatedKey().isEmpty()) {
                request = request.toBuilder()
                        .exclusiveStartKey(result.lastEvaluatedKey())
                        .build();
            }
            try {
                result = this.ddb.query(request);
            } catch (ResourceNotFoundException e) {
                this.logger.error("Table {} does not exist", this.tableConfig.getTableName());
                return new ArrayList<>();
            } catch (InternalServerErrorException e) {
                this.logger.error(e.getMessage());
                return new ArrayList<>();
            }
            for (Map<String, AttributeValue> item : result.items()) {
                String[] aggregationInformation = item.get(SORT_KEY_NAME).s().split(ATTRIBUTE_DELIMITER);
                Instant periodStart = Instant.ofEpochMilli(Long.parseLong(aggregationInformation[PERIOD_START_ARRAY_LOCATION]));
                Long quantity = Long.valueOf(item.get(QUANTITY_ATTRIBUTE_NAME).n());
                String idempotencyKey = item.get(IDEMPOTENTCY_KEY_ATTRIBUTE_NAME).s();
                AggregationEntry entry = new AggregationEntry(tenantID,
                        periodStart,
                        quantity,
                        idempotencyKey);
                aggregationEntries.add(entry);
            }
        } while (!result.lastEvaluatedKey().isEmpty());
        return aggregationEntries;
    }