in src/main/java/com/amazonaws/kda/flink/benchmarking/BenchmarkScheduler.java [52:130]
/**
 * Entry point: parses the benchmarking spec supplied as {@code args[0]}, builds one
 * Quartz {@link JobDetail} + {@link Trigger} pair per child job, schedules them all,
 * keeps the JVM alive for the configured job duration, then shuts the scheduler down.
 *
 * @param args args[0] = path to the benchmarking spec file (required)
 */
public static void main(String[] args) {
	Logger logger = LoggerFactory.getLogger(BenchmarkScheduler.class);
	if (args.length < 1) {
		// Fail fast with a usage hint instead of an ArrayIndexOutOfBoundsException
		logger.error("Missing required argument: path to the benchmarking spec file");
		return;
	}
	BenchmarkingSpecs benchMarkingSpecs = parseBenchamrkingSpecs(args[0]);
	benchMarkingSpecs.setJobId(UUID.randomUUID().toString());
	benchMarkingSpecs.setNumberofChildJobs(benchMarkingSpecs.getChildJobs().size());
	benchMarkingSpecs
			.setJobStartTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
	// "$" is the separator the child jobs use to unpack these multi-value strings
	// (also passed to them explicitly as "stringSeparator" job data below)
	String targetKinesisStreams = String.join("$", benchMarkingSpecs.getTargetKinesisStreams());
	// Hash keys are read from the open shards of the first target stream only
	String startingHashKeys = String.join("$", KinesisStreamUtil.getHashKeysForOpenShards(
			AmazonKinesisClientBuilder.standard().withRegion(benchMarkingSpecs.getRegion()).build(),
			benchMarkingSpecs.getTargetKinesisStreams().get(0)));
	/**
	 * Define JobDetail and Trigger for each Job provided in the Job Template
	 */
	List<JobSchedule> jobSchedules = new ArrayList<>();
	for (ChildJob childJob : benchMarkingSpecs.getChildJobs()) {
		childJob.setJobId(UUID.randomUUID().toString());
		childJob.setParentJobId(benchMarkingSpecs.getJobId());
		JobSchedule jobSchedule = new JobSchedule();
		jobSchedule.setJobDetail(
				buildJobDetail(childJob, benchMarkingSpecs, targetKinesisStreams, startingHashKeys));
		jobSchedule.setTrigger(buildTrigger(childJob));
		jobSchedules.add(jobSchedule);
	}
	/**
	 * Schedule the Jobs via Quartz Enterprise Job Scheduler
	 */
	try {
		Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
		scheduler.start();
		for (JobSchedule jobSchedule : jobSchedules) {
			scheduler.scheduleJob(jobSchedule.getJobDetail(), jobSchedule.getTrigger());
		}
		logger.info("Put Main thread in sleeping mode for {} minutes",
				benchMarkingSpecs.getJobDurationInMinutes());
		// Record job metadata in DynamoDB before blocking
		trackJobs(benchMarkingSpecs);
		try {
			// Multiply as long to avoid int overflow for very long durations
			Thread.sleep(benchMarkingSpecs.getJobDurationInMinutes() * 60_000L);
		} finally {
			// Always stop the scheduler, even if the sleep is interrupted
			scheduler.shutdown();
		}
	} catch (SchedulerException se) {
		logger.error("Failed to schedule or shut down benchmark jobs", se);
	} catch (InterruptedException e) {
		// Restore the interrupt status instead of swallowing it
		Thread.currentThread().interrupt();
		logger.error("Benchmark wait interrupted; exiting early", e);
	}
}

/**
 * Builds the Quartz JobDetail carrying all configuration the Kinesis producer job
 * reads back out of its JobDataMap. Multi-value entries are "$"-separated.
 */
private static JobDetail buildJobDetail(ChildJob childJob, BenchmarkingSpecs benchMarkingSpecs,
		String targetKinesisStreams, String startingHashKeys) {
	// Fresh interaction ids are generated per child job
	List<String> interactions = KDSProducerUtil.createInteractions(childJob.getNumberofInteractions());
	return newJob(KinesisProducerForFlinkSessionWindow.class)
			.withIdentity(childJob.getJobName().concat("-").concat(benchMarkingSpecs.getJobStartTime()),
					childJob.getJobName())
			.usingJobData("jobId", childJob.getJobId())
			.usingJobData("jobName", childJob.getJobName())
			.usingJobData("parentJobId", childJob.getParentJobId())
			.usingJobData("isUsingDynamoDBLocal", benchMarkingSpecs.isUsingDynamoDBLocal())
			.usingJobData("dynamoDBLocalURI", benchMarkingSpecs.getDynamoDBLocalURI())
			.usingJobData("childJobSummaryDDBTblName", benchMarkingSpecs.getChildJobSummaryDDBTableName())
			.usingJobData("region", benchMarkingSpecs.getRegion())
			.usingJobData("masterJobId", benchMarkingSpecs.getJobId())
			.usingJobData("targetKinesisStreams", targetKinesisStreams)
			.usingJobData("startingHashKeys", startingHashKeys)
			.usingJobData("interactionsIds", String.join("$", interactions))
			.usingJobData("stringSeparator", "$")
			.usingJobData("batchSize", childJob.getBatchSize())
			.usingJobData("startingHashKeyIndex", 0)
			.build();
}

/**
 * Builds a trigger that fires immediately and then repeats once per batch at the
 * child job's batch cadence (in seconds).
 */
private static Trigger buildTrigger(ChildJob childJob) {
	// NOTE: the original identity contained a doubled dash ("name--trigger");
	// normalized here — Quartz only requires the name/group pair to be unique
	return newTrigger()
			.withIdentity(childJob.getJobName().concat("-trigger"),
					childJob.getJobName().concat("-min-group"))
			.startNow()
			.withSchedule(simpleSchedule().withIntervalInSeconds(childJob.getBatchCadence())
					.withRepeatCount(childJob.getNumberofBatches()))
			.build();
}