public void execute()

in src/main/java/com/amazonaws/kda/flink/benchmarking/KinesisProducerForFlinkSessionWindow.java [42:97]


	public void execute(JobExecutionContext context) throws JobExecutionException {

		String dynamoDBLocalURI = null;
		// Get job specific settings
		JobKey key = context.getJobDetail().getKey();
		JobDataMap dataMap = context.getJobDetail().getJobDataMap();
		// System.out.println("Job Key: " + key.getName());
		// System.out.println("\nFire Instance Id: " + context.getFireInstanceId());

		String jobId = dataMap.getString("jobId");
		String jobName = dataMap.getString("jobName");
		boolean isUsingDynamoDBLocal = dataMap.getBoolean("isUsingDynamoDBLocal");
		if (isUsingDynamoDBLocal)
			dynamoDBLocalURI = dataMap.getString("dynamoDBLocalURI");
		String childJobSummaryDDBTblName = dataMap.getString("childJobSummaryDDBTblName");
		String region = dataMap.getString("region");
		String interactionsIds = dataMap.getString("interactionsIds");
		String stringSeparator = dataMap.getString("stringSeparator");
		String targetKinesisStreams = dataMap.getString("targetKinesisStreams");
		String startingHashKeys = dataMap.getString("startingHashKeys");
		int batchSize = dataMap.getInt("batchSize");
		
		List<String> eventList = new ArrayList<String>();
		AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard().withRegion(region).build();
		List<String> interactionList = KDSProducerUtil.tokenizeStrings(interactionsIds, stringSeparator);
		List<String> targetKinesisStreamsList = KDSProducerUtil.tokenizeStrings(targetKinesisStreams, stringSeparator);
		List<String> startingHashKeyList = KDSProducerUtil.tokenizeStrings(startingHashKeys, stringSeparator);
		Iterator<String> hashKeysIterator = Iterables.cycle(startingHashKeyList).iterator();

		AmazonDynamoDB dynamoDB = null;
		if (isUsingDynamoDBLocal)
			dynamoDB = AmazonDynamoDBClientBuilder.standard()
					.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(dynamoDBLocalURI, region))
					.build();
		else
			dynamoDB = AmazonDynamoDBClientBuilder.standard().withRegion(region).build();
		
		DynamoDB dynamoDBClient = new DynamoDB(dynamoDB);

		for (String interactionId : interactionList) {
			eventList = KDSProducerUtil.createEvents(eventList, batchSize, interactionId);
			for (String targetStream : targetKinesisStreamsList) {
				KDSProducerUtil.writeMessagesToKinesis(kinesis, targetStream, eventList, hashKeysIterator);
						DDBUtil.insertChildJobDetailedStatus(dynamoDBClient, targetStream, jobId, context.getFireInstanceId(),
						targetStream, interactionId, batchSize, System.currentTimeMillis());
			}
		}
		
		// Check if this is the last Job execution. If yes, then prepare for next Hourly
		// Window.
		if (!Optional.ofNullable(context.getTrigger().getNextFireTime()).isPresent()) {
			System.out.printf("The last instance of the job. Job Key: %s, Job Id: %s \n", key.getName(), jobId);
			DDBUtil.updateChildJobStatus(dynamoDBClient, childJobSummaryDDBTblName, jobName, jobId,
					LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), "Completed");
		}
	}