in source/src/main/java/com/amazonaws/comprehend/esproxy/lambda/processor/BulkProcessor.java [231:265]
/**
 * Waits for a Comprehend batch call to finish and merges its results back into the
 * corresponding entries of {@code payloadList}.
 *
 * <p>For every locator in the batch response, the payload row it points at is parsed as JSON,
 * the Comprehend result is stored under the locator's field-name-and-operation key, the
 * flattened result (when present) is stored under {@code <key>_<Constants.KIBANA_KEY_NAME>},
 * a timestamp is stored under {@code Constants.TIME_STAMP_KEY}, and the row is written back
 * to {@code payloadList} in serialized form. The list is therefore modified in place.
 *
 * @param payloadList the payload rows to enrich; rows referenced by a locator are replaced
 *                    (each such row is presumably a JSON object string, since it is handed
 *                    to the {@code JSONObject} constructor)
 * @param result      the pending batch response; this method blocks on {@link Future#get()}
 * @param logger      sink for progress and failure messages
 * @throws RuntimeException wrapping whatever exception was raised while waiting for or
 *                          retrieving the batch response; if the wait was interrupted, the
 *                          calling thread's interrupt flag is restored before throwing
 */
private static void attachComprehendResponse(@NonNull final List<String> payloadList,
@NonNull final Future<BatchResponse> result, LambdaLogger logger) {
    BatchResponse<?> responseObject;
    try {
        responseObject = result.get();
        logger.log(String.format("Successfully got Comprehend batch response for %d documents", payloadList.size()));
    } catch (InterruptedException e) {
        // Catching InterruptedException clears the interrupt flag; restore it so code further
        // up the stack can still observe that this thread was asked to stop.
        Thread.currentThread().interrupt();
        logger.log("Got an exception while getting comprehend result. " + e);
        throw new RuntimeException(e);
    } catch (Exception e) {
        logger.log("Got an exception while getting comprehend result. " + e);
        if (e instanceof ExecutionException) {
            // The cause carries the actual failure raised inside the asynchronous task.
            logger.log("Got ExecutionException while retrieving results from ExecutorService: " + e.getCause());
        }
        throw new RuntimeException(e);
    }

    List<BatchFieldLocator> locatorList = responseObject.getLocatorList();
    List<JSONObject> resultList = responseObject.getBatchResultList();
    List<JSONObject> flattenedResultList = responseObject.getBatchFlattenedResultList();

    // Results are matched to locators by index: entry i of each result list belongs to locator i.
    for (int i = 0; i < locatorList.size(); i++) {
        BatchFieldLocator locator = locatorList.get(i);
        int contentRow = locator.getContentRowNum();
        String fieldKey = locator.getFieldNameAndOperation();

        // Re-parse the current row each time so that several locators pointing at the same
        // row accumulate their results instead of overwriting one another.
        JSONObject content = new JSONObject(payloadList.get(contentRow));

        // Attach the comprehend result into the original content
        content.put(fieldKey, resultList.get(i));

        // Add flattened response for Kibana plotting
        if (flattenedResultList != null) {
            JSONObject flattened = flattenedResultList.get(i);
            if (flattened != null) {
                content.put(String.format("%s_%s", fieldKey, Constants.KIBANA_KEY_NAME), flattened);
            }
        }

        // Add timestamp
        content.put(Constants.TIME_STAMP_KEY, Instant.now().toString());

        // Replace the value in the payload list
        payloadList.set(contentRow, content.toString());
    }
}