in nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java [763:1013]
private void executeDML(final ProcessContext context, final ProcessSession session, final FlowFile flowFile,
final Connection con, final RecordReader recordReader, final String explicitStatementType, final DMLSettings settings)
throws IllegalArgumentException, MalformedRecordException, IOException, SQLException {
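// Resolve the per-FlowFile configuration: statement type, table identifiers, key columns, batch size, query timeout, and binary string format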
final ComponentLog log = getLogger();
final String configuredStatementType = context.getProperty(STATEMENT_TYPE).getValue();
final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
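// Update Keys and Delete Keys are only evaluated for statement types that can make use of them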
final String updateKeys = switch (configuredStatementType) {
case UPDATE_TYPE, UPSERT_TYPE, SQL_TYPE, USE_ATTR_TYPE, USE_RECORD_PATH ->
context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
default -> null;
};
final String deleteKeys = switch (configuredStatementType) {
case DELETE_TYPE, SQL_TYPE, USE_ATTR_TYPE, USE_RECORD_PATH ->
context.getProperty(DELETE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
default -> null;
};
final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
final int timeoutMillis = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final String binaryStringFormat = context.getProperty(BINARY_STRING_FORMAT).evaluateAttributeExpressions(flowFile).getValue();
// Ensure the table name has been set; the generated SQL statements (and the TableSchema cache key) depend on it
if (StringUtils.isEmpty(tableName)) {
throw new IllegalArgumentException(format("Cannot process %s because Table Name is null or empty", flowFile));
}
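// Build a column-name normalizer only when field-name translation is enabled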
final NameNormalizer normalizer = Optional.of(settings)
.filter(s -> s.translateFieldNames)
.map(s -> NameNormalizerFactory.getNormalizer(s.translationStrategy, s.translationPattern))
.orElse(null);
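// Look up the table schema in the cache, keyed by catalog, schema, and table name; fetch it from the database on a cache miss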
final SchemaKey schemaKey = new PutDatabaseRecord.SchemaKey(catalog, schemaName, tableName);
final TableSchema tableSchema;
try {
tableSchema = schemaCache.get(schemaKey, key -> {
try {
final TableSchema schema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, normalizer, updateKeys, log);
getLogger().debug("Fetched Table Schema {} for table name {}", schema, tableName);
return schema;
} catch (SQLException e) {
// Wrap this in a runtime exception; it is unwrapped in the catch of the outer try
throw new ProcessException(e);
}
});
if (tableSchema == null) {
throw new IllegalArgumentException("No table schema specified!");
}
} catch (ProcessException pe) {
// Unwrap the SQLException if one occurred
if (pe.getCause() instanceof SQLException) {
throw (SQLException) pe.getCause();
} else {
throw pe;
}
}
// Build the fully qualified table name
final String fqTableName = generateTableName(settings, catalog, schemaName, tableName, tableSchema);
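// One prepared statement (and its included column indexes) is created per statement type and reused for the rest of the FlowFile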
final Map<String, PreparedSqlAndColumns> preparedSql = new HashMap<>();
int currentBatchSize = 0;
int batchIndex = 0;
Record outerRecord;
PreparedStatement lastPreparedStatement = null;
try {
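// Read every record in the FlowFile; when the statement type comes from a record path, resolve it per record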
while ((outerRecord = recordReader.nextRecord()) != null) {
final String statementType;
if (USE_RECORD_PATH.equalsIgnoreCase(explicitStatementType)) {
statementType = recordPathOperationType.apply(outerRecord);
} else {
statementType = explicitStatementType;
}
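// One outer record may contain multiple data records (for example when a Data Record Path points at a nested array)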
final List<Record> dataRecords = getDataRecords(outerRecord);
for (final Record currentRecord : dataRecords) {
PreparedSqlAndColumns preparedSqlAndColumns = preparedSql.get(statementType);
if (preparedSqlAndColumns == null) {
final RecordSchema recordSchema = currentRecord.getSchema();
final SqlAndIncludedColumns sqlHolder;
if (INSERT_TYPE.equalsIgnoreCase(statementType)) {
sqlHolder = generateInsert(recordSchema, fqTableName, tableSchema, settings, normalizer);
} else if (UPDATE_TYPE.equalsIgnoreCase(statementType)) {
sqlHolder = generateUpdate(recordSchema, fqTableName, updateKeys, tableSchema, settings, normalizer);
} else if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
sqlHolder = generateDelete(recordSchema, fqTableName, deleteKeys, tableSchema, settings, normalizer);
} else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) {
sqlHolder = getSqlStatement(StatementType.UPSERT, recordSchema, fqTableName, updateKeys, tableSchema, settings, normalizer);
} else if (INSERT_IGNORE_TYPE.equalsIgnoreCase(statementType)) {
sqlHolder = getSqlStatement(StatementType.INSERT_IGNORE, recordSchema, fqTableName, updateKeys, tableSchema, settings, normalizer);
} else {
throw new IllegalArgumentException(format("Statement Type %s is not valid, FlowFile %s", statementType, flowFile));
}
// Log the generated SQL for debugging
log.debug("Generated SQL: {}", sqlHolder.getSql());
// Create the Prepared Statement
final PreparedStatement preparedStatement = con.prepareStatement(sqlHolder.getSql());
try {
preparedStatement.setQueryTimeout(timeoutMillis); // Note: setQueryTimeout() expects seconds, while this value was resolved in milliseconds
} catch (final SQLException se) {
// If the driver doesn't support query timeouts, assume the timeout is "infinite"; only tolerate the failure when the configured timeout is zero
if (timeoutMillis > 0) {
throw se;
}
}
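// Determine how many bind parameters the generated SQL contains; the UPSERT binding below relies on this count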
final int parameterCount = getParameterCount(sqlHolder.sql);
preparedSqlAndColumns = new PreparedSqlAndColumns(sqlHolder, preparedStatement, parameterCount);
preparedSql.put(statementType, preparedSqlAndColumns);
}
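// Reuse the cached statement and included column indexes for this statement type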
final PreparedStatement ps = preparedSqlAndColumns.getPreparedStatement();
final List<Integer> fieldIndexes = preparedSqlAndColumns.getSqlAndIncludedColumns().getFieldIndexes();
final String sql = preparedSqlAndColumns.getSqlAndIncludedColumns().getSql();
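// The statement type changed since the previous record: flush the batch pending on the previous statement before switching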
if (ps != lastPreparedStatement && lastPreparedStatement != null) {
batchIndex++;
log.debug("Executing query {} because Statement Type changed between Records for {}; fieldIndexes: {}; batch index: {}; batch size: {}",
sql, flowFile, fieldIndexes, batchIndex, currentBatchSize);
lastPreparedStatement.executeBatch();
session.adjustCounter("Batches Executed", 1, false);
currentBatchSize = 0;
}
lastPreparedStatement = ps;
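// Pull the record's values and bind each included field, converting to the target column's SQL type where needed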
final Object[] values = currentRecord.getValues();
final List<DataType> dataTypes = currentRecord.getSchema().getDataTypes();
final RecordSchema recordSchema = currentRecord.getSchema();
final Map<String, ColumnDescription> columns = tableSchema.getColumns();
int deleteIndex = 0;
for (int i = 0; i < fieldIndexes.size(); i++) {
final int currentFieldIndex = fieldIndexes.get(i);
Object currentValue = values[currentFieldIndex];
final DataType dataType = dataTypes.get(currentFieldIndex);
final int fieldSqlType = DataTypeUtils.getSQLTypeValue(dataType);
final String fieldName = recordSchema.getField(currentFieldIndex).getFieldName();
String columnName = TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer);
int sqlType;
final ColumnDescription column = columns.get(columnName);
// 'column' should not be null here as the fieldIndexes should correspond to fields that match table columns, but better to handle just in case
if (column == null) {
if (!settings.ignoreUnmappedFields) {
throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n"
+ (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", columns.keySet()));
} else {
sqlType = fieldSqlType;
}
} else {
sqlType = column.getDataType();
// SQLServer returns -150 for sql_variant from DatabaseMetaData though the server expects -156 when setting a sql_variant parameter
if (sqlType == -150) {
sqlType = -156;
}
}
// Convert (if necessary) from field data type to column data type
if (fieldSqlType != sqlType) {
try {
DataType targetDataType = DataTypeUtils.getDataTypeFromSQLTypeValue(sqlType);
// If sqlType is unsupported, fall back to the fieldSqlType instead
if (targetDataType == null) {
targetDataType = DataTypeUtils.getDataTypeFromSQLTypeValue(fieldSqlType);
}
if (targetDataType != null) {
if (sqlType == Types.BLOB || sqlType == Types.BINARY || sqlType == Types.VARBINARY || sqlType == Types.LONGVARBINARY) {
if (currentValue instanceof Object[]) {
// Convert an Object[] of Byte values to byte[]
Object[] src = (Object[]) currentValue;
if (src.length > 0) {
if (!(src[0] instanceof Byte)) {
throw new IllegalTypeConversionException("Cannot convert value " + currentValue + " to BLOB/BINARY/VARBINARY/LONGVARBINARY");
}
}
byte[] dest = new byte[src.length];
for (int j = 0; j < src.length; j++) {
dest[j] = (Byte) src[j];
}
currentValue = dest;
} else if (currentValue instanceof String stringValue) {
if (BINARY_STRING_FORMAT_BASE64.getValue().equals(binaryStringFormat)) {
currentValue = Base64.getDecoder().decode(stringValue);
} else if (BINARY_STRING_FORMAT_HEXADECIMAL.getValue().equals(binaryStringFormat)) {
currentValue = HexFormat.of().parseHex(stringValue);
} else {
currentValue = stringValue.getBytes(StandardCharsets.UTF_8);
}
} else if (currentValue != null && !(currentValue instanceof byte[])) {
throw new IllegalTypeConversionException("Cannot convert value " + currentValue + " to BLOB/BINARY/VARBINARY/LONGVARBINARY");
}
} else {
currentValue = DataTypeUtils.convertType(
currentValue,
targetDataType,
fieldName);
}
}
} catch (IllegalTypeConversionException itce) {
// If the field and column types don't match or the value can't otherwise be converted to the column datatype,
// try with the original object and field datatype
sqlType = DataTypeUtils.getSQLTypeValue(dataType);
}
}
// For DELETE statements, bind the value twice when the column is nullable, because of the null check (see generateDelete for details)
if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
setParameter(ps, ++deleteIndex, currentValue, fieldSqlType, sqlType);
if (column != null && column.isNullable()) {
setParameter(ps, ++deleteIndex, currentValue, fieldSqlType, sqlType);
}
} else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) {
// The UPSERT statement may repeat the column list, so bind each value once per repetition (total parameter count divided by the number of included fields)
final int timesToAddObjects = preparedSqlAndColumns.parameterCount / fieldIndexes.size();
for (int j = 0; j < timesToAddObjects; j++) {
setParameter(ps, i + (fieldIndexes.size() * j) + 1, currentValue, fieldSqlType, sqlType);
}
} else {
setParameter(ps, i + 1, currentValue, fieldSqlType, sqlType);
}
}
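// All parameters are bound for this record: add it to the batch and execute once the batch reaches the configured maximum size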
ps.addBatch();
session.adjustCounter(statementType + " updates performed", 1, false);
if (++currentBatchSize == maxBatchSize) {
batchIndex++;
log.debug("Executing query {} because batch reached max size for {}; fieldIndexes: {}; batch index: {}; batch size: {}",
sql, flowFile, fieldIndexes, batchIndex, currentBatchSize);
session.adjustCounter("Batches Executed", 1, false);
ps.executeBatch();
currentBatchSize = 0;
}
}
}
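// Execute any remaining records that did not fill a complete batch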
if (currentBatchSize > 0) {
lastPreparedStatement.executeBatch();
session.adjustCounter("Batches Executed", 1, false);
}
} finally {
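// Always close every prepared statement created for this FlowFile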
for (final PreparedSqlAndColumns preparedSqlAndColumns : preparedSql.values()) {
preparedSqlAndColumns.getPreparedStatement().close();
}
}
}