in source/src/main/java/com/amazonaws/comprehend/esproxy/lambda/processor/IndexProcessor.java [81:135]
public Response processRequest(@NonNull final Request request, @NonNull final LambdaLogger logger) {
logger.log("Index request detected");
String payloadStr = HTTPTransformer.transformHttpEntityToString(request.getEntity());
Map<String, ComprehendConfiguration> configMap = configRetriever.retrieveStoredConfig();
String indexName = RequestIdentifier.getIndexName(request);
if (Strings.isNullOrEmpty(payloadStr) || configMap.isEmpty()) {
// If no payload or no comprehend config, pass through the request
logger.log("The Payload or Comprehend config is empty, return pass through requests");
return esClient.performRequest(request);
}
try {
JsonNode inputJson = ingestionSerializer.deserialize(payloadStr);
List<Callable<SingularResponse>> callableList = createCallableList(indexName, inputJson, configMap);
if (callableList.isEmpty()) {
// If no field matches, pass through the request
logger.log("No config field detected, return pass through requests");
return esClient.performRequest(request);
}
List<Future<SingularResponse>> executionResult = executorService.invokeAll(callableList,
Constants.INDEX_EXECUTOR_TIMEOUT_SECONDS, TimeUnit.SECONDS);
JSONObject payloadJson = new JSONObject(payloadStr);
// Attach the comprehend response
for (Future<SingularResponse> result : executionResult) {
try {
SingularResponse responseObject = result.get();
payloadJson.put(responseObject.getFieldNameAndOperation(),
removeResponseMetadata(responseObject.getComprehendResult()));
// Add flattened response for Kibana plotting
JSONObject flattenedResponse = responseObject.getFlattenedResult();
if (flattenedResponse != null) {
payloadJson.put(String.format("%s_%s", responseObject.getFieldNameAndOperation(),
Constants.KIBANA_KEY_NAME), removeResponseMetadata(flattenedResponse));
}
} catch (ExecutionException | InterruptedException e) {
logger.log("Got exception when retrieving the comprehend response: " + e);
throw new InternalErrorException(CustomerMessage.INTERNAL_ERROR);
}
}
payloadJson.put(Constants.TIME_STAMP_KEY, Instant.now().toString());
// Send the enriched request to ES
logger.log("Ingest Comprehend enriched results to OpenSearchService");
return esClient.performRequest(request.getMethod(), request.getEndpoint(), payloadJson.toString());
} catch (Exception e) {
logger.log("Exceptions happen when trying to process index request. " + e);
throw new InternalErrorException(CustomerMessage.INTERNAL_ERROR, e);
}
}