in source/src/main/java/com/amazonaws/comprehend/esproxy/lambda/processor/BulkProcessor.java [84:140]
/**
 * Processes a bulk request. Ingestion rows whose fields match the stored Comprehend configuration
 * are enriched with Comprehend results before the request is forwarded through {@code esClient}.
 *
 * <p>The original request is forwarded unchanged in any of these cases:
 * <ul>
 *   <li>the payload or the stored configuration is empty;</li>
 *   <li>no ingestion rows are detected in the payload;</li>
 *   <li>no configured field is found in those rows.</li>
 * </ul>
 *
 * <p>Comprehend work runs on {@code executorService} and is bounded by
 * {@code Constants.BULK_EXECUTOR_TIMEOUT_SECONDS}.
 *
 * @param request the incoming bulk request; its entity is read as a line-separated payload
 * @param logger  Lambda logger used for progress and error messages
 * @return the response from {@code esClient} for either the pass-through or the enriched request
 * @throws InternalErrorException if a Comprehend result cannot be attached to the payload, the
 *     thread is interrupted while waiting for results, or a JSON error occurs
 */
public Response processRequest(@NonNull final Request request, @NonNull final LambdaLogger logger) {
    logger.log("Bulk requests detected");
    String payloadStr = HTTPTransformer.transformHttpEntityToString(request.getEntity());
    Map<String, ComprehendConfiguration> configMap = configRetriever.retrieveStoredConfig();
    if (Strings.isNullOrEmpty(payloadStr) || configMap.isEmpty()) {
        logger.log("Payload or Comprehend config is empty, return pass through requests");
        return esClient.performRequest(request);
    }
    // One entry per payload line. Arrays.asList is fixed-size but supports set(), so enriched rows
    // can be written back in place (presumably what attachComprehendResponse does; not visible here).
    List<String> payloadList = Arrays.asList(payloadStr.split(System.lineSeparator()));
    // Stores where the ingestion payload was detected: Map<contentRowNum, ingestionPayload>
    Map<Integer, BulkPayload> payloadMap = getIngestionPayloadMap(payloadList);
    if (payloadMap.isEmpty()) {
        logger.log("No ingestion requests detected, return pass through requests");
        return esClient.performRequest(request);
    }
    // The batch objects that need to be sent to Comprehend
    List<BulkRequest> bulkRequestList = getBatchObjectList(payloadMap, configMap);
    if (bulkRequestList.isEmpty()) {
        logger.log("No config field was detected in the bulk request, return pass through requests");
        return esClient.performRequest(request);
    }
    List<Callable<BatchResponse>> callableList = createCallableList(bulkRequestList);
    try {
        // The timed invokeAll returns once all tasks finish or the timeout elapses;
        // any task not yet complete at that point is cancelled.
        List<Future<BatchResponse>> executionResult = executorService.invokeAll(callableList,
                Constants.BULK_EXECUTOR_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        for (Future<BatchResponse> result : executionResult) {
            try {
                // For each execution result, attach the Comprehend response to the payload
                attachComprehendResponse(payloadList, result, logger);
                logger.log("Extended original payload with Comprehend result");
            } catch (RuntimeException e) {
                // Keep the root cause (e.g. a future cancelled by the timeout) so failures are diagnosable.
                logger.log("Failed to attach Comprehend result to bulk payload. " + e);
                throw new InternalErrorException(CustomerMessage.INTERNAL_ERROR, e);
            }
        }
        // Send the enriched request, keeping the original query parameters as the pass-through path does.
        Request transformedRequest = new Request(request.getMethod(), request.getEndpoint());
        request.getParameters().forEach(transformedRequest::addParameter);
        String enrichedPayload = String.join(System.lineSeparator(), payloadList) + System.lineSeparator();
        transformedRequest.setJsonEntity(enrichedPayload);
        logger.log("Ingest Comprehend enriched bulk results to OpenSearchService");
        Response response = esClient.performRequest(transformedRequest);
        logger.log(String.format("Response of bulk request: statusCode = %d, reasonPhrase = %s",
                response.getStatusLine().getStatusCode(), response.getStatusLine().getReasonPhrase()));
        return response;
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers and the executor can observe the interruption.
        Thread.currentThread().interrupt();
        logger.log("InternalErrorException happened when trying to process bulk request. " + e);
        throw new InternalErrorException(CustomerMessage.INTERNAL_ERROR, e);
    } catch (JSONException e) {
        logger.log("InternalErrorException happened when trying to process bulk request. " + e);
        throw new InternalErrorException(CustomerMessage.INTERNAL_ERROR, e);
    }
}