public void onTrigger()

in nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java [239:528]


    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
        // Fetch the column/table info once
        if (!setupComplete.get()) {
            super.setup(context);
        }
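        // This processor receives a ProcessSessionFactory rather than a single session so it can
        // create and commit sessions itself as batches of FlowFiles are produced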
        ProcessSession session = sessionFactory.createSession();
        final List<FlowFile> resultSetFlowFiles = new ArrayList<>();

        final ComponentLog logger = getLogger();

        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
        final DatabaseDialectService databaseDialectService = getDatabaseDialectService(context);
        final String databaseType = context.getProperty(DB_TYPE).getValue();
        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
        final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue();
        final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
        final String initialLoadStrategy = context.getProperty(INITIAL_LOAD_STRATEGY).getValue();
        final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue();
        final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
        final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
        final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
        final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
        final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
        final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
                ? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger()
                : 0;
        final Integer transIsolationLevel = context.getProperty(TRANS_ISOLATION_LEVEL).isSet()
                ? context.getProperty(TRANS_ISOLATION_LEVEL).asInteger()
                : null;

        SqlWriter sqlWriter = configureSqlWriter(session, context);
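        // The SqlWriter supplied by the concrete subclass determines the output format
        // (e.g. Avro for QueryDatabaseTable, or a configured RecordSetWriter for QueryDatabaseTableRecord)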

        final StateMap stateMap;
        try {
            stateMap = session.getState(Scope.CLUSTER);
        } catch (final IOException ioe) {
            getLogger().error("Failed to retrieve observed maximum values from the State Manager. Will not perform "
                    + "query until this is accomplished.", ioe);
            context.yield();
            return;
        }

        // Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually
        // set as the current state map (after the session has been committed)
        final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());

        // If an initial max value for a column has been specified via properties and that column is not in the
        // state manager, sync it into the state property map
        for (final Map.Entry<String, String> maxProp : maxValueProperties.entrySet()) {
            String maxPropKey = maxProp.getKey().toLowerCase();
            String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey);
            if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) {
                String newMaxPropValue;
                // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
                // the value has been stored under a key that is only the column name. Fall back to check the column name,
                // but store the new initial max value under the fully-qualified key.
                if (statePropertyMap.containsKey(maxPropKey)) {
                    newMaxPropValue = statePropertyMap.get(maxPropKey);
                } else {
                    newMaxPropValue = maxProp.getValue();
                }
                statePropertyMap.put(fullyQualifiedMaxPropKey, newMaxPropValue);
            }
        }

        List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
                ? null
                : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));

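        // When the initial load strategy is to capture only new rows and no state has been recorded yet,
        // pre-fetch the current MAX() for each max-value column so the first query returns only rows
        // added after this point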
        if (maxValueColumnNameList != null && statePropertyMap.isEmpty() && initialLoadStrategy.equals(INITIAL_LOAD_STRATEGY_NEW_ROWS.getValue())) {
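            // Build "SELECT MAX(col) col, ..." through the dialect service so the generated SQL
            // matches the target database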
            final List<ColumnDefinition> maxValueColumnDefinitions = maxValueColumnNameList.stream()
                    .map(columnName -> String.format("MAX(%s) %s", columnName, columnName))
                    .map(StandardColumnDefinition::new)
                    .map(ColumnDefinition.class::cast)
                    .toList();
            final TableDefinition tableDefinition = new TableDefinition(Optional.empty(), Optional.empty(), tableName, maxValueColumnDefinitions);
            final QueryStatementRequest statementRequest = new StandardQueryStatementRequest(StatementType.SELECT, tableDefinition);
            final StatementResponse maxValueStatementResponse = databaseDialectService.getStatement(statementRequest);
            final String selectMaxQuery = maxValueStatementResponse.sql();

            try (final Connection con = dbcpService.getConnection(Collections.emptyMap());
                 final Statement st = con.createStatement()) {

                if (transIsolationLevel != null) {
                    con.setTransactionIsolation(transIsolationLevel);
                }

                st.setQueryTimeout(queryTimeout); // timeout in seconds

                try (final ResultSet resultSet = st.executeQuery(selectMaxQuery)) {
                    if (resultSet.next()) {
                        final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap);
                        maxValCollector.processRow(resultSet);
                        maxValCollector.applyStateChanges();
                    }
                }
            } catch (final Exception e) {
                logger.error("Unable to execute SQL select query {} due to {}", selectMaxQuery, e);
                context.yield();
            }
        }

        final List<String> parsedColumnNames;
        if (columnNames == null) {
            parsedColumnNames = List.of();
        } else {
            parsedColumnNames = Arrays.asList(columnNames.split(", "));
        }

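        // getQuery builds the SELECT statement, appending WHERE predicates that compare each
        // max-value column against the last value recorded in statePropertyMap so only new rows match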
        final String selectQuery = getQuery(databaseDialectService, databaseType, tableName, sqlQuery, parsedColumnNames, maxValueColumnNameList, customWhereClause, statePropertyMap);
        final StopWatch stopWatch = new StopWatch(true);
        final String fragmentIdentifier = UUID.randomUUID().toString();
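        // Every FlowFile produced from this result set shares the same fragment.identifier attribute,
        // allowing downstream processors (e.g. MergeContent in Defragment mode) to reassemble the fragments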

        try (final Connection con = dbcpService.getConnection(Collections.emptyMap());
             final Statement st = con.createStatement()) {

            if (fetchSize != null && fetchSize > 0) {
                try {
                    st.setFetchSize(fetchSize);
                } catch (SQLException se) {
                    // Not all drivers support this, just log the error (at debug level) and move on
                    logger.debug("Cannot set fetch size to {} due to {}", fetchSize, se.getLocalizedMessage(), se);
                }
            }

            if (transIsolationLevel != null) {
                con.setTransactionIsolation(transIsolationLevel);
            }

            String jdbcURL = "DBCPService";
            try {
                DatabaseMetaData databaseMetaData = con.getMetaData();
                if (databaseMetaData != null) {
                    jdbcURL = databaseMetaData.getURL();
                }
            } catch (SQLException ignored) {
                // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
            }

            st.setQueryTimeout(queryTimeout); // timeout in seconds
            if (logger.isDebugEnabled()) {
                logger.debug("Executing query {}", selectQuery);
            }

            final boolean originalAutoCommit = con.getAutoCommit();
            final Boolean setAutoCommitValue = context.getProperty(AUTO_COMMIT).evaluateAttributeExpressions().asBoolean();
            // If the user set the AUTO_COMMIT property (true or false), that value overrides the connection's default
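            // (some drivers, e.g. PostgreSQL's, only stream results for a non-zero fetch size when
            // auto-commit is disabled, which is a common reason to set this property)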
            if (setAutoCommitValue != null && originalAutoCommit != setAutoCommitValue) {
                try {
                    con.setAutoCommit(setAutoCommitValue);
                    logger.debug("Driver connection changed to setAutoCommit({})", setAutoCommitValue);
                } catch (Exception ex) {
                    logger.debug("Failed to setAutoCommit({}) due to {}: {}",
                            setAutoCommitValue, ex.getClass().getName(), ex.getMessage());
                }
            }

            try (final ResultSet resultSet = st.executeQuery(selectQuery)) {
                int fragmentIndex = 0;
                // Max values will be updated in the state property map by the callback
                final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap);

                while (true) {
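                    // Each pass writes up to Max Rows Per Flow File rows into a new FlowFile; the loop
                    // exits when the result set is exhausted or the fragment limit is reached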
                    final AtomicLong nrOfRows = new AtomicLong(0L);

                    FlowFile fileToProcess = session.create();
                    try {
                        fileToProcess = session.write(fileToProcess, out -> {
                            try {
                                nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, getLogger(), maxValCollector));
                            } catch (Exception e) {
                                throw new ProcessException("Error during database query or conversion of records.", e);
                            }
                        });
                    } catch (ProcessException e) {
                        // Add flowfile to results before rethrowing so it will be removed from session in outer catch
                        resultSetFlowFiles.add(fileToProcess);
                        throw e;
                    }

                    if (nrOfRows.get() > 0) {
                        // set attributes
                        final Map<String, String> attributesToAdd = new HashMap<>();
                        attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
                        attributesToAdd.put(RESULT_TABLENAME, tableName);

                        if (maxRowsPerFlowFile > 0) {
                            attributesToAdd.put(FRAGMENT_ID, fragmentIdentifier);
                            attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(fragmentIndex));
                        }

                        attributesToAdd.putAll(sqlWriter.getAttributesToAdd());
                        fileToProcess = session.putAllAttributes(fileToProcess, attributesToAdd);
                        sqlWriter.updateCounters(session);

                        logger.debug("{} contains {} records; transferring to 'success'",
                                fileToProcess, nrOfRows.get());

                        session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                        resultSetFlowFiles.add(fileToProcess);
                        // If we've reached the batch size, send out the flow files
                        if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
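                            // commitAsync releases this batch downstream while the result set is still being read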
                            session.transfer(resultSetFlowFiles, REL_SUCCESS);
                            session.commitAsync();
                            resultSetFlowFiles.clear();
                        }
                    } else {
                        // If there were no rows returned, don't send the flowfile
                        session.remove(fileToProcess);
                        // If no rows were returned and this was the first FlowFile, yield
                        if (fragmentIndex == 0) {
                            context.yield();
                        }
                        break;
                    }

                    fragmentIndex++;
                    if (maxFragments > 0 && fragmentIndex >= maxFragments) {
                        break;
                    }

                    // If we aren't splitting up the data into flow files or fragments, then the result set has been entirely fetched so don't loop back around
                    if (maxFragments == 0 && maxRowsPerFlowFile == 0) {
                        break;
                    }

                    // If we are splitting up the data into flow files, don't loop back around if we've gotten all results
                    if (maxRowsPerFlowFile > 0 && nrOfRows.get() < maxRowsPerFlowFile) {
                        break;
                    }
                }

                // Apply state changes from the Max Value tracker
                maxValCollector.applyStateChanges();

                // The maximum values and total count are known at this point, but when Output Batch Size is set,
                // earlier batches have already been transferred without these attributes, so skip them for consistency
                if (outputBatchSize == 0) {
                    for (int i = 0; i < resultSetFlowFiles.size(); i++) {
                        final Map<String, String> newAttributesMap = new HashMap<>();

                        // Add maximum values as attributes
                        for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
                            // Get just the column name from the key
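                            // (state keys are fully qualified as <tableName><NAMESPACE_DELIMITER><columnName>)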
                            String key = entry.getKey();
                            String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length());
                            newAttributesMap.put("maxvalue." + colName, entry.getValue());
                        }

                        // Set count for all FlowFiles
                        if (maxRowsPerFlowFile > 0) {
                            newAttributesMap.put(FRAGMENT_COUNT, Integer.toString(fragmentIndex));
                        }

                        resultSetFlowFiles.set(i, session.putAllAttributes(resultSetFlowFiles.get(i), newAttributesMap));
                    }
                }
            } finally {
                try {
                    if (con.getAutoCommit() != originalAutoCommit) {
                        con.setAutoCommit(originalAutoCommit);
                        logger.debug("Driver connection reset to original setAutoCommit({})", originalAutoCommit);
                    }
                } catch (Exception ex) {
                    // getAutoCommit() can also throw; catch here so a failure in the finally block
                    // cannot mask an exception from the query itself
                    logger.debug("Failed to setAutoCommit({}) due to {}: {}",
                            originalAutoCommit, ex.getClass().getName(), ex.getMessage());
                }
            }

            session.transfer(resultSetFlowFiles, REL_SUCCESS);

        } catch (final ProcessException | SQLException e) {
            logger.error("Unable to execute SQL select query {} due to {}", selectQuery, e);
            if (!resultSetFlowFiles.isEmpty()) {
                session.remove(resultSetFlowFiles);
            }
            context.yield();
        } finally {
            try {
                // Update the state
                session.setState(statePropertyMap, Scope.CLUSTER);
            } catch (IOException ioe) {
                getLogger().error("{} failed to update State Manager, maximum observed values will not be recorded", this, ioe);
            }

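            // Commit so that state recorded via session.setState(...) is persisted, even when no
            // FlowFiles were transferred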
            session.commitAsync();
        }
    }