public static void main()

in src/main/java/com/amazonaws/kda/flink/benchmarking/BenchmarkScheduler.java [52:130]


	/**
	 * Entry point: parses the benchmarking specs, builds one Quartz job and trigger
	 * per child job, schedules them, records the run via {@code trackJobs} and then
	 * keeps the main thread asleep for the configured job duration before shutting
	 * the scheduler down.
	 *
	 * @param args program arguments; {@code args[0]} is handed to
	 *             {@code parseBenchamrkingSpecs} (NOTE(review): presumably the
	 *             location of the benchmarking specs file - confirm)
	 * @throws IllegalArgumentException if no argument is supplied
	 */
	public static void main(String[] args) {

		Logger logger = LoggerFactory.getLogger(BenchmarkScheduler.class);

		// Fail with a readable message instead of a bare ArrayIndexOutOfBoundsException.
		if (args == null || args.length == 0) {
			throw new IllegalArgumentException(
					"Missing required argument: the benchmarking specs must be passed as the first program argument");
		}

		// Single source of truth for the separator used to pack lists into job data.
		final String separator = "$";

		BenchmarkingSpecs benchMarkingSpecs = parseBenchamrkingSpecs(args[0]);
		benchMarkingSpecs.setJobId(UUID.randomUUID().toString());
		benchMarkingSpecs.setNumberofChildJobs(benchMarkingSpecs.getChildJobs().size());
		benchMarkingSpecs
				.setJobStartTime(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));

		String targetKinesisStreams = String.join(separator, benchMarkingSpecs.getTargetKinesisStreams());
		// Hash keys are looked up for the first target stream only.
		String startingHashKeys = String.join(separator,
				KinesisStreamUtil.getHashKeysForOpenShards(
						AmazonKinesisClientBuilder.standard().withRegion(benchMarkingSpecs.getRegion()).build(),
						benchMarkingSpecs.getTargetKinesisStreams().get(0)));

		// Define JobDetail and Trigger for each Job provided in the Job Template
		List<JobSchedule> jobSchedules = new ArrayList<>();
		for (ChildJob childJob : benchMarkingSpecs.getChildJobs()) {
			List<String> interactions = KDSProducerUtil.createInteractions(childJob.getNumberofInteractions());

			childJob.setJobId(UUID.randomUUID().toString());
			childJob.setParentJobId(benchMarkingSpecs.getJobId());

			JobDetail jobDetail = newJob(KinesisProducerForFlinkSessionWindow.class)
					.withIdentity(childJob.getJobName().concat("-").concat(benchMarkingSpecs.getJobStartTime()),
							childJob.getJobName())
					.usingJobData("jobId", childJob.getJobId()).usingJobData("jobName", childJob.getJobName())
					.usingJobData("parentJobId", childJob.getParentJobId())
					.usingJobData("isUsingDynamoDBLocal", benchMarkingSpecs.isUsingDynamoDBLocal())
					.usingJobData("dynamoDBLocalURI", benchMarkingSpecs.getDynamoDBLocalURI())
					.usingJobData("childJobSummaryDDBTblName", benchMarkingSpecs.getChildJobSummaryDDBTableName())
					.usingJobData("region", benchMarkingSpecs.getRegion())
					.usingJobData("masterJobId", benchMarkingSpecs.getJobId())
					.usingJobData("targetKinesisStreams", targetKinesisStreams)
					.usingJobData("startingHashKeys", startingHashKeys)
					.usingJobData("interactionsIds", String.join(separator, interactions))
					.usingJobData("stringSeparator", separator).usingJobData("batchSize", childJob.getBatchSize())
					.usingJobData("startingHashKeyIndex", 0).build();

			Trigger trigger = newTrigger()
					.withIdentity(childJob.getJobName().concat("-").concat("-trigger"),
							childJob.getJobName().concat("-").concat("min-group"))
					.startNow().withSchedule(simpleSchedule().withIntervalInSeconds(childJob.getBatchCadence())
							.withRepeatCount(childJob.getNumberofBatches()))
					.build();

			JobSchedule jobSchedule = new JobSchedule();
			jobSchedule.setJobDetail(jobDetail);
			jobSchedule.setTrigger(trigger);
			jobSchedules.add(jobSchedule);
		}

		// Schedule the Jobs via Quartz Enterprise Job Scheduler
		Scheduler scheduler = null;
		try {
			scheduler = StdSchedulerFactory.getDefaultScheduler();
			scheduler.start();
			for (JobSchedule jobSchedule : jobSchedules) {
				scheduler.scheduleJob(jobSchedule.getJobDetail(), jobSchedule.getTrigger());
			}
			logger.info("Put Main thread in sleeping mode for {} minutes",
					benchMarkingSpecs.getJobDurationInMinutes());

			// Update DynamoDB
			trackJobs(benchMarkingSpecs);

			// Multiply as long so that long durations cannot overflow int arithmetic.
			Thread.sleep(benchMarkingSpecs.getJobDurationInMinutes() * 60_000L);
		} catch (SchedulerException se) {
			logger.error("Failed to schedule or run the benchmarking jobs", se);
		} catch (InterruptedException e) {
			// Restore the interrupt flag so code further up the stack can observe it.
			Thread.currentThread().interrupt();
			logger.warn("Main thread interrupted before the job duration elapsed; shutting down", e);
		} finally {
			// Always stop the scheduler, otherwise its worker threads outlive a failed run.
			if (scheduler != null) {
				try {
					scheduler.shutdown();
				} catch (SchedulerException se) {
					logger.error("Failed to shut down the Quartz scheduler", se);
				}
			}
		}
	}