in nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java [841:1032]
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
// Overview: performs a single HTTP exchange per invocation.
//   1. Takes the next FlowFile from the session (there may be none).
//   2. Builds the request through configureRequest(...) and executes it synchronously with the
//      OkHttpClient currently held in okHttpClientAtomicReference.
//   3. Writes status attributes to the request FlowFile (when present) and to the response FlowFile (when created).
//   4. Depending on status code and configuration, imports the response body as the content of a new
//      response FlowFile and/or stores it in an attribute on the request FlowFile.
//   5. Delegates the final transfer of both FlowFiles to route(...).
// Any Exception thrown inside the outer try block is handled in the catch block at the bottom of this
// method instead of being propagated.
//
// context: source of property values (several evaluated against the request FlowFile) and of yield().
// session: used to get/create/modify/transfer/remove FlowFiles and to report provenance events.
OkHttpClient okHttpClient = okHttpClientAtomicReference.get();
// May be null; the no-FlowFile case is handled a few lines below.
FlowFile requestFlowFile = session.get();
// Checking to see if the property to put the body of the response in an attribute was set
boolean putToAttribute = context.getProperty(RESPONSE_BODY_ATTRIBUTE_NAME).isSet();
// RESPONSE_BODY_ATTRIBUTE_SIZE is only read when the attribute name is set; otherwise 256 is used,
// which sizes the buffer when the body is written to an attribute because of a non-successful status.
final int maxAttributeSize = putToAttribute ? context.getProperty(RESPONSE_BODY_ATTRIBUTE_SIZE).asInteger() : 256;
if (requestFlowFile == null) {
// No FlowFile available: do nothing when context.hasNonLoopConnection() is true.
// NOTE(review): presumably this means an upstream connection exists and a FlowFile is expected later — confirm.
if (context.hasNonLoopConnection()) {
return;
}
// Resolve the method without a FlowFile (null is passed); an unrecognized method falls back to GET.
final String method = getRequestMethod(context, null);
final Optional<HttpMethod> requestMethodFound = findRequestMethod(method);
final HttpMethod requestMethod = requestMethodFound.orElse(HttpMethod.GET);
if (requestMethod.isRequestBodySupported()) {
// Methods that support a request body are not executed without an incoming FlowFile.
return;
} else if (putToAttribute) {
// The response body is to be stored in an attribute, so create a FlowFile to carry it.
requestFlowFile = session.create();
}
// Otherwise requestFlowFile stays null and the request proceeds without one.
}
final ComponentLog logger = getLogger();
// Random identifier used in the log messages below and written as the TRANSACTION_ID attribute.
final UUID txId = UUID.randomUUID();
// Declared outside the try block so the catch block can remove it if processing fails.
FlowFile responseFlowFile = null;
try {
// URL property evaluated against the request FlowFile (which may be null) and trimmed.
final String urlProperty = trimToEmpty(context.getProperty(HTTP_URL).evaluateAttributeExpressions(requestFlowFile).getValue());
Request httpRequest = configureRequest(context, session, requestFlowFile, urlProperty);
// A provenance send event is reported only when the request carries a body.
if (httpRequest.body() != null) {
session.getProvenanceReporter().send(requestFlowFile, urlProperty, true);
}
// Reference point for REQUEST_DURATION and for the durations reported with provenance events.
final long startNanos = System.nanoTime();
logger.debug("Request [{}] {} {} starting", txId, httpRequest.method(), httpRequest.url());
// try-with-resources closes the Response when this block exits, normally or exceptionally.
try (Response responseHttp = okHttpClient.newCall(httpRequest).execute()) {
final int statusCode = responseHttp.code();
logger.info("Request [{}] {} {} HTTP {} [{}]", txId, httpRequest.method(), httpRequest.url(), statusCode, responseHttp.protocol());
// store the status code and message
String statusMessage = responseHttp.message();
// Create a map of the status attributes that are always written to the request and response FlowFiles
Map<String, String> statusAttributes = new HashMap<>();
statusAttributes.put(STATUS_CODE, String.valueOf(statusCode));
statusAttributes.put(STATUS_MESSAGE, statusMessage);
statusAttributes.put(REQUEST_URL, urlProperty);
// Elapsed milliseconds from startNanos up to this point, i.e. after execute() has returned.
statusAttributes.put(REQUEST_DURATION, Long.toString(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos)));
// URL taken from the request attached to the response.
// NOTE(review): presumably this can differ from REQUEST_URL after redirects — confirm against OkHttp behavior.
statusAttributes.put(RESPONSE_URL, responseHttp.request().url().toString());
statusAttributes.put(TRANSACTION_ID, txId.toString());
if (requestFlowFile != null) {
requestFlowFile = session.putAllAttributes(requestFlowFile, statusAttributes);
}
// Body goes to a request FlowFile attribute when a request FlowFile exists and either the status
// is not successful or a response body attribute name is configured.
boolean outputBodyToRequestAttribute = (!isSuccess(statusCode) || putToAttribute) && requestFlowFile != null;
// A response FlowFile is produced for a successful status when the body is not redirected to an
// attribute, or whenever RESPONSE_GENERATION_REQUIRED is true.
boolean outputBodyToResponseContent = (isSuccess(statusCode) && !putToAttribute) || context.getProperty(RESPONSE_GENERATION_REQUIRED).asBoolean();
ResponseBody responseBody = responseHttp.body();
// The body is treated as absent when it is null or when RESPONSE_BODY_IGNORED is true.
boolean bodyExists = responseBody != null && !context.getProperty(RESPONSE_BODY_IGNORED).asBoolean();
// Declared before the inner try so the finally block can close whichever streams were opened.
InputStream responseBodyStream = null;
SoftLimitBoundedByteArrayOutputStream outputStreamToRequestAttribute = null;
TeeInputStream teeInputStream = null;
try {
responseBodyStream = bodyExists ? responseBody.byteStream() : null;
// When the body is needed for both destinations, wrap the stream in a TeeInputStream so that the
// single read performed while importing the content also feeds outputStreamToRequestAttribute.
// NOTE(review): the exact size-limiting behavior of SoftLimitBoundedByteArrayOutputStream is not visible here.
if (responseBodyStream != null && outputBodyToRequestAttribute && outputBodyToResponseContent) {
outputStreamToRequestAttribute = new SoftLimitBoundedByteArrayOutputStream(maxAttributeSize);
teeInputStream = new TeeInputStream(responseBodyStream, outputStreamToRequestAttribute);
}
if (outputBodyToResponseContent) {
/*
 * Create the response FlowFile, store the response body as its payload (when a body exists) and
 * add attributes including the status attributes and the response headers.
 */
// derive the response flowfile from the request flowfile when there is one, otherwise create a new one
if (requestFlowFile != null) {
responseFlowFile = session.create(requestFlowFile);
} else {
responseFlowFile = session.create();
}
// write attributes to response flowfile
responseFlowFile = session.putAllAttributes(responseFlowFile, statusAttributes);
// write the response headers as attributes (an empty prefix is passed)
// this will overwrite any existing flowfile attributes
responseFlowFile = session.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(responseHttp, ""));
// update FlowFile's filename attribute with an extracted value from the remote URL;
// only done for the URL_PATH naming strategy and GET requests, and only when a name could be extracted
if (FlowFileNamingStrategy.URL_PATH.equals(getFlowFileNamingStrategy(context)) && HttpMethod.GET.name().equals(httpRequest.method())) {
final URL url = URLValidator.createURL(urlProperty);
String fileName = getFileNameFromUrl(url);
if (fileName != null) {
responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.FILENAME.key(), fileName);
}
}
// transfer the message body to the payload
// can potentially be null in edge cases
if (bodyExists) {
// write content type attribute to response flowfile if it is available
final MediaType contentType = responseBody.contentType();
if (contentType != null) {
responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.MIME_TYPE.key(), contentType.toString());
}
// Import through the tee when it was set up above, otherwise directly from the response stream.
if (teeInputStream != null) {
responseFlowFile = session.importFrom(teeInputStream, responseFlowFile);
} else {
responseFlowFile = session.importFrom(responseBodyStream, responseFlowFile);
}
// emit provenance event: fetch when a request flowfile exists, receive otherwise
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
if (requestFlowFile != null) {
session.getProvenanceReporter().fetch(responseFlowFile, urlProperty, millis);
} else {
session.getProvenanceReporter().receive(responseFlowFile, urlProperty, millis);
}
}
}
// store the response body into a request flowfile attribute; this applies when a request flowfile exists
// and either the status is not successful or a response body attribute name is configured
if (outputBodyToRequestAttribute && bodyExists) {
String attributeKey = context.getProperty(RESPONSE_BODY_ATTRIBUTE_NAME).evaluateAttributeExpressions(requestFlowFile).getValue();
// Fall back to the RESPONSE_BODY constant when no attribute name evaluates to a value.
if (attributeKey == null) {
attributeKey = RESPONSE_BODY;
}
byte[] outputBuffer;
int size;
if (outputStreamToRequestAttribute != null) {
// The tee was used: the bytes were captured while the content was imported above.
outputBuffer = outputStreamToRequestAttribute.getBuffer();
size = outputStreamToRequestAttribute.size();
} else {
// No tee: read from the response stream into a buffer of maxAttributeSize bytes.
// NOTE(review): the 'false' argument presumably allows the stream to end before the buffer is full — confirm.
outputBuffer = new byte[maxAttributeSize];
size = StreamUtils.fillBuffer(responseBodyStream, outputBuffer, false);
}
// Decode the captured bytes with the charset that getCharsetFromMediaType derives from the content type.
String bodyString = new String(outputBuffer, 0, size, getCharsetFromMediaType(responseBody.contentType()));
requestFlowFile = session.putAttribute(requestFlowFile, attributeKey, bodyString);
final long processingDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
final String eventDetails = String.format("Response Body Attribute Added [%s] Processing Duration [%d ms]", attributeKey, processingDuration);
session.getProvenanceReporter().modifyAttributes(requestFlowFile, eventDetails);
}
} finally {
// Close whatever was opened above. When the tee exists it is closed instead of the raw stream.
// NOTE(review): presumably closing the TeeInputStream also closes the wrapped response stream — confirm.
if (outputStreamToRequestAttribute != null) {
outputStreamToRequestAttribute.close();
}
if (teeInputStream != null) {
teeInputStream.close();
} else if (responseBodyStream != null) {
responseBodyStream.close();
}
}
// This needs to be done after the response flowFile has been created from the request flowFile
// as the added attribute headers may have a prefix added that doesn't make sense for the response flowFile.
if (context.getProperty(RESPONSE_HEADER_REQUEST_ATTRIBUTES_ENABLED).asBoolean() && requestFlowFile != null) {
final String prefix = context.getProperty(RESPONSE_HEADER_REQUEST_ATTRIBUTES_PREFIX).evaluateAttributeExpressions(requestFlowFile).getValue();
// write the response headers as attributes
// this will overwrite any existing flowfile attributes
requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(responseHttp, prefix));
}
// Hand both FlowFiles (either may be null) to route(...), which decides the transfers based on the status code.
route(requestFlowFile, responseFlowFile, session, context, statusCode);
}
} catch (final Exception e) {
// Failure handling for anything thrown above.
if (requestFlowFile == null) {
// No request FlowFile to route: log and yield the processor.
logger.error("Request Processing failed", e);
context.yield();
} else {
// Penalize the request FlowFile, record the exception class and message on it, and send it to FAILURE.
// NOTE(review): e.getMessage() can be null for some exceptions — confirm session.putAttribute tolerates a null value.
logger.error("Request Processing failed: {}", requestFlowFile, e);
requestFlowFile = session.penalize(requestFlowFile);
requestFlowFile = session.putAttribute(requestFlowFile, EXCEPTION_CLASS, e.getClass().getName());
requestFlowFile = session.putAttribute(requestFlowFile, EXCEPTION_MESSAGE, e.getMessage());
session.transfer(requestFlowFile, FAILURE);
}
// Discard a response FlowFile that was created before the failure occurred.
if (responseFlowFile != null) {
session.remove(responseFlowFile);
}
}
}