public void onTrigger()

in nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java [841:1032]


    /**
     * Executes one HTTP request and routes the resulting request/response FlowFiles.
     *
     * <p>The URL is read from {@code HTTP_URL} (evaluated against the incoming FlowFile, if any), the request is
     * built by {@code configureRequest} and executed with the client held in {@code okHttpClientAtomicReference}.
     * A fixed set of status attributes (status code and message, request URL, response URL, request duration and
     * transaction id) is written to the request FlowFile when one exists and to the response FlowFile when one is
     * created. Depending on the status code and the configured properties, the response body is imported as the
     * content of a new response FlowFile, copied into an attribute of the request FlowFile using a buffer sized by
     * {@code RESPONSE_BODY_ATTRIBUTE_SIZE} (256 when the attribute name property is not set), or both.</p>
     *
     * <p>When the session has no FlowFile to offer, the method returns immediately if
     * {@code context.hasNonLoopConnection()} is true or if the resolved HTTP method supports a request body.
     * Otherwise the request is sent without an input FlowFile; a fresh FlowFile is created only when the response
     * body has to be stored in an attribute.</p>
     *
     * <p>Every exception raised inside the request/response handling is caught here: an existing request FlowFile
     * is penalized, tagged with {@code EXCEPTION_CLASS} and {@code EXCEPTION_MESSAGE} and transferred to
     * {@code FAILURE}; without a request FlowFile the processor yields instead. A response FlowFile that was
     * already created is removed from the session.</p>
     *
     * @param context supplies the property values and connection information used for this invocation
     * @param session used to obtain, create, update, transfer and remove FlowFiles and to report provenance events
     * @throws ProcessException declared on the signature; exceptions thrown inside the {@code try} block below are
     *                          handled internally as described above
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        OkHttpClient okHttpClient = okHttpClientAtomicReference.get();

        // May be null when no FlowFile is queued; the null case is handled explicitly below
        FlowFile requestFlowFile = session.get();

        // Checking to see if the property to put the body of the response in an attribute was set
        boolean putToAttribute = context.getProperty(RESPONSE_BODY_ATTRIBUTE_NAME).isSet();
        // The size property is only consulted when the attribute name is set; otherwise 256 is used, which still
        // applies when a non-successful response body is copied to the request FlowFile further down
        final int maxAttributeSize = putToAttribute ? context.getProperty(RESPONSE_BODY_ATTRIBUTE_SIZE).asInteger() : 256;

        if (requestFlowFile == null) {
            // Nothing to do yet when the processor has a non-loop incoming connection but no FlowFile was available
            if (context.hasNonLoopConnection()) {
                return;
            }

            // Resolve the HTTP method without FlowFile attributes; unrecognized values fall back to GET
            final String method = getRequestMethod(context, null);
            final Optional<HttpMethod> requestMethodFound = findRequestMethod(method);
            final HttpMethod requestMethod = requestMethodFound.orElse(HttpMethod.GET);
            if (requestMethod.isRequestBodySupported()) {
                // Methods that support a request body are not executed without an input FlowFile
                return;
            } else if (putToAttribute) {
                // A FlowFile is required to carry the response body attribute, so create an empty one
                requestFlowFile = session.create();
            }
        }

        final ComponentLog logger = getLogger();
        // Random identifier used in the log messages below and stored in the TRANSACTION_ID attribute
        final UUID txId = UUID.randomUUID();

        // Declared outside the try block so the catch block can remove a partially built response FlowFile
        FlowFile responseFlowFile = null;
        try {
            final String urlProperty = trimToEmpty(context.getProperty(HTTP_URL).evaluateAttributeExpressions(requestFlowFile).getValue());

            Request httpRequest = configureRequest(context, session, requestFlowFile, urlProperty);

            // A SEND provenance event is reported only when the request carries a body
            if (httpRequest.body() != null) {
                session.getProvenanceReporter().send(requestFlowFile, urlProperty, true);
            }

            // Reference point for the REQUEST_DURATION attribute and the provenance durations computed below
            final long startNanos = System.nanoTime();

            logger.debug("Request [{}] {} {} starting", txId, httpRequest.method(), httpRequest.url());
            // try-with-resources closes the Response once processing and routing are finished
            try (Response responseHttp = okHttpClient.newCall(httpRequest).execute()) {
                final int statusCode = responseHttp.code();
                logger.info("Request [{}] {} {} HTTP {} [{}]", txId, httpRequest.method(), httpRequest.url(), statusCode, responseHttp.protocol());

                // store the status code and message
                String statusMessage = responseHttp.message();

                // Create a map of the status attributes that are always written to the request and response FlowFiles
                Map<String, String> statusAttributes = new HashMap<>();
                statusAttributes.put(STATUS_CODE, String.valueOf(statusCode));
                statusAttributes.put(STATUS_MESSAGE, statusMessage);
                statusAttributes.put(REQUEST_URL, urlProperty);
                // Duration measured at this point, i.e. before the response body is consumed below
                statusAttributes.put(REQUEST_DURATION, Long.toString(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos)));
                // URL of the request attached to the response, which may differ from REQUEST_URL
                statusAttributes.put(RESPONSE_URL, responseHttp.request().url().toString());
                statusAttributes.put(TRANSACTION_ID, txId.toString());

                if (requestFlowFile != null) {
                    requestFlowFile = session.putAllAttributes(requestFlowFile, statusAttributes);
                }

                // Body goes to a request FlowFile attribute when the response failed or the attribute property is set,
                // provided a request FlowFile exists
                boolean outputBodyToRequestAttribute = (!isSuccess(statusCode) || putToAttribute) && requestFlowFile != null;
                // Body goes to a response FlowFile on success without the attribute property, or whenever
                // RESPONSE_GENERATION_REQUIRED is true
                boolean outputBodyToResponseContent = (isSuccess(statusCode) && !putToAttribute) || context.getProperty(RESPONSE_GENERATION_REQUIRED).asBoolean();
                ResponseBody responseBody = responseHttp.body();
                // RESPONSE_BODY_IGNORED makes the code below behave as if the response had no body
                boolean bodyExists = responseBody != null && !context.getProperty(RESPONSE_BODY_IGNORED).asBoolean();

                InputStream responseBodyStream = null;
                SoftLimitBoundedByteArrayOutputStream outputStreamToRequestAttribute = null;
                TeeInputStream teeInputStream = null;
                try {
                    responseBodyStream = bodyExists ? responseBody.byteStream() : null;
                    // When the body is needed both as FlowFile content and as an attribute, the stream is wrapped in a
                    // TeeInputStream so that importing the content also fills the attribute buffer in the same pass
                    if (responseBodyStream != null && outputBodyToRequestAttribute && outputBodyToResponseContent) {
                        outputStreamToRequestAttribute = new SoftLimitBoundedByteArrayOutputStream(maxAttributeSize);
                        teeInputStream = new TeeInputStream(responseBodyStream, outputStreamToRequestAttribute);
                    }

                    if (outputBodyToResponseContent) {
                        /*
                         * If successful and putting to response flowfile, store the response body as the flowfile payload
                         * we include additional flowfile attributes including the response headers and the status codes.
                         */

                        // clone the flowfile to capture the response
                        if (requestFlowFile != null) {
                            responseFlowFile = session.create(requestFlowFile);
                        } else {
                            responseFlowFile = session.create();
                        }

                        // write attributes to response flowfile
                        responseFlowFile = session.putAllAttributes(responseFlowFile, statusAttributes);

                        // write the response headers as attributes
                        // this will overwrite any existing flowfile attributes
                        responseFlowFile = session.putAllAttributes(responseFlowFile, convertAttributesFromHeaders(responseHttp, ""));

                        // update FlowFile's filename attribute with an extracted value from the remote URL
                        // (only for GET requests when the URL_PATH naming strategy is selected)
                        if (FlowFileNamingStrategy.URL_PATH.equals(getFlowFileNamingStrategy(context)) && HttpMethod.GET.name().equals(httpRequest.method())) {
                            final URL url = URLValidator.createURL(urlProperty);
                            String fileName = getFileNameFromUrl(url);
                            if (fileName != null) {
                                responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.FILENAME.key(), fileName);
                            }
                        }

                        // transfer the message body to the payload
                        // can potentially be null in edge cases
                        if (bodyExists) {
                            // write content type attribute to response flowfile if it is available
                            final MediaType contentType = responseBody.contentType();
                            if (contentType != null) {
                                responseFlowFile = session.putAttribute(responseFlowFile, CoreAttributes.MIME_TYPE.key(), contentType.toString());
                            }
                            // Import through the tee stream when the body must also be captured for the attribute
                            if (teeInputStream != null) {
                                responseFlowFile = session.importFrom(teeInputStream, responseFlowFile);
                            } else {
                                responseFlowFile = session.importFrom(responseBodyStream, responseFlowFile);
                            }

                            // emit provenance event: FETCH when triggered by a request FlowFile, RECEIVE otherwise
                            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                            if (requestFlowFile != null) {
                                session.getProvenanceReporter().fetch(responseFlowFile, urlProperty, millis);
                            } else {
                                session.getProvenanceReporter().receive(responseFlowFile, urlProperty, millis);
                            }
                        }
                    }

                    // store the response body in an attribute of the request FlowFile when the response was not
                    // successful or the attribute property is set; outputBodyToRequestAttribute already implies
                    // that the request FlowFile is not null
                    if (outputBodyToRequestAttribute && bodyExists) {
                        String attributeKey = context.getProperty(RESPONSE_BODY_ATTRIBUTE_NAME).evaluateAttributeExpressions(requestFlowFile).getValue();
                        // Fall back to the default attribute name when the property yields no value
                        if (attributeKey == null) {
                            attributeKey = RESPONSE_BODY;
                        }
                        byte[] outputBuffer;
                        int size;

                        if (outputStreamToRequestAttribute != null) {
                            // The tee stream already captured the bytes while the response content was imported
                            outputBuffer = outputStreamToRequestAttribute.getBuffer();
                            size = outputStreamToRequestAttribute.size();
                        } else {
                            // No tee stream was set up, so read directly from the response into a buffer of
                            // maxAttributeSize bytes
                            // NOTE(review): the 'false' argument presumably allows fewer bytes than the buffer
                            // length without failing - confirm against StreamUtils.fillBuffer
                            outputBuffer = new byte[maxAttributeSize];
                            size = StreamUtils.fillBuffer(responseBodyStream, outputBuffer, false);
                        }
                        // Decode using the charset derived from the response content type
                        String bodyString = new String(outputBuffer, 0, size, getCharsetFromMediaType(responseBody.contentType()));
                        requestFlowFile = session.putAttribute(requestFlowFile, attributeKey, bodyString);

                        final long processingDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                        final String eventDetails = String.format("Response Body Attribute Added [%s] Processing Duration [%d ms]", attributeKey, processingDuration);
                        session.getProvenanceReporter().modifyAttributes(requestFlowFile, eventDetails);
                    }
                } finally {
                    // Release the streams whether or not processing succeeded
                    if (outputStreamToRequestAttribute != null) {
                        outputStreamToRequestAttribute.close();
                    }
                    // NOTE(review): the raw stream is only closed directly when no tee stream wraps it; closing the
                    // tee stream presumably closes the wrapped response stream as well - confirm
                    if (teeInputStream != null) {
                        teeInputStream.close();
                    } else if (responseBodyStream != null) {
                        responseBodyStream.close();
                    }
                }

                // This needs to be done after the response flowFile has been created from the request flowFile
                // as the added attribute headers may have a prefix added that doesn't make sense for the response flowFile.
                if (context.getProperty(RESPONSE_HEADER_REQUEST_ATTRIBUTES_ENABLED).asBoolean() && requestFlowFile != null) {
                    final String prefix = context.getProperty(RESPONSE_HEADER_REQUEST_ATTRIBUTES_PREFIX).evaluateAttributeExpressions(requestFlowFile).getValue();

                    // write the response headers as attributes
                    // this will overwrite any existing flowfile attributes
                    requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(responseHttp, prefix));
                }

                // Hand both FlowFiles (either may be null) to route() together with the status code
                route(requestFlowFile, responseFlowFile, session, context, statusCode);

            }
        } catch (final Exception e) {
            if (requestFlowFile == null) {
                // No FlowFile to route to failure, so log the error and yield the processor
                logger.error("Request Processing failed", e);
                context.yield();
            } else {
                // Penalize the request FlowFile, record the exception details on it and route it to FAILURE
                logger.error("Request Processing failed: {}", requestFlowFile, e);
                requestFlowFile = session.penalize(requestFlowFile);
                requestFlowFile = session.putAttribute(requestFlowFile, EXCEPTION_CLASS, e.getClass().getName());
                requestFlowFile = session.putAttribute(requestFlowFile, EXCEPTION_MESSAGE, e.getMessage());
                session.transfer(requestFlowFile, FAILURE);
            }

            // Discard a response FlowFile that was created before the failure occurred
            if (responseFlowFile != null) {
                session.remove(responseFlowFile);
            }
        }
    }