in src/main/java/com/amazonaws/partners/saasfactory/metering/aggregation/StripeBillingPublish.java [107:171]
private List<AggregationEntry> getAggregationEntries(String tenantID) {
HashMap<String,String> expressionNames = new HashMap<>();
expressionNames.put(PRIMARY_KEY_EXPRESSION_NAME, PRIMARY_KEY_NAME);
expressionNames.put(SORT_KEY_EXPRESSION_NAME, SORT_KEY_NAME);
expressionNames.put(SUBMITTED_KEY_EXPRESSION_NAME, SUBMITTED_KEY_ATTRIBUTE_NAME);
HashMap<String, AttributeValue> expressionValues = new HashMap<>();
AttributeValue tenantIDValue = AttributeValue.builder()
.s(tenantID)
.build();
expressionValues.put(TENANT_ID_EXPRESSION_VALUE, tenantIDValue);
AttributeValue aggregationEntryPrefixValue = AttributeValue.builder()
.s(AGGREGATION_ENTRY_PREFIX)
.build();
expressionValues.put(AGGREGATION_EXPRESSION_VALUE, aggregationEntryPrefixValue);
// Filter for those entries that have not yet been submitted to the billing provider
AttributeValue keySubmittedValue = AttributeValue.builder()
.bool(false)
.build();
expressionValues.put(KEY_SUBMITTED_EXPRESSION_VALUE, keySubmittedValue);
QueryResponse result = null;
List<AggregationEntry> aggregationEntries = new ArrayList<>();
do {
QueryRequest request = QueryRequest.builder()
.tableName(this.tableConfig.getTableName())
.keyConditionExpression(String.format("%s = %s and begins_with(%s, %s)",
PRIMARY_KEY_EXPRESSION_NAME,
TENANT_ID_EXPRESSION_VALUE,
SORT_KEY_EXPRESSION_NAME,
AGGREGATION_EXPRESSION_VALUE))
.filterExpression(String.format("%s = %s", SUBMITTED_KEY_EXPRESSION_NAME, KEY_SUBMITTED_EXPRESSION_VALUE))
.expressionAttributeNames(expressionNames)
.expressionAttributeValues(expressionValues)
.build();
if (result != null && !result.lastEvaluatedKey().isEmpty()) {
request = request.toBuilder()
.exclusiveStartKey(result.lastEvaluatedKey())
.build();
}
try {
result = this.ddb.query(request);
} catch (ResourceNotFoundException e) {
this.logger.error("Table {} does not exist", this.tableConfig.getTableName());
return new ArrayList<>();
} catch (InternalServerErrorException e) {
this.logger.error(e.getMessage());
return new ArrayList<>();
}
for (Map<String, AttributeValue> item : result.items()) {
String[] aggregationInformation = item.get(SORT_KEY_NAME).s().split(ATTRIBUTE_DELIMITER);
Instant periodStart = Instant.ofEpochMilli(Long.parseLong(aggregationInformation[PERIOD_START_ARRAY_LOCATION]));
Long quantity = Long.valueOf(item.get(QUANTITY_ATTRIBUTE_NAME).n());
String idempotencyKey = item.get(IDEMPOTENTCY_KEY_ATTRIBUTE_NAME).s();
AggregationEntry entry = new AggregationEntry(tenantID,
periodStart,
quantity,
idempotencyKey);
aggregationEntries.add(entry);
}
} while (!result.lastEvaluatedKey().isEmpty());
return aggregationEntries;
}