in nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java [258:590]
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
    // Overview:
    //   1. Optionally take one incoming FlowFile (its attributes feed Expression Language evaluation).
    //   2. Run a single metadata query returning the row count (or a -1 stub) plus MAX(<col>) for every
    //      maximum-value column and, when value partitioning is configured, MAX/MIN of the partitioning column.
    //   3. Merge the newly observed maximum values into a working copy of the CLUSTER-scoped state map.
    //   4. Emit one FlowFile per partition to REL_SUCCESS, each containing a SELECT statement and the
    //      generatetablefetch.* / fragment attributes.
    //   5. Store the working state map and commit the session asynchronously.
    // Failure handling:
    //   - SQLException with an incoming FlowFile: the FlowFile is routed to REL_FAILURE with the
    //     generatetablefetch.sql.error attribute; without an incoming FlowFile it is rethrown as ProcessException.
    //   - ProcessException: the session is rolled back and the context yields.

    // Fetch the column/table info once (if the table name and max value columns are not dynamic). Otherwise do the setup later
    if (!isDynamicTableName && !isDynamicMaxValues && !setupComplete.get()) {
        super.setup(context);
    }
    ProcessSession session = sessionFactory.createSession();
    FlowFile fileToProcess = null;
    if (context.hasIncomingConnection()) {
        fileToProcess = session.get();
        if (fileToProcess == null) {
            // Incoming connection with no FlowFile available, do no work (see capability description)
            return;
        }
    }
    maxValueProperties = getDefaultMaxValueProperties(context, fileToProcess);
    final ComponentLog logger = getLogger();
    final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
    final DatabaseDialectService databaseDialectService = getDatabaseDialectService(context);
    final String databaseType = context.getProperty(DB_TYPE).getValue();
    final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(fileToProcess).getValue();
    final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
    final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
    final int partitionSize = context.getProperty(PARTITION_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
    final String columnForPartitioning = context.getProperty(COLUMN_FOR_VALUE_PARTITIONING).evaluateAttributeExpressions(fileToProcess).getValue();
    final boolean useColumnValsForPaging = !StringUtils.isEmpty(columnForPartitioning);
    final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions(fileToProcess).getValue();
    final String customOrderByColumn = context.getProperty(CUSTOM_ORDERBY_COLUMN).evaluateAttributeExpressions(fileToProcess).getValue();
    final boolean outputEmptyFlowFileOnZeroResults = context.getProperty(OUTPUT_EMPTY_FLOWFILE_ON_ZERO_RESULTS).asBoolean();
    final StateMap stateMap;
    // fileToProcess is reassigned in the SQLException handler below, so lambdas capture this single-assignment copy instead
    FlowFile finalFileToProcess = fileToProcess;
    try {
        stateMap = session.getState(Scope.CLUSTER);
    } catch (final IOException ioe) {
        logger.error("Failed to retrieve observed maximum values from the State Manager. Will not perform "
                + "query until this is accomplished.", ioe);
        // Roll back before giving up so that a FlowFile obtained via session.get() above is not left in a
        // session that is never committed or rolled back
        session.rollback();
        context.yield();
        return;
    }
    try {
        // Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually
        // set as the current state map (after the session has been committed)
        final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
        // If an initial max value for column(s) has been specified using properties, and this column is not in the state manager, sync them to the state property map
        for (final Map.Entry<String, String> maxProp : maxValueProperties.entrySet()) {
            String maxPropKey = maxProp.getKey().toLowerCase();
            String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey);
            if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) {
                String newMaxPropValue;
                // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
                // the value has been stored under a key that is only the column name. Fall back to check the column name,
                // but store the new initial max value under the fully-qualified key.
                if (statePropertyMap.containsKey(maxPropKey)) {
                    newMaxPropValue = statePropertyMap.get(maxPropKey);
                } else {
                    newMaxPropValue = maxProp.getValue();
                }
                statePropertyMap.put(fullyQualifiedMaxPropKey, newMaxPropValue);
            }
        }
        // Build a WHERE clause with maximum-value columns (if they exist), and a list of column names that will contain MAX(<column>) aliases. The
        // executed SQL query will retrieve the count of all records after the filter(s) have been applied, as well as the new maximum values for the
        // specified columns. This allows the processor to generate the correctly partitioned SQL statements as well as to update the state with the
        // latest observed maximum values.
        List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
                ? new ArrayList<>(0)
                : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
        final int numMaxValueColumns = maxValueColumnNameList.size();
        // NOTE: maxValueClauses is appended to in several later steps (left bounds, custom WHERE, right bounds,
        // partition bound); the order of those steps determines the generated WHERE clauses
        List<String> maxValueClauses = new ArrayList<>(numMaxValueColumns);
        Long maxValueForPartitioning = null;
        Long minValueForPartitioning = null;
        List<String> maxValueSelectColumns = new ArrayList<>(numMaxValueColumns + 1);
        // replace unnecessary row count with -1 stub value when column values for paging is used, or when partition size is zero.
        if (useColumnValsForPaging || partitionSize == 0) {
            maxValueSelectColumns.add("-1");
        } else {
            maxValueSelectColumns.add("COUNT(*)");
        }
        // For each maximum-value column, get a WHERE filter and a MAX(column) alias
        IntStream.range(0, numMaxValueColumns).forEach((index) -> {
            String colName = maxValueColumnNameList.get(index);
            maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
            String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
            if (!StringUtils.isEmpty(maxValue)) {
                if (columnTypeMap.isEmpty() || getColumnType(tableName, colName) == null) {
                    // This means column type cache is clean after instance reboot. We should re-cache column type
                    super.setup(context, false, finalFileToProcess);
                }
                Integer type = getColumnType(tableName, colName);
                // Add a condition for the WHERE clause: strictly greater for the first column, greater-or-equal for the rest
                maxValueClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, databaseType));
            }
        });
        // If we are using a columns' values, get the maximum and minimum values in the context of the aforementioned WHERE clause
        if (useColumnValsForPaging) {
            if (columnForPartitioning.contains(",")) {
                throw new ProcessException(COLUMN_FOR_VALUE_PARTITIONING.getDisplayName() + " requires a single column name, but a comma was detected");
            }
            maxValueSelectColumns.add("MAX(" + columnForPartitioning + ") " + columnForPartitioning);
            maxValueSelectColumns.add("MIN(" + columnForPartitioning + ") MIN_" + columnForPartitioning);
        }
        if (customWhereClause != null) {
            // adding the custom WHERE clause (if defined) to the list of existing clauses.
            maxValueClauses.add("(" + customWhereClause + ")");
        }
        final String maxWhereClause = StringUtils.join(maxValueClauses, " AND ");
        final QueryStatementRequest queryStatementRequest = getMaxColumnStatementRequest(tableName, maxValueSelectColumns, maxWhereClause);
        // Build a SELECT query with maximum-value columns (if present)
        final StatementResponse statementResponse = databaseDialectService.getStatement(queryStatementRequest);
        final String selectQuery = statementResponse.sql();
        long rowCount;
        try (final Connection con = dbcpService.getConnection(finalFileToProcess == null ? Collections.emptyMap() : finalFileToProcess.getAttributes());
             final Statement st = con.createStatement()) {
            final int queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();
            st.setQueryTimeout(queryTimeout); // timeout in seconds
            logger.debug("Executing {}", selectQuery);
            // Close the ResultSet explicitly rather than relying on the Statement being closed
            try (final ResultSet resultSet = st.executeQuery(selectQuery)) {
                if (resultSet.next()) {
                    // Total row count is in the first column
                    rowCount = resultSet.getLong(1);
                    // Update the state map with the newly-observed maximum values
                    ResultSetMetaData rsmd = resultSet.getMetaData();
                    // Result columns 2..numMaxValueColumns+1 hold the MAX() of each maximum-value column; the index is
                    // kept after the loop because the partitioning MAX/MIN columns follow immediately
                    int i = 2;
                    for (; i <= numMaxValueColumns + 1; i++) {
                        //Some JDBC drivers consider the columns name and label to be very different things.
                        // Since this column has been aliased lets check the label first,
                        // if there is no label we'll use the column name.
                        String resultColumnName = (StringUtils.isNotEmpty(rsmd.getColumnLabel(i)) ? rsmd.getColumnLabel(i) : rsmd.getColumnName(i)).toLowerCase();
                        String fullyQualifiedStateKey = getStateKey(tableName, resultColumnName);
                        String resultColumnCurrentMax = statePropertyMap.get(fullyQualifiedStateKey);
                        if (StringUtils.isEmpty(resultColumnCurrentMax) && !isDynamicTableName) {
                            // If we can't find the value at the fully-qualified key name and the table name is static, it is possible (under a previous scheme)
                            // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
                            // maximum value is observed, it will be stored under the fully-qualified key from then on.
                            resultColumnCurrentMax = statePropertyMap.get(resultColumnName);
                        }
                        int type = rsmd.getColumnType(i);
                        if (isDynamicTableName) {
                            // We haven't pre-populated the column type map if the table name is dynamic, so do it here
                            columnTypeMap.put(fullyQualifiedStateKey, type);
                        }
                        try {
                            String newMaxValue = getMaxValueFromRow(resultSet, i, type, resultColumnCurrentMax);
                            if (newMaxValue != null) {
                                statePropertyMap.put(fullyQualifiedStateKey, newMaxValue);
                            }
                        } catch (ParseException | IOException | ClassCastException pice) {
                            // Fail the whole thing here before we start creating FlowFiles and such
                            throw new ProcessException(pice);
                        }
                    }
                    // Process the maximum and minimum values for the partitioning column if necessary
                    // These are currently required to be parseable as Long values. A value that is not raises a
                    // NumberFormatException, which is wrapped in a ProcessException so that the rollback/yield
                    // handler at the end of this method runs instead of the exception escaping unhandled.
                    if (useColumnValsForPaging) {
                        try {
                            Object o = resultSet.getObject(i);
                            maxValueForPartitioning = o == null ? null : Long.valueOf(o.toString());
                            o = resultSet.getObject(i + 1);
                            minValueForPartitioning = o == null ? null : Long.valueOf(o.toString());
                        } catch (final NumberFormatException nfe) {
                            throw new ProcessException(COLUMN_FOR_VALUE_PARTITIONING.getDisplayName() + " must refer to a column whose values can be parsed as long integers", nfe);
                        }
                    }
                } else {
                    // Something is very wrong here, one row (even if count is zero) should be returned
                    throw new SQLException("No rows returned from metadata query: " + selectQuery);
                }
            }
            // for each maximum-value column get a right bounding WHERE condition
            IntStream.range(0, numMaxValueColumns).forEach((index) -> {
                String colName = maxValueColumnNameList.get(index);
                // statePropertyMap now holds the maxima observed by the metadata query above
                String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
                if (!StringUtils.isEmpty(maxValue)) {
                    if (columnTypeMap.isEmpty() || getColumnType(tableName, colName) == null) {
                        // This means column type cache is clean after instance reboot. We should re-cache column type
                        super.setup(context, false, finalFileToProcess);
                    }
                    Integer type = getColumnType(tableName, colName);
                    // Add a condition for the WHERE clause
                    maxValueClauses.add(colName + " <= " + getLiteralByType(type, maxValue, databaseType));
                }
            });
            // Number of partitions: ceiling of (range or row count) / partitionSize, or a single fetch when partitionSize is zero
            final long numberOfFetches;
            if (useColumnValsForPaging) {
                final long valueRangeSize = maxValueForPartitioning == null ? 0 : (maxValueForPartitioning - minValueForPartitioning + 1);
                numberOfFetches = (partitionSize == 0) ? 1 : (valueRangeSize / partitionSize) + (valueRangeSize % partitionSize == 0 ? 0 : 1);
            } else {
                numberOfFetches = (partitionSize == 0) ? 1 : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
            }
            // Generate SQL statements to read "pages" of data
            final String fragmentIdentifier = UUID.randomUUID().toString();
            List<FlowFile> flowFilesToTransfer = new ArrayList<>();
            // Attributes shared by every FlowFile produced in this invocation
            Map<String, String> baseAttributes = new HashMap<>();
            baseAttributes.put("generatetablefetch.tableName", tableName);
            if (columnNames != null) {
                baseAttributes.put("generatetablefetch.columnNames", columnNames);
            }
            final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", ");
            if (StringUtils.isNotBlank(maxColumnNames)) {
                baseAttributes.put("generatetablefetch.maxColumnNames", maxColumnNames);
            }
            baseAttributes.put(FRAGMENT_ID, fragmentIdentifier);
            baseAttributes.put(FRAGMENT_COUNT, String.valueOf(numberOfFetches));
            // If there are no SQL statements to be generated, still output an empty FlowFile if specified by the user
            if (numberOfFetches == 0 && outputEmptyFlowFileOnZeroResults) {
                FlowFile emptyFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
                Map<String, String> attributesToAdd = new HashMap<>();
                final String fetchWhereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND ");
                attributesToAdd.put("generatetablefetch.whereClause", fetchWhereClause);
                attributesToAdd.put("generatetablefetch.limit", null);
                if (partitionSize != 0) {
                    attributesToAdd.put("generatetablefetch.offset", null);
                }
                // Add fragment attributes
                attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(0));
                attributesToAdd.putAll(baseAttributes);
                emptyFlowFile = session.putAllAttributes(emptyFlowFile, attributesToAdd);
                flowFilesToTransfer.add(emptyFlowFile);
            } else {
                Long limit = partitionSize == 0 ? null : (long) partitionSize;
                for (long i = 0; i < numberOfFetches; i++) {
                    // Add a right bounding for the partitioning column if necessary (only on last partition, meaning we don't need the limit)
                    if ((i == numberOfFetches - 1) && useColumnValsForPaging && (maxValueClauses.isEmpty() || customWhereClause != null)) {
                        maxValueClauses.add(columnForPartitioning + " <= " + maxValueForPartitioning);
                        limit = null;
                    }
                    //Update WHERE list to include new right hand boundaries
                    final String whereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND ");
                    // With value partitioning the offset is an absolute column value starting at the observed minimum
                    Long offset = partitionSize == 0 ? null : i * partitionSize + (useColumnValsForPaging ? minValueForPartitioning : 0);
                    // Don't use an ORDER BY clause if there's only one partition
                    final String orderByClause = partitionSize == 0 ? null : (maxColumnNames.isEmpty() ? customOrderByColumn : maxColumnNames);
                    final List<String> namedColumns;
                    if (columnNames == null) {
                        namedColumns = List.of();
                    } else {
                        namedColumns = Arrays.asList(columnNames.split(", "));
                    }
                    final QueryStatementRequest selectStatementRequest = getSelectStatementRequest(tableName, namedColumns, whereClause, orderByClause, offset, limit, columnForPartitioning);
                    final StatementResponse selectStatementResponse = databaseDialectService.getStatement(selectStatementRequest);
                    final String query = selectStatementResponse.sql();
                    FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
                    sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes()));
                    Map<String, String> attributesToAdd = new HashMap<>();
                    attributesToAdd.put("generatetablefetch.whereClause", whereClause);
                    attributesToAdd.put("generatetablefetch.limit", (limit == null) ? null : limit.toString());
                    if (partitionSize != 0) {
                        attributesToAdd.put("generatetablefetch.offset", String.valueOf(offset));
                    }
                    // Add fragment attributes
                    attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(i));
                    attributesToAdd.putAll(baseAttributes);
                    sqlFlowFile = session.putAllAttributes(sqlFlowFile, attributesToAdd);
                    flowFilesToTransfer.add(sqlFlowFile);
                }
            }
            session.transfer(flowFilesToTransfer, REL_SUCCESS);
            // The incoming FlowFile only supplied attributes; it is dropped once the generated FlowFiles are transferred
            if (fileToProcess != null) {
                session.remove(fileToProcess);
            }
        } catch (SQLException e) {
            if (fileToProcess != null) {
                logger.error("Routing {} to failure since unable to execute SQL select query {}", fileToProcess, selectQuery, e);
                fileToProcess = session.putAttribute(fileToProcess, "generatetablefetch.sql.error", e.getMessage());
                session.transfer(fileToProcess, REL_FAILURE);
            } else {
                logger.error("Unable to execute SQL select query {}", selectQuery, e);
                throw new ProcessException(e);
            }
        }
        // Reached on success and also after an incoming FlowFile has been routed to failure above
        try {
            // Update the state
            session.setState(statePropertyMap, Scope.CLUSTER);
        } catch (IOException ioe) {
            logger.error("{} failed to update State Manager, observed maximum values will not be recorded. "
                    + "Also, any generated SQL statements may be duplicated.", this, ioe);
        }
        session.commitAsync();
    } catch (final ProcessException pe) {
        // Log the cause of the ProcessException if it is available
        Throwable t = (pe.getCause() == null ? pe : pe.getCause());
        logger.error("Error during processing: {}", t.getMessage(), t);
        session.rollback();
        context.yield();
    }
}