public static Schema createSchema()

in nifi-extension-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java [515:685]


    /**
     * Builds an Avro record schema from the metadata of the given result set.
     *
     * <p>The record name starts as {@code options.recordName} (or {@code "NiFi_ExecuteSQL_Record"} when that is
     * empty), is replaced by the table name reported for the first column when that name is not blank, and is
     * passed through {@code normalizeNameForAvro} when {@code options.convertNames} is set. The record is placed
     * in the {@code any.data} namespace.</p>
     *
     * <p>Each column contributes one field, named after the column label (or the column name when the label is
     * empty) and normalized in the same way when {@code options.convertNames} is set. Fields assembled directly
     * in this method are unions of {@code null} and the mapped Avro type with no default value; decimal, date,
     * time and timestamp columns are added through {@code addNullableField}, using Avro logical types when
     * {@code options.useLogicalTypes} is set and {@code string} otherwise.</p>
     *
     * @param rs      result set whose metadata describes the columns
     * @param options conversion options (record name, name normalization, logical types, default precision/scale)
     * @return the assembled Avro record schema
     * @throws SQLException             declared for the JDBC metadata calls made here
     * @throws IllegalArgumentException if a column has a SQL type this method has no Avro mapping for
     */
    public static Schema createSchema(final ResultSet rs, AvroConversionOptions options) throws SQLException {
        final ResultSetMetaData meta = rs.getMetaData();
        final int columnCount = meta.getColumnCount();

        // Start from the configured record name, falling back to a fixed default when none is set.
        String recordName;
        if (StringUtils.isEmpty(options.recordName)) {
            recordName = "NiFi_ExecuteSQL_Record";
        } else {
            recordName = options.recordName;
        }

        // A non-blank table name reported for the first column takes precedence over the configured name.
        if (columnCount > 0) {
            final String firstColumnTable = meta.getTableName(1);
            if (!StringUtils.isBlank(firstColumnTable)) {
                recordName = firstColumnTable;
            }
        }

        if (options.convertNames) {
            recordName = normalizeNameForAvro(recordName);
        }

        final FieldAssembler<Schema> fields = SchemaBuilder.record(recordName).namespace("any.data").fields();

        // Map every column (JDBC columns are 1-based) to an Avro field according to its SQL type.
        for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
            // Per the JDBC 4 specification, getColumnLabel returns the column alias when there is one and the
            // column name otherwise, so the label is preferred. The column name is the fallback for drivers that
            // return no label; Postgres is the one that has null column names for calculated fields.
            final String rawName;
            if (StringUtils.isNotEmpty(meta.getColumnLabel(columnIndex))) {
                rawName = meta.getColumnLabel(columnIndex);
            } else {
                rawName = meta.getColumnName(columnIndex);
            }

            final String fieldName;
            if (options.convertNames) {
                fieldName = normalizeNameForAvro(rawName);
            } else {
                fieldName = rawName;
            }

            switch (meta.getColumnType(columnIndex)) {
                case CHAR:
                case LONGNVARCHAR:
                case LONGVARCHAR:
                case NCHAR:
                case NVARCHAR:
                case VARCHAR:
                case CLOB:
                case NCLOB:
                case OTHER:
                case SQLXML:
                // java.sql.RowId is an interface and appears to be database implementation specific,
                // so it is represented as a string as well.
                case ROWID: {
                    fields.name(fieldName).type().unionOf()
                            .nullBuilder().endNull()
                            .and().stringType()
                            .endUnion().noDefault();
                    break;
                }

                case BIT:
                case BOOLEAN: {
                    fields.name(fieldName).type().unionOf()
                            .nullBuilder().endNull()
                            .and().booleanType()
                            .endUnion().noDefault();
                    break;
                }

                case INTEGER: {
                    // An unsigned column whose precision is unknown (<= 0) or reaches MAX_DIGITS_IN_INT is
                    // widened to long; everything else stays an int.
                    if (!meta.isSigned(columnIndex)
                            && (meta.getPrecision(columnIndex) <= 0 || meta.getPrecision(columnIndex) >= MAX_DIGITS_IN_INT)) {
                        fields.name(fieldName).type().unionOf()
                                .nullBuilder().endNull()
                                .and().longType()
                                .endUnion().noDefault();
                    } else {
                        fields.name(fieldName).type().unionOf()
                                .nullBuilder().endNull()
                                .and().intType()
                                .endUnion().noDefault();
                    }
                    break;
                }

                case SMALLINT:
                case TINYINT: {
                    fields.name(fieldName).type().unionOf()
                            .nullBuilder().endNull()
                            .and().intType()
                            .endUnion().noDefault();
                    break;
                }

                case BIGINT: {
                    // Some databases allow arbitrary precision (> 19) for BIGINT, which Avro won't handle.
                    // A precision within 0..MAX_DIGITS_IN_BIGINT maps to long; a negative or larger precision
                    // maps to string, and the object(s) will be converted to strings as necessary.
                    final int bigintDigits = meta.getPrecision(columnIndex);
                    final boolean fitsInLong = bigintDigits >= 0 && bigintDigits <= MAX_DIGITS_IN_BIGINT;
                    if (fitsInLong) {
                        fields.name(fieldName).type().unionOf()
                                .nullBuilder().endNull()
                                .and().longType()
                                .endUnion().noDefault();
                    } else {
                        fields.name(fieldName).type().unionOf()
                                .nullBuilder().endNull()
                                .and().stringType()
                                .endUnion().noDefault();
                    }
                    break;
                }

                case FLOAT:
                case REAL:
                case 100: { // Oracle BINARY_FLOAT type
                    fields.name(fieldName).type().unionOf()
                            .nullBuilder().endNull()
                            .and().floatType()
                            .endUnion().noDefault();
                    break;
                }

                case DOUBLE:
                case 101: { // Oracle BINARY_DOUBLE type
                    fields.name(fieldName).type().unionOf()
                            .nullBuilder().endNull()
                            .and().doubleType()
                            .endUnion().noDefault();
                    break;
                }

                // Since Avro 1.8, LogicalType is supported.
                case DECIMAL:
                case NUMERIC: {
                    if (!options.useLogicalTypes) {
                        // Without logical types the value is carried as a string.
                        addNullableField(fields, fieldName, u -> u.stringType());
                        break;
                    }

                    final int reportedPrecision;
                    final int scale;
                    if (meta.getPrecision(columnIndex) <= 0) {
                        // The database reported no usable precision, so use the default precision.
                        reportedPrecision = options.defaultPrecision;
                        // Oracle returns precision=0, scale=-127 for variable scale values such as ROWNUM or a
                        // function result. Specifying the 'oracle.jdbc.J2EE13Compliant' system property makes it
                        // return scale=0 instead. Queries such as 'SELECT 1.23 as v from DUAL' can be problematic
                        // because the value can't be mapped to a decimal with scale=0, so the default scale is
                        // used to preserve decimals in such a case.
                        if (meta.getScale(columnIndex) > 0) {
                            scale = meta.getScale(columnIndex);
                        } else {
                            scale = options.defaultScale;
                        }
                    } else {
                        // When the database returns a certain precision, we can rely on that.
                        reportedPrecision = meta.getPrecision(columnIndex);
                        // For the float data type Oracle returns a scale < 0, which org.apache.avro.LogicalTypes
                        // does not expect, hence the fall back to the default scale for a negative scale.
                        if (meta.getScale(columnIndex) >= 0) {
                            scale = meta.getScale(columnIndex);
                        } else {
                            scale = options.defaultScale;
                        }
                    }

                    // Scale can be bigger than precision in some cases (Oracle, e.g.). If so, assume precision
                    // refers to the number of decimal digits and raise the precision to the scale.
                    final int precision = Math.max(reportedPrecision, scale);
                    final LogicalTypes.Decimal decimal = LogicalTypes.decimal(precision, scale);
                    addNullableField(fields, fieldName,
                            u -> u.type(decimal.addToSchema(SchemaBuilder.builder().bytesType())));
                    break;
                }

                case DATE: {
                    addNullableField(fields, fieldName, u -> {
                        if (options.useLogicalTypes) {
                            return u.type(LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType()));
                        }
                        return u.stringType();
                    });
                    break;
                }

                case TIME: {
                    addNullableField(fields, fieldName, u -> {
                        if (options.useLogicalTypes) {
                            return u.type(LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType()));
                        }
                        return u.stringType();
                    });
                    break;
                }

                case TIMESTAMP:
                case TIMESTAMP_WITH_TIMEZONE:
                case -101: // Oracle's TIMESTAMP WITH TIME ZONE
                case -102: // Oracle's TIMESTAMP WITH LOCAL TIME ZONE
                case -155: { // SQL Server's DATETIMEOFFSET
                    addNullableField(fields, fieldName, u -> {
                        if (options.useLogicalTypes) {
                            return u.type(LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder().longType()));
                        }
                        return u.stringType();
                    });
                    break;
                }

                case BINARY:
                case VARBINARY:
                case LONGVARBINARY:
                case ARRAY:
                case BLOB: {
                    fields.name(fieldName).type().unionOf()
                            .nullBuilder().endNull()
                            .and().bytesType()
                            .endUnion().noDefault();
                    break;
                }

                // SQLServer may return -150 from the driver even though it's really -156 (sql_variant); treat it
                // as a union since we don't know what the values will actually be.
                case -150:
                case -156: {
                    fields.name(fieldName).type().unionOf()
                            .nullBuilder().endNull()
                            .and().stringType()
                            .and().intType()
                            .and().longType()
                            .and().booleanType()
                            .and().bytesType()
                            .and().doubleType()
                            .and().floatType()
                            .endUnion().noDefault();
                    break;
                }

                default: {
                    final String message = "createSchema: Unknown SQL type " + meta.getColumnType(columnIndex) + " / " + meta.getColumnTypeName(columnIndex)
                            + " (table: " + recordName + ", column: " + fieldName + ") cannot be converted to Avro type";
                    throw new IllegalArgumentException(message);
                }
            }
        }

        return fields.endRecord();
    }