public void onTrigger(ProcessContext, ProcessSession)

in nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java [237:511]


    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
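        // Overall flow: (1) optionally pull an incoming FlowFile and evaluate the configuration against it,
        // (2) obtain a JDBC connection and run any configured pre-queries, (3) execute the main statement and
        // write each result set (optionally fragmented by Max Rows Per Flow File) to output FlowFiles,
        // (4) run any post-queries, commit if auto-commit is disabled, and transfer results to success.
        // Failures are routed to the failure relationship, or yielded when there is no FlowFile to route.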
        FlowFile fileToProcess = null;
        if (context.hasIncomingConnection()) {
            fileToProcess = session.get();

            // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
            // However, if we have no FlowFile and we have connections coming from other Processors, then
            // we know that we should run only if we have a FlowFile.
            if (fileToProcess == null && context.hasNonLoopConnection()) {
                return;
            }
        }

        final List<FlowFile> resultSetFlowFiles = new ArrayList<>();

        final ComponentLog logger = getLogger();
        final int queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();
        final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions(fileToProcess).asInteger();
        final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
        final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
        final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();

        List<String> preQueries = getQueries(context.getProperty(SQL_PRE_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
        List<String> postQueries = getQueries(context.getProperty(SQL_POST_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());

        SqlWriter sqlWriter = configureSqlWriter(session, context, fileToProcess);

        String selectQuery;
        if (context.getProperty(SQL_QUERY).isSet()) {
            selectQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
        } else {
            // If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
            // If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
            final StringBuilder queryContents = new StringBuilder();
            session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, Charset.defaultCharset())));
            selectQuery = queryContents.toString();
        }

        int resultCount = 0;
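        // The incoming FlowFile's attributes are passed to the DBCPService so that attribute-aware
        // implementations (e.g. DBCPConnectionPoolLookup) can select the appropriate connection pool.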
        try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes())) {
            final boolean isAutoCommit = con.getAutoCommit();
            final boolean setAutoCommitValue = context.getProperty(AUTO_COMMIT).asBoolean();
            // Only change the auto-commit setting if necessary, and log "feature not supported" exceptions at debug level
            if (isAutoCommit != setAutoCommitValue) {
                try {
                    con.setAutoCommit(setAutoCommitValue);
                } catch (SQLFeatureNotSupportedException sfnse) {
                    logger.debug("setAutoCommit({}) not supported by this driver", setAutoCommitValue);
                }
            }
            try (final PreparedStatement st = con.prepareStatement(selectQuery)) {
                if (fetchSize != null && fetchSize > 0) {
                    try {
                        st.setFetchSize(fetchSize);
                    } catch (SQLException se) {
                        // Not all drivers support this, just log the error (at debug level) and move on
                        logger.debug("Cannot set fetch size to {} due to {}", fetchSize, se.getLocalizedMessage(), se);
                    }
                }
                st.setQueryTimeout(queryTimeout); // timeout in seconds

                // Execute any pre-queries; on failure, throw the exception so the error handling below cleans up FlowFiles
                Pair<String, SQLException> failure = executeConfigStatements(con, preQueries);
                if (failure != null) {
                    // On failure, report the failed config query as "selectQuery" so the existing error handling applies
                    selectQuery = failure.getLeft();
                    throw failure.getRight();
                }

                // Gather dynamic properties (kept wrapped so sensitive values stay masked) and, below, incoming
                // FlowFile attributes; JdbcCommon binds the entries that follow the sql.args.N.* convention.
                final Map<String, SensitiveValueWrapper> sqlParameters = context.getProperties()
                        .entrySet()
                        .stream()
                        .filter(e -> e.getKey().isDynamic())
                        .collect(Collectors.toMap(e -> e.getKey().getName(), e -> new SensitiveValueWrapper(e.getValue(), e.getKey().isSensitive())));

                if (fileToProcess != null) {
                    for (Map.Entry<String, String> entry : fileToProcess.getAttributes().entrySet()) {
                        sqlParameters.put(entry.getKey(), new SensitiveValueWrapper(entry.getValue(), false));
                    }
                }

                if (!sqlParameters.isEmpty()) {
                    JdbcCommon.setSensitiveParameters(st, sqlParameters);
                }

                logger.debug("Executing query {}", selectQuery);

                int fragmentIndex = 0;
                final String fragmentId = UUID.randomUUID().toString();

                final StopWatch executionTime = new StopWatch(true);

                boolean hasResults = st.execute(); // execute() rather than executeQuery(): the statement may yield multiple results and/or update counts

                long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);

                boolean hasUpdateCount = st.getUpdateCount() != -1;

                Map<String, String> inputFileAttrMap = fileToProcess == null ? null : fileToProcess.getAttributes();
                String inputFileUUID = fileToProcess == null ? null : fileToProcess.getAttribute(CoreAttributes.UUID.key());
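                // A single statement can produce multiple results; keep looping until neither a ResultSet
                // nor an update count remains.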
                while (hasResults || hasUpdateCount) {
                    // getMoreResults() and execute() return false when the current result is an update count rather than a ResultSet
                    if (hasResults) {
                        final AtomicLong nrOfRows = new AtomicLong(0L);

                        try {
                            final ResultSet resultSet = st.getResultSet();
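                            // Each pass of this loop writes one FlowFile. When Max Rows Per Flow File is set,
                            // the writer is expected to stop at that limit so the next pass continues with the
                            // following fragment of the same ResultSet.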
                            do {
                                final StopWatch fetchTime = new StopWatch(true);

                                FlowFile resultSetFF;
                                if (fileToProcess == null) {
                                    resultSetFF = session.create();
                                } else {
                                    resultSetFF = session.create(fileToProcess);
                                }

                                try {
                                    resultSetFF = session.write(resultSetFF, out -> {
                                        try {
                                            nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, getLogger(), null));
                                        } catch (Exception e) {
                                            throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
                                        }
                                    });

                                    // if fragmented ResultSet, determine if we should keep this fragment
                                    if (maxRowsPerFlowFile > 0 && nrOfRows.get() == 0 && fragmentIndex > 0) {
                                        // if row count is zero and this is not the first fragment, drop it instead of committing it.
                                        session.remove(resultSetFF);
                                        break;
                                    }

                                    long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);

                                    // set attributes
                                    final Map<String, String> attributesToAdd = new HashMap<>();
                                    if (inputFileAttrMap != null) {
                                        attributesToAdd.putAll(inputFileAttrMap);
                                    }
                                    attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
                                    attributesToAdd.put(RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
                                    attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
                                    attributesToAdd.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
                                    attributesToAdd.put(RESULTSET_INDEX, String.valueOf(resultCount));
                                    if (inputFileUUID != null) {
                                        attributesToAdd.put(INPUT_FLOWFILE_UUID, inputFileUUID);
                                    }
                                    if (maxRowsPerFlowFile > 0) {
                                        // if fragmented ResultSet, set fragment attributes
                                        attributesToAdd.put(FRAGMENT_ID, fragmentId);
                                        attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(fragmentIndex));
                                    }
                                    attributesToAdd.putAll(sqlWriter.getAttributesToAdd());
                                    resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
                                    sqlWriter.updateCounters(session);

                                    logger.info("{} contains {} records; transferring to 'success'", resultSetFF, nrOfRows.get());

                                    // Report a FETCH event if there was an incoming flow file, or a RECEIVE event otherwise
                                    if (context.hasIncomingConnection()) {
                                        session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
                                    } else {
                                        session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
                                    }
                                    resultSetFlowFiles.add(resultSetFF);

                                    // If we've reached the batch size, send out the flow files
                                    if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
                                        session.transfer(resultSetFlowFiles, REL_SUCCESS);
                                        // Need to remove the original input file if it exists
                                        if (fileToProcess != null) {
                                            session.remove(fileToProcess);
                                            fileToProcess = null;
                                        }

                                        session.commitAsync();
                                        resultSetFlowFiles.clear();
                                    }

                                    fragmentIndex++;
                                } catch (Exception e) {
                                    // Remove any result set flow file(s) and propagate the exception
                                    session.remove(resultSetFF);
                                    session.remove(resultSetFlowFiles);
                                    if (e instanceof ProcessException) {
                                        throw (ProcessException) e;
                                    } else {
                                        throw new ProcessException(e);
                                    }
                                }
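                            // A full fragment (row count == max) means the ResultSet may hold more rows;
                            // a short fragment means it has been fully drained.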
                            } while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile);

                            // If we are splitting results but not outputting batches, set count on all FlowFiles
                            if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) {
                                for (int i = 0; i < resultSetFlowFiles.size(); i++) {
                                    resultSetFlowFiles.set(i,
                                            session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
                                }
                            }
                        } catch (final SQLException e) {
                            throw new ProcessException(e);
                        }

                        resultCount++;
                    }

                    // Are there any more result sets or update counts?
                    try {
                        hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT);
                        hasUpdateCount = st.getUpdateCount() != -1;
                    } catch (SQLException ex) {
                        hasResults = false;
                        hasUpdateCount = false;
                    }
                }

                // Execute any post-queries; on failure, remove the result FlowFiles and throw
                failure = executeConfigStatements(con, postQueries);
                if (failure != null) {
                    selectQuery = failure.getLeft();
                    resultSetFlowFiles.forEach(session::remove);
                    throw failure.getRight();
                }

                // If auto-commit is disabled, call commit() explicitly so all statements take effect
                if (!con.getAutoCommit()) {
                    con.commit();
                }

                // Transfer any remaining files to SUCCESS
                session.transfer(resultSetFlowFiles, REL_SUCCESS);
                resultSetFlowFiles.clear();

                if (fileToProcess != null) {
                    if (resultCount > 0) {
                        // If we had at least one result then it's OK to drop the original file
                        session.remove(fileToProcess);
                    } else {
                        // If we had no results, pass the original FlowFile downstream to trigger subsequent processors
                        final ContentOutputStrategy contentOutputStrategy = context.getProperty(CONTENT_OUTPUT_STRATEGY).asAllowableValue(ContentOutputStrategy.class);
                        if (ContentOutputStrategy.ORIGINAL == contentOutputStrategy) {
                            session.transfer(fileToProcess, REL_SUCCESS);
                        } else {
                            // Empty Results is the default behavior, applied when the strategy is unset or set to Empty
                            session.transfer(setFlowFileEmptyResults(session, fileToProcess, sqlWriter), REL_SUCCESS);
                        }
                    }
                } else if (resultCount == 0) {
                    // If there was no inbound FlowFile, no exception occurred, and the SQL produced no result sets
                    // (e.g. INSERT/UPDATE/DELETE statements only), then generate an empty output FlowFile
                    FlowFile resultSetFF = session.create();
                    session.transfer(setFlowFileEmptyResults(session, resultSetFF, sqlWriter), REL_SUCCESS);
                }
            }
        } catch (final ProcessException | SQLException e) {
            // If there is no FlowFile to route, just log and yield; otherwise annotate the FlowFile with the
            // error message and route it to failure (penalized when it came from an incoming connection).
            if (fileToProcess == null) {
                // This can happen if any exceptions occur while setting up the connection, statement, etc.
                logger.error("Unable to execute SQL select query [{}]. No FlowFile to route to failure", selectQuery, e);
                context.yield();
            } else {
                if (context.hasIncomingConnection()) {
                    logger.error("Unable to execute SQL select query [{}] for {}; routing to failure", selectQuery, fileToProcess, e);
                    fileToProcess = session.penalize(fileToProcess);
                } else {
                    logger.error("Unable to execute SQL select query [{}]; routing to failure", selectQuery, e);
                    context.yield();
                }
                fileToProcess = session.putAttribute(fileToProcess, RESULT_ERROR_MESSAGE, e.getMessage());
                session.transfer(fileToProcess, REL_FAILURE);
            }
        }
    }
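
The pre- and post-query handling above delegates to executeConfigStatements(Connection, List<String>), which falls outside this excerpt. The following is a minimal sketch of that helper, reconstructed only from the contract visible at the call sites (return null on success, or a Pair of the failed query and its SQLException at the first failure); the actual implementation in AbstractExecuteSQL may differ.

    // Hypothetical reconstruction; only the return contract is taken from the call sites above.
    // Requires: java.sql.Connection, java.sql.SQLException, java.sql.Statement, java.util.List,
    // and org.apache.commons.lang3.tuple.Pair (already used by this class for the failure result).
    protected Pair<String, SQLException> executeConfigStatements(final Connection con, final List<String> configQueries) {
        if (configQueries == null || configQueries.isEmpty()) {
            return null;
        }
        for (final String confSQL : configQueries) {
            try (final Statement st = con.createStatement()) {
                st.execute(confSQL);
            } catch (final SQLException e) {
                // Surface the failed query so the caller can report it as "selectQuery"
                return Pair.of(confSQL, e);
            }
        }
        return null;
    }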