public Response processRequest()

in source/src/main/java/com/amazonaws/comprehend/esproxy/lambda/processor/IndexProcessor.java [81:135]


    public Response processRequest(@NonNull final Request request, @NonNull final LambdaLogger logger) {
        logger.log("Index request detected");
        String payloadStr = HTTPTransformer.transformHttpEntityToString(request.getEntity());
        Map<String, ComprehendConfiguration> configMap = configRetriever.retrieveStoredConfig();
        String indexName = RequestIdentifier.getIndexName(request);

        if (Strings.isNullOrEmpty(payloadStr) || configMap.isEmpty()) {
            // If no payload or no comprehend config, pass through the request
            logger.log("The Payload or Comprehend config is empty, return pass through requests");
            return esClient.performRequest(request);
        }

        try {
            JsonNode inputJson = ingestionSerializer.deserialize(payloadStr);
            List<Callable<SingularResponse>> callableList = createCallableList(indexName, inputJson, configMap);

            if (callableList.isEmpty()) {
                // If no field matches, pass through the request
                logger.log("No config field detected, return pass through requests");
                return esClient.performRequest(request);
            }

            List<Future<SingularResponse>> executionResult = executorService.invokeAll(callableList,
                    Constants.INDEX_EXECUTOR_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            JSONObject payloadJson = new JSONObject(payloadStr);

            // Attach the comprehend response
            for (Future<SingularResponse> result : executionResult) {
                try {
                    SingularResponse responseObject = result.get();
                    payloadJson.put(responseObject.getFieldNameAndOperation(),
                            removeResponseMetadata(responseObject.getComprehendResult()));

                    // Add flattened response for Kibana plotting
                    JSONObject flattenedResponse = responseObject.getFlattenedResult();
                    if (flattenedResponse != null) {
                        payloadJson.put(String.format("%s_%s", responseObject.getFieldNameAndOperation(),
                                Constants.KIBANA_KEY_NAME), removeResponseMetadata(flattenedResponse));
                    }
                } catch (ExecutionException | InterruptedException e) {
                    logger.log("Got exception when retrieving the comprehend response: " + e);
                    throw new InternalErrorException(CustomerMessage.INTERNAL_ERROR);
                }
            }
            payloadJson.put(Constants.TIME_STAMP_KEY, Instant.now().toString());

            // Send the enriched request to ES
            logger.log("Ingest Comprehend enriched results to OpenSearchService");
            return esClient.performRequest(request.getMethod(), request.getEndpoint(), payloadJson.toString());
        } catch (Exception e) {
            logger.log("Exceptions happen when trying to process index request. " + e);
            throw new InternalErrorException(CustomerMessage.INTERNAL_ERROR, e);
        }

    }