in BDB-701-Moving-From-Transactional-to-Stateful-Batch-Processing/lambda/src/main/java/com/amazon/aws/blog/StateIndexerLambdaImpl.java [178:255]
/**
 * Indexes every line of the given S3 part files so each stateful artifact can later be located
 * directly by file, byte offset and byte size.
 *
 * <p>For each line a {@code StatefulArtifactIndex} is built holding the order id (the first
 * comma-separated field), the part file key, the line's UTF-8 byte size and its byte offset in the
 * file. Each index is saved to DynamoDB via {@link DynamoDBMapper#save(Object)}, and the raw line is
 * stored in the ElastiCache cluster under the key {@code <file>-<byteOffset>-<byteSize>}. The writes
 * run concurrently on {@code DEFAULT_THREADS_PER_LAMBDA_WORKER} threads.
 *
 * <p>The DynamoDB client and the Redis cluster client are created for this call and released before
 * it returns, on both the success and the failure path.
 *
 * @param input worker input whose {@code getPartFiles()} lists the S3 keys (in {@code S3_BUCKET}) to
 *     index
 * @return output whose artifact count is the number of lines successfully indexed
 * @throws Exception if a part file cannot be read or any DynamoDB/Redis write fails
 */
public StateIndexerWorkerOutput runStateIndexerWorker(final StateIndexerWorkerInput input) throws Exception {
    final AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.standard().build();
    try {
        final DynamoDBMapper ddbMapper = new DynamoDBMapper(ddbClient);
        final Set<HostAndPort> jedisClusterNodes = new HashSet<>();
        jedisClusterNodes.add(getElastiCacheAddress());
        // Close the cluster client so its pooled connections are released even when indexing fails.
        try (final JedisCluster jc = new JedisCluster(jedisClusterNodes,
                DefaultJedisClientConfig.builder().ssl(true).build(), DEFAULT_REDIRECTIONS,
                new GenericObjectPoolConfig<>())) {
            // Read each part file and create the appropriate index entry for each stateful artifact
            final List<Map.Entry<StatefulArtifactIndex, String>> indices = new LinkedList<>();
            for (final String partFile : input.getPartFiles()) {
                indices.addAll(readStateIndicesFromPartFile(partFile));
            }
            final int artifactsIndexed = writeStateIndices(indices, ddbMapper, jc);
            final StateIndexerWorkerOutput indexerWorkerOutput = new StateIndexerWorkerOutput();
            indexerWorkerOutput.setNumArtifactsIndexed(artifactsIndexed);
            return indexerWorkerOutput;
        }
    } finally {
        ddbClient.shutdown();
    }
}

/**
 * Reads one part file from {@code S3_BUCKET} and builds an index entry for each of its lines.
 *
 * @param partFile S3 key of the part file
 * @return one (index, raw line) pair per line, in file order
 * @throws Exception if the part file's content cannot be read
 */
private List<Map.Entry<StatefulArtifactIndex, String>> readStateIndicesFromPartFile(final String partFile)
        throws Exception {
    final List<Map.Entry<StatefulArtifactIndex, String>> indices = new LinkedList<>();
    final S3Object s3Object = s3.getObject(new GetObjectRequest(S3_BUCKET, partFile));
    // Decode explicitly as UTF-8: sizes below are measured in UTF-8 bytes, so decoding with the
    // platform default charset could make byte sizes and offsets disagree with the file contents.
    try (final BufferedReader reader = new BufferedReader(
            new InputStreamReader(s3Object.getObjectContent(), StandardCharsets.UTF_8))) {
        long offset = 0;
        String line;
        while ((line = reader.readLine()) != null) {
            final long size = line.getBytes(StandardCharsets.UTF_8).length;
            final String orderId = line.split(",")[0];
            final StatefulArtifactIndex index = new StatefulArtifactIndex();
            index.setOrderId(orderId);
            index.setFile(partFile);
            index.setByteSize(String.valueOf(size));
            index.setByteOffset(String.valueOf(offset));
            indices.add(new AbstractMap.SimpleImmutableEntry<>(index, line));
            // NOTE(review): "+ 1" assumes every line is terminated by a single '\n' byte; CRLF line
            // endings would skew every subsequent offset — confirm the part-file format.
            offset += size + 1;
        }
    } catch (IOException e) {
        throw new Exception(String.format("Could not read the part file for %s", partFile), e);
    }
    return indices;
}

/**
 * Writes every index entry to DynamoDB and its raw line to Redis using a fixed-size thread pool.
 *
 * <p>If any write fails, the pool is shut down immediately so queued tasks do not keep writing
 * after this method has already reported the failure.
 *
 * @param indices (index, raw line) pairs to persist
 * @param ddbMapper mapper used to save each index to DynamoDB
 * @param jc Redis cluster client used to store each raw line
 * @return number of entries written successfully (equals {@code indices.size()} on success)
 * @throws Exception if a task cannot be submitted, a write fails, or the wait is interrupted
 */
private int writeStateIndices(final List<Map.Entry<StatefulArtifactIndex, String>> indices,
        final DynamoDBMapper ddbMapper, final JedisCluster jc) throws Exception {
    final ExecutorService executorService = Executors.newFixedThreadPool(DEFAULT_THREADS_PER_LAMBDA_WORKER);
    try {
        final List<Future<Boolean>> futures = new LinkedList<>();
        for (final Map.Entry<StatefulArtifactIndex, String> indexPair : indices) {
            final Callable<Boolean> callable = () -> {
                final StatefulArtifactIndex index = indexPair.getKey();
                ddbMapper.save(index);
                jc.set(String.format("%s-%s-%s", index.getFile(), index.getByteOffset(), index.getByteSize()),
                        indexPair.getValue());
                return true;
            };
            try {
                futures.add(executorService.submit(callable));
            } catch (Exception e) {
                throw new Exception(String.format("Failure while indexing %s", indexPair.getKey().getOrderId()), e);
            }
        }
        executorService.shutdown();
        int artifactsIndexed = 0;
        for (final Future<Boolean> future : futures) {
            try {
                future.get();
                artifactsIndexed++;
            } catch (InterruptedException e) {
                // Preserve the interrupt status for callers further up the stack.
                Thread.currentThread().interrupt();
                throw new Exception("Failure while indexing", e);
            } catch (ExecutionException e) {
                throw new Exception("Failure while indexing", e);
            }
        }
        return artifactsIndexed;
    } finally {
        // No-op once every task has finished; after a failure it cancels tasks still queued/running.
        executorService.shutdownNow();
    }
}