in nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java [237:511]
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile fileToProcess = null;
if (context.hasIncomingConnection()) {
fileToProcess = session.get();
// If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
// However, if we have no FlowFile and we have connections coming from other Processors, then
// we know that we should run only if we have a FlowFile.
if (fileToProcess == null && context.hasNonLoopConnection()) {
return;
}
}
final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
final ComponentLog logger = getLogger();
final int queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();
final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions(fileToProcess).asInteger();
final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
List<String> preQueries = getQueries(context.getProperty(SQL_PRE_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
List<String> postQueries = getQueries(context.getProperty(SQL_POST_QUERY).evaluateAttributeExpressions(fileToProcess).getValue());
SqlWriter sqlWriter = configureSqlWriter(session, context, fileToProcess);
String selectQuery;
if (context.getProperty(SQL_QUERY).isSet()) {
selectQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
} else {
// If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
// If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
final StringBuilder queryContents = new StringBuilder();
session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, Charset.defaultCharset())));
selectQuery = queryContents.toString();
}
int resultCount = 0;
try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes())) {
final boolean isAutoCommit = con.getAutoCommit();
final boolean setAutoCommitValue = context.getProperty(AUTO_COMMIT).asBoolean();
// Only set auto-commit if necessary, log any "feature not supported" exceptions
if (isAutoCommit != setAutoCommitValue) {
try {
con.setAutoCommit(setAutoCommitValue);
} catch (SQLFeatureNotSupportedException sfnse) {
logger.debug("setAutoCommit({}) not supported by this driver", setAutoCommitValue);
}
}
try (final PreparedStatement st = con.prepareStatement(selectQuery)) {
if (fetchSize != null && fetchSize > 0) {
try {
st.setFetchSize(fetchSize);
} catch (SQLException se) {
// Not all drivers support this, just log the error (at debug level) and move on
logger.debug("Cannot set fetch size to {} due to {}", fetchSize, se.getLocalizedMessage(), se);
}
}
st.setQueryTimeout(queryTimeout); // timeout in seconds
// Execute pre-query, throw exception and cleanup Flow Files if fail
Pair<String, SQLException> failure = executeConfigStatements(con, preQueries);
if (failure != null) {
// In case of failure, assigning config query to "selectQuery" to follow current error handling
selectQuery = failure.getLeft();
throw failure.getRight();
}
final Map<String, SensitiveValueWrapper> sqlParameters = context.getProperties()
.entrySet()
.stream()
.filter(e -> e.getKey().isDynamic())
.collect(Collectors.toMap(e -> e.getKey().getName(), e -> new SensitiveValueWrapper(e.getValue(), e.getKey().isSensitive())));
if (fileToProcess != null) {
for (Map.Entry<String, String> entry : fileToProcess.getAttributes().entrySet()) {
sqlParameters.put(entry.getKey(), new SensitiveValueWrapper(entry.getValue(), false));
}
}
if (!sqlParameters.isEmpty()) {
JdbcCommon.setSensitiveParameters(st, sqlParameters);
}
logger.debug("Executing query {}", selectQuery);
int fragmentIndex = 0;
final String fragmentId = UUID.randomUUID().toString();
final StopWatch executionTime = new StopWatch(true);
boolean hasResults = st.execute();
long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);
boolean hasUpdateCount = st.getUpdateCount() != -1;
Map<String, String> inputFileAttrMap = fileToProcess == null ? null : fileToProcess.getAttributes();
String inputFileUUID = fileToProcess == null ? null : fileToProcess.getAttribute(CoreAttributes.UUID.key());
while (hasResults || hasUpdateCount) {
//getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
if (hasResults) {
final AtomicLong nrOfRows = new AtomicLong(0L);
try {
final ResultSet resultSet = st.getResultSet();
do {
final StopWatch fetchTime = new StopWatch(true);
FlowFile resultSetFF;
if (fileToProcess == null) {
resultSetFF = session.create();
} else {
resultSetFF = session.create(fileToProcess);
}
try {
resultSetFF = session.write(resultSetFF, out -> {
try {
nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, getLogger(), null));
} catch (Exception e) {
throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
}
});
// if fragmented ResultSet, determine if we should keep this fragment
if (maxRowsPerFlowFile > 0 && nrOfRows.get() == 0 && fragmentIndex > 0) {
// if row count is zero and this is not the first fragment, drop it instead of committing it.
session.remove(resultSetFF);
break;
}
long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
// set attributes
final Map<String, String> attributesToAdd = new HashMap<>();
if (inputFileAttrMap != null) {
attributesToAdd.putAll(inputFileAttrMap);
}
attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
attributesToAdd.put(RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
attributesToAdd.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
attributesToAdd.put(RESULTSET_INDEX, String.valueOf(resultCount));
if (inputFileUUID != null) {
attributesToAdd.put(INPUT_FLOWFILE_UUID, inputFileUUID);
}
if (maxRowsPerFlowFile > 0) {
// if fragmented ResultSet, set fragment attributes
attributesToAdd.put(FRAGMENT_ID, fragmentId);
attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(fragmentIndex));
}
attributesToAdd.putAll(sqlWriter.getAttributesToAdd());
resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
sqlWriter.updateCounters(session);
logger.info("{} contains {} records; transferring to 'success'", resultSetFF, nrOfRows.get());
// Report a FETCH event if there was an incoming flow file, or a RECEIVE event otherwise
if (context.hasIncomingConnection()) {
session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
} else {
session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
}
resultSetFlowFiles.add(resultSetFF);
// If we've reached the batch size, send out the flow files
if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
session.transfer(resultSetFlowFiles, REL_SUCCESS);
// Need to remove the original input file if it exists
if (fileToProcess != null) {
session.remove(fileToProcess);
fileToProcess = null;
}
session.commitAsync();
resultSetFlowFiles.clear();
}
fragmentIndex++;
} catch (Exception e) {
// Remove any result set flow file(s) and propagate the exception
session.remove(resultSetFF);
session.remove(resultSetFlowFiles);
if (e instanceof ProcessException) {
throw (ProcessException) e;
} else {
throw new ProcessException(e);
}
}
} while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile);
// If we are splitting results but not outputting batches, set count on all FlowFiles
if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) {
for (int i = 0; i < resultSetFlowFiles.size(); i++) {
resultSetFlowFiles.set(i,
session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
}
}
} catch (final SQLException e) {
throw new ProcessException(e);
}
resultCount++;
}
// are there anymore result sets?
try {
hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT);
hasUpdateCount = st.getUpdateCount() != -1;
} catch (SQLException ex) {
hasResults = false;
hasUpdateCount = false;
}
}
// Execute post-query, throw exception and cleanup Flow Files if fail
failure = executeConfigStatements(con, postQueries);
if (failure != null) {
selectQuery = failure.getLeft();
resultSetFlowFiles.forEach(session::remove);
throw failure.getRight();
}
// If the auto commit is set to false, commit() is called for consistency
if (!con.getAutoCommit()) {
con.commit();
}
// Transfer any remaining files to SUCCESS
session.transfer(resultSetFlowFiles, REL_SUCCESS);
resultSetFlowFiles.clear();
if (fileToProcess != null) {
if (resultCount > 0) {
// If we had at least one result then it's OK to drop the original file
session.remove(fileToProcess);
} else {
// If we had no results then transfer the original flow file downstream to trigger processors
final ContentOutputStrategy contentOutputStrategy = context.getProperty(CONTENT_OUTPUT_STRATEGY).asAllowableValue(ContentOutputStrategy.class);
if (ContentOutputStrategy.ORIGINAL == contentOutputStrategy) {
session.transfer(fileToProcess, REL_SUCCESS);
} else {
// Set Empty Results as the default behavior based on strategy or null property
session.transfer(setFlowFileEmptyResults(session, fileToProcess, sqlWriter), REL_SUCCESS);
}
}
} else if (resultCount == 0) {
// If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only)
// Then generate an empty Output FlowFile
FlowFile resultSetFF = session.create();
session.transfer(setFlowFileEmptyResults(session, resultSetFF, sqlWriter), REL_SUCCESS);
}
}
} catch (final ProcessException | SQLException e) {
//If we had at least one result then it's OK to drop the original file, but if we had no results then
// pass the original flow file down the line to trigger downstream processors
if (fileToProcess == null) {
// This can happen if any exceptions occur while setting up the connection, statement, etc.
logger.error("Unable to execute SQL select query [{}]. No FlowFile to route to failure", selectQuery, e);
context.yield();
} else {
if (context.hasIncomingConnection()) {
logger.error("Unable to execute SQL select query [{}] for {} routing to failure", selectQuery, fileToProcess, e);
fileToProcess = session.penalize(fileToProcess);
} else {
logger.error("Unable to execute SQL select query [{}] routing to failure", selectQuery, e);
context.yield();
}
session.putAttribute(fileToProcess, RESULT_ERROR_MESSAGE, e.getMessage());
session.transfer(fileToProcess, REL_FAILURE);
}
}
}