in BDB-701-Moving-From-Transactional-to-Stateful-Batch-Processing/lambda/src/main/java/com/amazon/aws/blog/StateIndexerLambdaImpl.java [109:167]
public void runStateIndexerCoordinator(final StateIndexerCoordinatorInput input) throws Exception {
final float numWorkers = DEFAULT_MAX_NUMBER_OF_LAMBDA_WORKERS;
System.out.println("Using " + S3_BUCKET + " bucket with prefix " + input.getStatefulOutput().split("/")[input.getStatefulOutput().split("/").length - 1]);
// Lists all part files from S3 based on the provided folder prefix
//
final ListObjectsRequest listRequest = new ListObjectsRequest().withBucketName(S3_BUCKET)
.withPrefix(input.getStatefulOutput().split("/")[input.getStatefulOutput().split("/").length - 1]);
ObjectListing listResult = s3.listObjects(listRequest);
final List<S3ObjectSummary> summaries = listResult.getObjectSummaries();
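// listObjects returns paginated results (at most 1,000 keys per page), so follow the
// truncated listing until every object summary has been collected.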
while (listResult.isTruncated()) {
listResult = s3.listNextBatchOfObjects(listResult);
summaries.addAll(listResult.getObjectSummaries());
}
// Retrieve the part file S3 locations, excluding any files that are not Spark part-files
final List<String> partFiles = summaries
.stream()
.map(S3ObjectSummary::getKey)
.filter(part -> part.contains("part-"))
.collect(Collectors.toList());
final ExecutorService executorService = Executors.newFixedThreadPool((int) numWorkers);
List<Future<StateIndexerWorkerOutput>> futures = new LinkedList<>();
final int totalNumParts = partFiles.size();
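// Ceiling division (numWorkers is a float, so the division below is floating point)
// guarantees every part file is assigned even when the total does not divide evenly
// across the worker pool.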
final int numPartsPerWorker = (int) Math.ceil(totalNumParts / numWorkers);
System.out.println("Using " + ELASTICACHE_ID + " cache ID");
System.out.println("Using " + numPartsPerWorker + " parts per worker");
System.out.println("Processing " + partFiles + " part files");
// Assign workers in a multi-threaded fashion
for (int i = 0; i < partFiles.size(); i += numPartsPerWorker) {
final List<String> partsForWorker = new LinkedList<>();
for (int j = i; j < i + numPartsPerWorker && j < partFiles.size(); j++) {
partsForWorker.add(partFiles.get(j));
}
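// Hand this worker its slice of part files and submit it to the thread pool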
final StateIndexerWorkerInput workerInput = new StateIndexerWorkerInput(partsForWorker);
StateIndexerWorker callable = new StateIndexerWorker(workerInput);
futures.add(executorService.submit(callable));
}
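// shutdown() stops the pool from accepting new tasks; the workers already submitted
// keep running, and future.get() below blocks until each one completes.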
executorService.shutdown();
// Wait for all workers to complete. Failure handling here is minimal: any worker failure
// aborts the whole run, so consider retries or partial-failure handling in production.
int artifactsIndexed = 0;
for (final Future<StateIndexerWorkerOutput> future : futures) {
try {
final StateIndexerWorkerOutput workerOutput = future.get();
artifactsIndexed += workerOutput.getNumArtifactsIndexed();
} catch (InterruptedException | ExecutionException e) {
throw new Exception("Failure while indexing", e);
}
}
System.out.println("Indexed " + artifactsIndexed + " artifacts");
}
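// Hypothetical usage sketch (not part of the original file): invoking the coordinator from a
// Lambda handler. The no-arg constructor and setStatefulOutput setter on
// StateIndexerCoordinatorInput are assumptions for illustration; only getStatefulOutput()
// appears in the method above. Shown as a comment so the class body stays valid Java.
//
//   final StateIndexerCoordinatorInput input = new StateIndexerCoordinatorInput();
//   input.setStatefulOutput("s3://<bucket>/stateful-output/");   // assumed setter
//   new StateIndexerLambdaImpl().runStateIndexerCoordinator(input);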