in src/main/java/com/amazonaws/kda/flink/benchmarking/KinesisProducerForFlinkSessionWindow.java [42:97]
/**
 * Job entry point: generates events for each configured interaction id and
 * pushes them to every configured target Kinesis stream, logging a detailed
 * status entry through {@code DDBUtil} after each write.
 *
 * <p>All settings are read from the job's {@code JobDataMap} (keys such as
 * {@code jobId}, {@code region}, {@code interactionsIds},
 * {@code targetKinesisStreams}, {@code startingHashKeys}, {@code batchSize}).
 * When {@code isUsingDynamoDBLocal} is true the DynamoDB client is pointed at
 * the endpoint given by {@code dynamoDBLocalURI}; otherwise it uses the
 * configured region.
 *
 * <p>If the trigger reports no further fire time, the child job is marked
 * {@code "Completed"} via {@code DDBUtil.updateChildJobStatus}.
 *
 * <p>The Kinesis and DynamoDB clients are created per execution and are shut
 * down before this method returns, whether it completes normally or throws.
 *
 * @param context execution context supplying the job detail, data map,
 *                fire instance id and trigger
 * @throws JobExecutionException declared by the job contract
 */
public void execute(JobExecutionContext context) throws JobExecutionException {
    // Get job specific settings
    JobDataMap dataMap = context.getJobDetail().getJobDataMap();
    String jobId = dataMap.getString("jobId");
    String jobName = dataMap.getString("jobName");
    boolean isUsingDynamoDBLocal = dataMap.getBoolean("isUsingDynamoDBLocal");
    String childJobSummaryDDBTblName = dataMap.getString("childJobSummaryDDBTblName");
    String region = dataMap.getString("region");
    String interactionsIds = dataMap.getString("interactionsIds");
    String stringSeparator = dataMap.getString("stringSeparator");
    String targetKinesisStreams = dataMap.getString("targetKinesisStreams");
    String startingHashKeys = dataMap.getString("startingHashKeys");
    int batchSize = dataMap.getInt("batchSize");

    // Split the delimited settings before any client is created, so a failure
    // here cannot leave an open client behind.
    List<String> interactionList = KDSProducerUtil.tokenizeStrings(interactionsIds, stringSeparator);
    List<String> targetKinesisStreamsList = KDSProducerUtil.tokenizeStrings(targetKinesisStreams, stringSeparator);
    List<String> startingHashKeyList = KDSProducerUtil.tokenizeStrings(startingHashKeys, stringSeparator);
    // Endless round-robin over the configured starting hash keys.
    Iterator<String> hashKeysIterator = Iterables.cycle(startingHashKeyList).iterator();
    List<String> eventList = new ArrayList<String>();

    AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard().withRegion(region).build();
    try {
        AmazonDynamoDB dynamoDB;
        if (isUsingDynamoDBLocal) {
            String dynamoDBLocalURI = dataMap.getString("dynamoDBLocalURI");
            dynamoDB = AmazonDynamoDBClientBuilder.standard()
                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(dynamoDBLocalURI, region))
                    .build();
        } else {
            dynamoDB = AmazonDynamoDBClientBuilder.standard().withRegion(region).build();
        }
        try {
            DynamoDB dynamoDBClient = new DynamoDB(dynamoDB);
            for (String interactionId : interactionList) {
                // NOTE(review): the list returned by the previous iteration is fed
                // back into createEvents; whether it is cleared or appended to is
                // decided inside KDSProducerUtil - confirm this is intended.
                eventList = KDSProducerUtil.createEvents(eventList, batchSize, interactionId);
                for (String targetStream : targetKinesisStreamsList) {
                    KDSProducerUtil.writeMessagesToKinesis(kinesis, targetStream, eventList, hashKeysIterator);
                    // NOTE(review): targetStream is passed twice here (2nd and 5th
                    // argument); verify the 2nd argument is not meant to be a
                    // DynamoDB table name.
                    DDBUtil.insertChildJobDetailedStatus(dynamoDBClient, targetStream, jobId,
                            context.getFireInstanceId(), targetStream, interactionId, batchSize,
                            System.currentTimeMillis());
                }
            }
            // Check if this is the last Job execution. If yes, then prepare for
            // next Hourly Window.
            if (context.getTrigger().getNextFireTime() == null) {
                JobKey key = context.getJobDetail().getKey();
                System.out.printf("The last instance of the job. Job Key: %s, Job Id: %s \n", key.getName(), jobId);
                DDBUtil.updateChildJobStatus(dynamoDBClient, childJobSummaryDDBTblName, jobName, jobId,
                        LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), "Completed");
            }
        } finally {
            // Release the DynamoDB client's resources for this execution.
            dynamoDB.shutdown();
        }
    } finally {
        // Release the Kinesis client's resources for this execution.
        kinesis.shutdown();
    }
}