public Response processRequest()

in source/src/main/java/com/amazonaws/comprehend/esproxy/lambda/processor/BulkProcessor.java [84:140]


    public Response processRequest(@NonNull final Request request, @NonNull final LambdaLogger logger) {
        logger.log("Bulk requests detected");
        String payloadStr = HTTPTransformer.transformHttpEntityToString(request.getEntity());
        Map<String, ComprehendConfiguration> configMap = configRetriever.retrieveStoredConfig();

        if (Strings.isNullOrEmpty(payloadStr) || configMap.isEmpty()) {
            logger.log("Payload or Comprehend config is empty, return pass through requests");
            return esClient.performRequest(request);
        }
        // Transfer the payload to array list, split by lineSeparator
        List<String> payloadList = Arrays.asList(payloadStr.split(System.lineSeparator()));

        // Stores where the ingestion Payload was detected: Map<contentRowNum, ingestionPayload>
        Map<Integer, BulkPayload> payloadMap = getIngestionPayloadMap(payloadList);
        if (payloadMap.isEmpty()) {
            logger.log("No ingestion requests detected, return pass through requests");
            return esClient.performRequest(request);
        }

        // The batchObject list that needs to be comprehend
        List<BulkRequest> bulkRequestList = getBatchObjectList(payloadMap, configMap);
        if (bulkRequestList.isEmpty()) {
            logger.log("No config field was detected in the bulk request, return pass through requests");
            return esClient.performRequest(request);
        }

        // Create the batch requests list
        List<Callable<BatchResponse>> callableList = createCallableList(bulkRequestList);
        try {
            List<Future<BatchResponse>> executionResult = executorService.invokeAll(callableList,
                    Constants.BULK_EXECUTOR_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            for (Future<BatchResponse> result : executionResult) {
                try {
                    // For each executionResult, attach the comprehend response
                    attachComprehendResponse(payloadList, result, logger);
                    logger.log("Extended original payload with Comprehend result");
                } catch (RuntimeException e) {
                    throw new InternalErrorException(CustomerMessage.INTERNAL_ERROR);
                }
            }
            // Send the enriched request to ES
            Request transformedRequest = new Request(request.getMethod(), request.getEndpoint());
            transformedRequest.setJsonEntity(String.join(System.lineSeparator(), payloadList) + System.lineSeparator());

            logger.log("Ingest Comprehend enriched bulk results to OpenSearchService");
            Response response = esClient.performRequest(transformedRequest);

            logger.log(String.format("Response of bulk request: statusCode = %d, reasonPhrase = %s",
                                     response.getStatusLine().getStatusCode(), response.getStatusLine().getReasonPhrase()));

            return response;
        } catch (InterruptedException | JSONException e) {
            logger.log("InternalErrorException happened when trying to process bulk request. " + e);
            throw new InternalErrorException(CustomerMessage.INTERNAL_ERROR, e);
        }

    }