public StateIndexerWorkerOutput runStateIndexerWorker()

in BDB-701-Moving-From-Transactional-to-Stateful-Batch-Processing/lambda/src/main/java/com/amazon/aws/blog/StateIndexerLambdaImpl.java [178:255]


    /**
     * Indexes every record contained in the given part files.
     *
     * <p>Each part file is fetched from {@code S3_BUCKET} and read line by line. For every line a
     * {@link StatefulArtifactIndex} is built from the first comma-separated field (used as the order
     * id), the part file key, the line's UTF-8 byte length and its byte offset within the file. The
     * entries are then written by a fixed-size thread pool: the index is saved through the DynamoDB
     * mapper and the raw line is stored in the Redis cluster under the key
     * {@code <file>-<byteOffset>-<byteSize>}.
     *
     * <p>The Redis cluster client, the DynamoDB client and the thread pool are created per call and
     * are released before this method returns, whether it succeeds or fails.
     *
     * @param input worker input supplying the S3 keys of the part files to index
     * @return output carrying the number of artifacts whose index writes completed
     * @throws Exception if a part file cannot be read, or if an index write fails or is interrupted
     */
    public StateIndexerWorkerOutput runStateIndexerWorker(final StateIndexerWorkerInput input) throws Exception {

        final List<Map.Entry<StatefulArtifactIndex, String>> indices = new LinkedList<>();

        // Read each part file and create the index entry for each stateful artifact. This happens
        // before any client is opened so that a read failure leaves nothing to clean up.
        //
        for (final String partFile : input.getPartFiles()) {
            // The reader must decode with the same charset that is used to measure the line below,
            // otherwise the recorded byte sizes/offsets depend on the platform default charset.
            try (final S3Object s3Object = s3.getObject(new GetObjectRequest(S3_BUCKET, partFile));
                 final BufferedReader reader = new BufferedReader(
                         new InputStreamReader(s3Object.getObjectContent(), StandardCharsets.UTF_8))) {
                long offset = 0;
                String line;
                while ((line = reader.readLine()) != null) {
                    final long size = line.getBytes(StandardCharsets.UTF_8).length;
                    final String orderId = line.split(",")[0];

                    final StatefulArtifactIndex index = new StatefulArtifactIndex();
                    index.setOrderId(orderId);
                    index.setFile(partFile);
                    index.setByteSize(String.valueOf(size));
                    index.setByteOffset(String.valueOf(offset));
                    indices.add(new AbstractMap.SimpleImmutableEntry<>(index, line));

                    // NOTE(review): the +1 assumes every line ends with a single '\n' byte; part
                    // files with CRLF line endings would skew the offsets - confirm with the writer.
                    offset += size + 1;
                }
            } catch (IOException e) {
                throw new Exception(String.format("Could not read the part file for %s", partFile), e);
            }
        }

        final Set<HostAndPort> jedisClusterNodes = new HashSet<>();
        jedisClusterNodes.add(getElastiCacheAddress());

        final AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.standard().build();
        final DynamoDBMapper ddbMapper = new DynamoDBMapper(ddbClient);

        int artifactsIndexed = 0;
        try (final JedisCluster jc = new JedisCluster(jedisClusterNodes,
                DefaultJedisClientConfig.builder().ssl(true).build(), DEFAULT_REDIRECTIONS,
                new GenericObjectPoolConfig<>())) {

            // Index the part files in a multi-threaded fashion
            //
            final ExecutorService executorService =
                    Executors.newFixedThreadPool(DEFAULT_THREADS_PER_LAMBDA_WORKER);
            try {
                final List<Future<Boolean>> futures = new LinkedList<>();

                for (final Map.Entry<StatefulArtifactIndex, String> indexPair : indices) {
                    final StatefulArtifactIndex index = indexPair.getKey();
                    final Callable<Boolean> task = () -> {
                        ddbMapper.save(index);
                        jc.set(String.format("%s-%s-%s", index.getFile(), index.getByteOffset(),
                                index.getByteSize()), indexPair.getValue());
                        return true;
                    };
                    try {
                        futures.add(executorService.submit(task));
                    } catch (RuntimeException e) {
                        throw new Exception(String.format("Failure while indexing %s", index.getOrderId()), e);
                    }
                }
                // No further tasks will be submitted; already queued tasks still run to completion.
                executorService.shutdown();

                for (final Future<Boolean> future : futures) {
                    try {
                        future.get();
                        artifactsIndexed++;
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so code further up the stack can observe it.
                        Thread.currentThread().interrupt();
                        throw new Exception("Failure while indexing", e);
                    } catch (ExecutionException e) {
                        throw new Exception("Failure while indexing", e);
                    }
                }
            } finally {
                // Does nothing after a clean run; on failure it cancels the work that is still
                // queued and releases the pool threads before the Redis client is closed.
                executorService.shutdownNow();
            }
        } finally {
            ddbClient.shutdown();
        }

        final StateIndexerWorkerOutput indexerWorkerOutput = new StateIndexerWorkerOutput();
        indexerWorkerOutput.setNumArtifactsIndexed(artifactsIndexed);
        return indexerWorkerOutput;
    }