public void runStateIndexerCoordinator()

in BDB-701-Moving-From-Transactional-to-Stateful-Batch-Processing/lambda/src/main/java/com/amazon/aws/blog/StateIndexerLambdaImpl.java [109:167]


    /**
     * Coordinates indexing of the Spark part-files produced by the stateful batch job.
     *
     * <p>Lists every object under the job's S3 output prefix, keeps only Spark
     * {@code part-} files, partitions them evenly across a fixed pool of worker
     * threads, and sums the number of artifacts each worker reports as indexed.
     *
     * @param input carries the stateful job's output path; its last {@code /}-separated
     *              segment is used as the S3 listing prefix
     * @throws Exception if any worker fails or the coordinator thread is interrupted
     *                   while waiting (the cause is preserved)
     */
    public void runStateIndexerCoordinator(final StateIndexerCoordinatorInput input) throws Exception {
        // Worker count is an integral quantity; avoid the original float round-trip.
        final int numWorkers = (int) DEFAULT_MAX_NUMBER_OF_LAMBDA_WORKERS;

        // Compute the listing prefix once instead of re-splitting the path three times.
        final String[] outputPathSegments = input.getStatefulOutput().split("/");
        final String prefix = outputPathSegments[outputPathSegments.length - 1];
        System.out.println("Using " + S3_BUCKET + " bucket with prefix " + prefix);

        // Lists all part files from S3 based on the provided folder prefix,
        // following truncated-listing continuations until exhausted.
        final ListObjectsRequest listRequest = new ListObjectsRequest()
                .withBucketName(S3_BUCKET)
                .withPrefix(prefix);
        ObjectListing listResult = s3.listObjects(listRequest);
        // Defensive copy: do not mutate the list the SDK result object hands back.
        final List<S3ObjectSummary> summaries = new LinkedList<>(listResult.getObjectSummaries());
        while (listResult.isTruncated()) {
            listResult = s3.listNextBatchOfObjects(listResult);
            summaries.addAll(listResult.getObjectSummaries());
        }

        // Retrieve the part file S3 keys, excluding anything that is not a Spark part-file
        // (e.g. _SUCCESS markers).
        final List<String> partFiles = summaries
                .stream()
                .map(S3ObjectSummary::getKey)
                .filter(key -> key.contains("part-"))
                .collect(Collectors.toList());

        final int totalNumParts = partFiles.size();
        // Integer ceiling division: ceil(totalNumParts / numWorkers) without float math.
        final int numPartsPerWorker = (totalNumParts + numWorkers - 1) / numWorkers;

        System.out.println("Using " + ELASTICACHE_ID + " cache ID");
        System.out.println("Using " + numPartsPerWorker + " parts per worker");
        System.out.println("Processing " + totalNumParts + " part files");

        final ExecutorService executorService = Executors.newFixedThreadPool(numWorkers);
        final List<Future<StateIndexerWorkerOutput>> futures = new LinkedList<>();
        try {
            // Assign contiguous slices of the part-file list to workers.
            for (int i = 0; i < totalNumParts; i += numPartsPerWorker) {
                final int end = Math.min(i + numPartsPerWorker, totalNumParts);
                final List<String> partsForWorker = new LinkedList<>(partFiles.subList(i, end));
                final StateIndexerWorkerInput workerInput = new StateIndexerWorkerInput(partsForWorker);
                futures.add(executorService.submit(new StateIndexerWorker(workerInput)));
            }

            // Wait for all workers to complete. Consider better failure handling at this point.
            int artifactsIndexed = 0;
            for (final Future<StateIndexerWorkerOutput> future : futures) {
                try {
                    final StateIndexerWorkerOutput workerOutput = future.get();
                    artifactsIndexed += workerOutput.getNumArtifactsIndexed();
                } catch (final InterruptedException e) {
                    // Restore the interrupt flag per the interruption contract before rethrowing.
                    Thread.currentThread().interrupt();
                    throw new Exception("Failure while indexing", e);
                } catch (final ExecutionException e) {
                    throw new Exception("Failure while indexing", e);
                }
            }

            System.out.println("Indexed " + artifactsIndexed + " artifacts");
        } finally {
            // Always release the pool, even when a worker failure aborts the wait loop.
            executorService.shutdown();
        }
    }