in src/main/java/com/amazonaws/kda/flink/benchmarking/BenchmarkScheduler.java [154:182]
/**
 * Records the start of a benchmarking run in DynamoDB.
 *
 * <p>Writes one parent-job row with status {@code "Started"} whose interaction
 * count is the sum of {@code getNumberofInteractions()} over all child jobs,
 * then writes one row per child job with status {@code "In Progress"}. Every
 * row is stamped with the parent's job start time.
 *
 * <p>The DynamoDB client is created for this call only: it targets the
 * endpoint returned by {@code getDynamoDBLocalURI()} when
 * {@code isUsingDynamoDBLocal()} is true, otherwise the region returned by
 * {@code getRegion()}. The client is shut down before the method returns,
 * including when one of the inserts throws.
 *
 * @param benchMarkingSpecs benchmarking specification supplying the DynamoDB
 *                          connection settings, the summary table names, the
 *                          parent job details and the child jobs to record
 */
public static void trackJobs(BenchmarkingSpecs benchMarkingSpecs) {
    final AmazonDynamoDB dynamoDB;
    if (benchMarkingSpecs.isUsingDynamoDBLocal()) {
        dynamoDB = AmazonDynamoDBClientBuilder.standard()
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
                        benchMarkingSpecs.getDynamoDBLocalURI(), benchMarkingSpecs.getRegion()))
                .build();
    } else {
        dynamoDB = AmazonDynamoDBClientBuilder.standard()
                .withRegion(benchMarkingSpecs.getRegion())
                .build();
    }

    try {
        DynamoDB dynamoDBClient = new DynamoDB(dynamoDB);

        // The parent row carries the total number of interactions across all child jobs.
        int numInteractionsProcessed = 0;
        for (ChildJob childJob : benchMarkingSpecs.getChildJobs()) {
            numInteractionsProcessed += childJob.getNumberofInteractions();
        }

        // Insert the parent record into the parent job summary table.
        DDBUtil.insertParentJobStatus(dynamoDBClient, benchMarkingSpecs.getParentJobSummaryDDBTableName(),
                benchMarkingSpecs.getJobName(), benchMarkingSpecs.getJobId(), numInteractionsProcessed,
                benchMarkingSpecs.getJobStartTime(), "Started");

        // Insert one record per child job into the child job summary table.
        for (ChildJob childJob : benchMarkingSpecs.getChildJobs()) {
            DDBUtil.insertChildJobStatus(dynamoDBClient, benchMarkingSpecs.getChildJobSummaryDDBTableName(),
                    childJob.getJobName(), childJob.getJobId(), childJob.getParentJobId(),
                    childJob.getNumberofInteractions(), benchMarkingSpecs.getJobStartTime(), "In Progress");
        }
    } finally {
        // The client is local to this call; release its connection pool and
        // other held resources instead of leaving them open after returning.
        dynamoDB.shutdown();
    }
}