in nifi-extension-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java [515:685]
public static Schema createSchema(final ResultSet rs, AvroConversionOptions options) throws SQLException {
    // Builds an Avro record schema from the result set's metadata.
    //  - Record name: options.recordName (or "NiFi_ExecuteSQL_Record" when that is empty), replaced by the
    //    table name reported for the first column when that is not blank; normalized via
    //    normalizeNameForAvro when options.convertNames is set.
    //  - One field per column, chosen from the column's JDBC type code; an unrecognized type code
    //    raises IllegalArgumentException.
    final ResultSetMetaData metadata = rs.getMetaData();
    final int columnCount = metadata.getColumnCount();

    String recordName = options.recordName;
    if (StringUtils.isEmpty(recordName)) {
        recordName = "NiFi_ExecuteSQL_Record";
    }
    if (columnCount > 0) {
        final String firstColumnTable = metadata.getTableName(1);
        if (!StringUtils.isBlank(firstColumnTable)) {
            recordName = firstColumnTable;
        }
    }
    if (options.convertNames) {
        recordName = normalizeNameForAvro(recordName);
    }

    final FieldAssembler<Schema> fields = SchemaBuilder.record(recordName).namespace("any.data").fields();

    // Some Avro types (Decimal, Date types) may still need additional work.
    for (int col = 1; col <= columnCount; col++) {
        // Per the JDBC 4 spec, getColumnLabel returns the column alias when there is one and the column
        // name otherwise, so the label is preferred. Some implementations return an empty label; in that
        // case fall back to getColumnName. (Postgres returns null column names for calculated fields.)
        final String label = metadata.getColumnLabel(col);
        final String rawName = StringUtils.isNotEmpty(label) ? label : metadata.getColumnName(col);
        final String fieldName = options.convertNames ? normalizeNameForAvro(rawName) : rawName;

        final int sqlType = metadata.getColumnType(col);
        switch (sqlType) {
            // Character-like types. java.sql.RowId is an interface whose content is database
            // implementation specific, so ROWID is represented as a string as well.
            case CHAR:
            case LONGNVARCHAR:
            case LONGVARCHAR:
            case NCHAR:
            case NVARCHAR:
            case VARCHAR:
            case CLOB:
            case NCLOB:
            case OTHER:
            case SQLXML:
            case ROWID:
                fields.name(fieldName).type().unionOf()
                        .nullBuilder().endNull().and()
                        .stringType()
                        .endUnion().noDefault();
                break;

            case BIT:
            case BOOLEAN:
                fields.name(fieldName).type().unionOf()
                        .nullBuilder().endNull().and()
                        .booleanType()
                        .endUnion().noDefault();
                break;

            case INTEGER: {
                // A signed INTEGER, or one whose precision is positive and below MAX_DIGITS_IN_INT,
                // is an Avro int; anything else is widened to long. Precision is only consulted
                // when the column is not signed.
                final boolean fitsInInt;
                if (metadata.isSigned(col)) {
                    fitsInInt = true;
                } else {
                    final int digits = metadata.getPrecision(col);
                    fitsInInt = digits > 0 && digits < MAX_DIGITS_IN_INT;
                }
                if (fitsInInt) {
                    fields.name(fieldName).type().unionOf()
                            .nullBuilder().endNull().and()
                            .intType()
                            .endUnion().noDefault();
                } else {
                    fields.name(fieldName).type().unionOf()
                            .nullBuilder().endNull().and()
                            .longType()
                            .endUnion().noDefault();
                }
                break;
            }

            case SMALLINT:
            case TINYINT:
                fields.name(fieldName).type().unionOf()
                        .nullBuilder().endNull().and()
                        .intType()
                        .endUnion().noDefault();
                break;

            case BIGINT: {
                // Some databases allow arbitrary BIGINT precision (> 19), which Avro cannot handle.
                // A precision within 0..MAX_DIGITS_IN_BIGINT maps to long; a negative or larger
                // precision maps to string, and the values are converted to strings as necessary.
                final int digits = metadata.getPrecision(col);
                final boolean fitsInLong = digits >= 0 && digits <= MAX_DIGITS_IN_BIGINT;
                if (fitsInLong) {
                    fields.name(fieldName).type().unionOf()
                            .nullBuilder().endNull().and()
                            .longType()
                            .endUnion().noDefault();
                } else {
                    fields.name(fieldName).type().unionOf()
                            .nullBuilder().endNull().and()
                            .stringType()
                            .endUnion().noDefault();
                }
                break;
            }

            case FLOAT:
            case REAL:
            case 100: // Oracle BINARY_FLOAT type
                fields.name(fieldName).type().unionOf()
                        .nullBuilder().endNull().and()
                        .floatType()
                        .endUnion().noDefault();
                break;

            case DOUBLE:
            case 101: // Oracle BINARY_DOUBLE type
                fields.name(fieldName).type().unionOf()
                        .nullBuilder().endNull().and()
                        .doubleType()
                        .endUnion().noDefault();
                break;

            // Logical types are supported since Avro 1.8.
            case DECIMAL:
            case NUMERIC: {
                if (!options.useLogicalTypes) {
                    addNullableField(fields, fieldName, u -> u.stringType());
                    break;
                }

                final int reportedPrecision = metadata.getPrecision(col);
                final int reportedScale = metadata.getScale(col);
                final int basePrecision;
                final int scale;
                if (reportedPrecision > 0) {
                    // The database reported a usable precision, so rely on it.
                    basePrecision = reportedPrecision;
                    // Oracle reports a negative scale for the float data type, which
                    // org.apache.avro.LogicalTypes does not accept; use the default scale then.
                    scale = reportedScale >= 0 ? reportedScale : options.defaultScale;
                } else {
                    // No usable precision reported: use the default precision.
                    basePrecision = options.defaultPrecision;
                    // Oracle returns precision=0, scale=-127 for variable scale values such as ROWNUM or a
                    // function result. Setting the 'oracle.jdbc.J2EE13Compliant' system property makes it
                    // return scale=0 instead. Queries such as 'SELECT 1.23 as v from DUAL' are then a
                    // problem because the value cannot be mapped to a decimal with scale=0, so the
                    // default scale is used to preserve the decimals.
                    scale = reportedScale > 0 ? reportedScale : options.defaultScale;
                }
                // Scale can exceed precision in some cases (e.g. Oracle). In that case assume the precision
                // refers to the number of decimal digits and raise it to the scale.
                final int precision = Math.max(basePrecision, scale);

                final LogicalTypes.Decimal decimalType = LogicalTypes.decimal(precision, scale);
                addNullableField(fields, fieldName,
                        u -> u.type(decimalType.addToSchema(SchemaBuilder.builder().bytesType())));
                break;
            }

            case DATE:
                if (options.useLogicalTypes) {
                    addNullableField(fields, fieldName,
                            u -> u.type(LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType())));
                } else {
                    addNullableField(fields, fieldName, u -> u.stringType());
                }
                break;

            case TIME:
                if (options.useLogicalTypes) {
                    addNullableField(fields, fieldName,
                            u -> u.type(LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType())));
                } else {
                    addNullableField(fields, fieldName, u -> u.stringType());
                }
                break;

            case TIMESTAMP:
            case TIMESTAMP_WITH_TIMEZONE:
            case -101: // Oracle's TIMESTAMP WITH TIME ZONE
            case -102: // Oracle's TIMESTAMP WITH LOCAL TIME ZONE
            case -155: // SQL Server's DATETIMEOFFSET
                if (options.useLogicalTypes) {
                    addNullableField(fields, fieldName,
                            u -> u.type(LogicalTypes.timestampMillis().addToSchema(SchemaBuilder.builder().longType())));
                } else {
                    addNullableField(fields, fieldName, u -> u.stringType());
                }
                break;

            case BINARY:
            case VARBINARY:
            case LONGVARBINARY:
            case ARRAY:
            case BLOB:
                fields.name(fieldName).type().unionOf()
                        .nullBuilder().endNull().and()
                        .bytesType()
                        .endUnion().noDefault();
                break;

            // SQL Server may return -150 from the driver even though the type is really -156 (sql_variant).
            // The actual value types are unknown up front, so the field is a union of several types.
            case -150:
            case -156:
                fields.name(fieldName).type().unionOf()
                        .nullBuilder().endNull().and()
                        .stringType().and()
                        .intType().and()
                        .longType().and()
                        .booleanType().and()
                        .bytesType().and()
                        .doubleType().and()
                        .floatType()
                        .endUnion().noDefault();
                break;

            default:
                throw new IllegalArgumentException("createSchema: Unknown SQL type " + sqlType + " / " + metadata.getColumnTypeName(col)
                        + " (table: " + recordName + ", column: " + fieldName + ") cannot be converted to Avro type");
        }
    }

    return fields.endRecord();
}