private void executeDML()

in nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java [763:1013]


    /**
     * Generates and executes batched DML statements for every record read from {@code recordReader}, against the table
     * identified by the evaluated Catalog, Schema and Table Name properties.
     * <p>
     * One {@link PreparedStatement} is prepared per statement type and reused for later records of the same type. A pending
     * batch is executed when the statement type changes between records, when the batch size reaches the evaluated
     * {@code MAX_BATCH_SIZE} value (a non-positive value never triggers this), and once more after the last record.
     * Every prepared statement is closed before this method returns, including when an exception is thrown.
     *
     * @param context               process context supplying the property values
     * @param session               process session, used here only to adjust counters
     * @param flowFile              FlowFile whose attributes are used to evaluate property expressions and in error messages
     * @param con                   JDBC connection used to fetch the table schema and to prepare statements
     * @param recordReader          source of the records to write
     * @param explicitStatementType statement type applied to every record, or {@code USE_RECORD_PATH} to resolve it per record
     * @param settings              DML generation settings (field name translation, unmapped field handling, ...)
     * @throws IllegalArgumentException if the table name is empty, no table schema is available, or a statement type is not valid
     * @throws MalformedRecordException if the record reader encounters a malformed record
     * @throws IOException              if reading records fails
     * @throws SQLException             if fetching the table schema, or preparing, binding, executing or closing a statement fails
     */
    private void executeDML(final ProcessContext context, final ProcessSession session, final FlowFile flowFile,
                            final Connection con, final RecordReader recordReader, final String explicitStatementType, final DMLSettings settings)
            throws IllegalArgumentException, MalformedRecordException, IOException, SQLException {

        final ComponentLog log = getLogger();
        final String configuredStatementType = context.getProperty(STATEMENT_TYPE).getValue();

        final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
        final String updateKeys = switch (configuredStatementType) {
            case UPDATE_TYPE, UPSERT_TYPE, SQL_TYPE, USE_ATTR_TYPE, USE_RECORD_PATH ->
                    context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
            default -> null;
        };
        final String deleteKeys = switch (configuredStatementType) {
            case DELETE_TYPE, SQL_TYPE, USE_ATTR_TYPE, USE_RECORD_PATH ->
                    context.getProperty(DELETE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
            default -> null;
        };
        final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
        // JDBC Statement.setQueryTimeout() takes SECONDS, so the configured time period must be converted to seconds
        final int queryTimeoutSeconds = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();

        final String binaryStringFormat = context.getProperty(BINARY_STRING_FORMAT).evaluateAttributeExpressions(flowFile).getValue();

        // Ensure the table name has been set, the generated SQL statements (and TableSchema cache) will need it
        if (StringUtils.isEmpty(tableName)) {
            throw new IllegalArgumentException(format("Cannot process %s because Table Name is null or empty", flowFile));
        }
        final NameNormalizer normalizer = Optional.of(settings)
                .filter(s -> s.translateFieldNames)
                .map(s -> NameNormalizerFactory.getNormalizer(s.translationStrategy, s.translationPattern))
                .orElse(null);
        final SchemaKey schemaKey = new PutDatabaseRecord.SchemaKey(catalog, schemaName, tableName);
        final TableSchema tableSchema;
        try {
            tableSchema = schemaCache.get(schemaKey, key -> {
                try {
                    final TableSchema schema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, normalizer, updateKeys, log);
                    getLogger().debug("Fetched Table Schema {} for table name {}", schema, tableName);
                    return schema;
                } catch (SQLException e) {
                    // Wrap this in a runtime exception, it is unwrapped in the outer try
                    throw new ProcessException(e);
                }
            });
            if (tableSchema == null) {
                throw new IllegalArgumentException("No table schema specified!");
            }
        } catch (ProcessException pe) {
            // Unwrap the SQLException if one occurred
            if (pe.getCause() instanceof SQLException) {
                throw (SQLException) pe.getCause();
            } else {
                throw pe;
            }
        }

        // build the fully qualified table name
        final String fqTableName = generateTableName(settings, catalog, schemaName, tableName, tableSchema);

        // One prepared statement per statement type; every entry is closed in the finally block below
        final Map<String, PreparedSqlAndColumns> preparedSql = new HashMap<>();
        int currentBatchSize = 0;
        int batchIndex = 0;
        Record outerRecord;
        PreparedStatement lastPreparedStatement = null;

        try {
            while ((outerRecord = recordReader.nextRecord()) != null) {
                final String statementType;
                if (USE_RECORD_PATH.equalsIgnoreCase(explicitStatementType)) {
                    statementType = recordPathOperationType.apply(outerRecord);
                } else {
                    statementType = explicitStatementType;
                }

                final List<Record> dataRecords = getDataRecords(outerRecord);
                for (final Record currentRecord : dataRecords) {
                    PreparedSqlAndColumns preparedSqlAndColumns = preparedSql.get(statementType);
                    if (preparedSqlAndColumns == null) {
                        final RecordSchema recordSchema = currentRecord.getSchema();

                        final SqlAndIncludedColumns sqlHolder;
                        if (INSERT_TYPE.equalsIgnoreCase(statementType)) {
                            sqlHolder = generateInsert(recordSchema, fqTableName, tableSchema, settings, normalizer);
                        } else if (UPDATE_TYPE.equalsIgnoreCase(statementType)) {
                            sqlHolder = generateUpdate(recordSchema, fqTableName, updateKeys, tableSchema, settings, normalizer);
                        } else if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
                            sqlHolder = generateDelete(recordSchema, fqTableName, deleteKeys, tableSchema, settings, normalizer);
                        } else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) {
                            sqlHolder = getSqlStatement(StatementType.UPSERT, recordSchema, fqTableName, updateKeys, tableSchema, settings, normalizer);
                        } else if (INSERT_IGNORE_TYPE.equalsIgnoreCase(statementType)) {
                            sqlHolder = getSqlStatement(StatementType.INSERT_IGNORE, recordSchema, fqTableName, updateKeys, tableSchema, settings, normalizer);
                        } else {
                            throw new IllegalArgumentException(format("Statement Type %s is not valid, FlowFile %s", statementType, flowFile));
                        }

                        // Log debug sqlHolder
                        log.debug("Generated SQL: {}", sqlHolder.getSql());
                        // Create the Prepared Statement
                        final PreparedStatement preparedStatement = con.prepareStatement(sqlHolder.getSql());

                        try {
                            preparedStatement.setQueryTimeout(queryTimeoutSeconds); // timeout in seconds
                        } catch (final SQLException se) {
                            // If the driver doesn't support query timeout, then assume it is "infinite". Allow a timeout of zero only
                            if (queryTimeoutSeconds > 0) {
                                // The statement is not tracked in preparedSql yet, so close it here to avoid leaking it
                                try {
                                    preparedStatement.close();
                                } catch (final SQLException closeException) {
                                    se.addSuppressed(closeException);
                                }
                                throw se;
                            }
                        }

                        final int parameterCount = getParameterCount(sqlHolder.sql);
                        preparedSqlAndColumns = new PreparedSqlAndColumns(sqlHolder, preparedStatement, parameterCount);
                        preparedSql.put(statementType, preparedSqlAndColumns);
                    }

                    final PreparedStatement ps = preparedSqlAndColumns.getPreparedStatement();
                    final List<Integer> fieldIndexes = preparedSqlAndColumns.getSqlAndIncludedColumns().getFieldIndexes();
                    final String sql = preparedSqlAndColumns.getSqlAndIncludedColumns().getSql();

                    // Flush the previous statement's pending batch before switching to a different statement
                    if (ps != lastPreparedStatement && lastPreparedStatement != null) {
                        batchIndex++;
                        log.debug("Executing query {} because Statement Type changed between Records for {}; fieldIndexes: {}; batch index: {}; batch size: {}",
                                sql, flowFile, fieldIndexes, batchIndex, currentBatchSize);
                        lastPreparedStatement.executeBatch();

                        session.adjustCounter("Batches Executed", 1, false);
                        currentBatchSize = 0;
                    }
                    lastPreparedStatement = ps;

                    final Object[] values = currentRecord.getValues();
                    final List<DataType> dataTypes = currentRecord.getSchema().getDataTypes();
                    final RecordSchema recordSchema = currentRecord.getSchema();
                    final Map<String, ColumnDescription> columns = tableSchema.getColumns();

                    int deleteIndex = 0;
                    for (int i = 0; i < fieldIndexes.size(); i++) {
                        final int currentFieldIndex = fieldIndexes.get(i);
                        Object currentValue = values[currentFieldIndex];
                        final DataType dataType = dataTypes.get(currentFieldIndex);
                        final int fieldSqlType = DataTypeUtils.getSQLTypeValue(dataType);
                        final String fieldName = recordSchema.getField(currentFieldIndex).getFieldName();
                        String columnName = TableSchema.normalizedName(fieldName, settings.translateFieldNames, normalizer);
                        int sqlType;

                        final ColumnDescription column = columns.get(columnName);
                        // 'column' should not be null here as the fieldIndexes should correspond to fields that match table columns, but better to handle just in case
                        if (column == null) {
                            if (!settings.ignoreUnmappedFields) {
                                throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database\n"
                                        + (settings.translateFieldNames ? "Normalized " : "") + "Columns: " + String.join(",", columns.keySet()));
                            } else {
                                sqlType = fieldSqlType;
                            }
                        } else {
                            sqlType = column.getDataType();
                            // SQLServer returns -150 for sql_variant from DatabaseMetaData though the server expects -156 when setting a sql_variant parameter
                            if (sqlType == -150) {
                                sqlType = -156;
                            }
                        }

                        // Convert (if necessary) from field data type to column data type
                        if (fieldSqlType != sqlType) {
                            try {
                                DataType targetDataType = DataTypeUtils.getDataTypeFromSQLTypeValue(sqlType);
                                // If sqlType is unsupported, fall back to the fieldSqlType instead
                                if (targetDataType == null) {
                                    targetDataType = DataTypeUtils.getDataTypeFromSQLTypeValue(fieldSqlType);
                                }
                                if (targetDataType != null) {
                                    if (sqlType == Types.BLOB || sqlType == Types.BINARY || sqlType == Types.VARBINARY || sqlType == Types.LONGVARBINARY) {
                                        if (currentValue instanceof Object[] src) {
                                            // Convert Object[Byte] arrays to byte[]. Every element (not only the first) must be a non-null Byte,
                                            // so that bad input takes the IllegalTypeConversionException path instead of a ClassCastException/NPE
                                            final byte[] dest = new byte[src.length];
                                            for (int j = 0; j < src.length; j++) {
                                                if (!(src[j] instanceof Byte byteValue)) {
                                                    throw new IllegalTypeConversionException("Cannot convert value " + currentValue + " to BLOB/BINARY/VARBINARY/LONGVARBINARY");
                                                }
                                                dest[j] = byteValue;
                                            }
                                            currentValue = dest;
                                        } else if (currentValue instanceof String stringValue) {
                                            if (BINARY_STRING_FORMAT_BASE64.getValue().equals(binaryStringFormat)) {
                                                currentValue = Base64.getDecoder().decode(stringValue);
                                            } else if (BINARY_STRING_FORMAT_HEXADECIMAL.getValue().equals(binaryStringFormat)) {
                                                currentValue = HexFormat.of().parseHex(stringValue);
                                            } else {
                                                currentValue = stringValue.getBytes(StandardCharsets.UTF_8);
                                            }
                                        } else if (currentValue != null && !(currentValue instanceof byte[])) {
                                            throw new IllegalTypeConversionException("Cannot convert value " + currentValue + " to BLOB/BINARY/VARBINARY/LONGVARBINARY");
                                        }
                                    } else {
                                        currentValue = DataTypeUtils.convertType(
                                                currentValue,
                                                targetDataType,
                                                fieldName);
                                    }
                                }
                            } catch (IllegalTypeConversionException itce) {
                                // If the field and column types don't match or the value can't otherwise be converted to the column datatype,
                                // try with the original object and field datatype
                                sqlType = DataTypeUtils.getSQLTypeValue(dataType);
                            }
                        }

                        // If DELETE type, insert the object twice if the column is nullable because of the null check (see generateDelete for details)
                        if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
                            setParameter(ps, ++deleteIndex, currentValue, fieldSqlType, sqlType);
                            if (column != null && column.isNullable()) {
                                setParameter(ps, ++deleteIndex, currentValue, fieldSqlType, sqlType);
                            }
                        } else if (UPSERT_TYPE.equalsIgnoreCase(statementType)) {
                            // Calculate the number of times to set the parameter based on parameters divided by number of field indexes
                            final int timesToAddObjects = preparedSqlAndColumns.parameterCount / fieldIndexes.size();
                            for (int j = 0; j < timesToAddObjects; j++) {
                                setParameter(ps, i + (fieldIndexes.size() * j) + 1, currentValue, fieldSqlType, sqlType);
                            }
                        } else {
                            setParameter(ps, i + 1, currentValue, fieldSqlType, sqlType);
                        }
                    }

                    ps.addBatch();
                    session.adjustCounter(statementType + " updates performed", 1, false);
                    if (++currentBatchSize == maxBatchSize) {
                        batchIndex++;
                        log.debug("Executing query {} because batch reached max size for {}; fieldIndexes: {}; batch index: {}; batch size: {}",
                                sql, flowFile, fieldIndexes, batchIndex, currentBatchSize);
                        session.adjustCounter("Batches Executed", 1, false);
                        ps.executeBatch();
                        currentBatchSize = 0;
                    }
                }
            }

            // Flush whatever remains in the last statement's batch
            if (currentBatchSize > 0) {
                lastPreparedStatement.executeBatch();
                session.adjustCounter("Batches Executed", 1, false);
            }
        } finally {
            // Close every prepared statement even if one close() fails; report the first failure with the rest suppressed
            SQLException closeFailure = null;
            for (final PreparedSqlAndColumns preparedSqlAndColumns : preparedSql.values()) {
                try {
                    preparedSqlAndColumns.getPreparedStatement().close();
                } catch (final SQLException closeException) {
                    if (closeFailure == null) {
                        closeFailure = closeException;
                    } else {
                        closeFailure.addSuppressed(closeException);
                    }
                }
            }
            if (closeFailure != null) {
                throw closeFailure;
            }
        }
    }