in nifi-extension-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java [261:486]
public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, final AvroConversionOptions options, final ResultSetRowCallback callback)
throws SQLException, IOException {
final Schema schema = createSchema(rs, options);
final GenericRecord rec = new GenericData.Record(schema);
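// Note: a single GenericRecord instance is reused for every row; each column value overwrites the previous row's entry in place.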
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
dataFileWriter.setCodec(options.codec);
dataFileWriter.create(schema, outStream);
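// create() writes the Avro container-file header, including the schema, to the stream before any records are appended.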
final ResultSetMetaData meta = rs.getMetaData();
final int nrOfColumns = meta.getColumnCount();
long nrOfRows = 0;
while (rs.next()) {
if (callback != null) {
callback.processRow(rs);
}
for (int i = 1; i <= nrOfColumns; i++) {
final int javaSqlType = meta.getColumnType(i);
final Schema fieldSchema = schema.getFields().get(i - 1).schema();
// Handle CLOB and NCLOB before getObject() is called: for maximum portability, the JDBC spec says each column should be
// read at most once and in left-to-right order, so the character stream must be fully consumed here.
if (javaSqlType == CLOB || javaSqlType == NCLOB) {
Clob clob = rs.getClob(i);
if (clob != null) {
StringBuilder sb = new StringBuilder();
char[] buffer = new char[32 * 1024]; // 32K default buffer
try (Reader reader = clob.getCharacterStream()) {
int charsRead;
while ((charsRead = reader.read(buffer)) != -1) {
sb.append(buffer, 0, charsRead);
}
}
rec.put(i - 1, sb.toString());
try {
clob.free();
} catch (SQLFeatureNotSupportedException sfnse) {
// The driver doesn't support free(); allow processing to continue
logger.debug("Database Driver does not support freeing clob objects");
}
} else {
rec.put(i - 1, null);
}
continue;
}
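// A more compact alternative to the buffer loop above, as a sketch assuming a Java 10+ runtime (Reader.transferTo):
//   StringWriter sw = new StringWriter();
//   try (Reader reader = clob.getCharacterStream()) {
//       reader.transferTo(sw);
//   }
//   rec.put(i - 1, sw.toString());
// The explicit loop avoids assuming a newer runtime.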
if (javaSqlType == BLOB) {
Blob blob = rs.getBlob(i);
if (blob != null) {
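// Note: the length is cast to int below, so BLOBs larger than Integer.MAX_VALUE bytes cannot be buffered by this approach.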
long blobLength = blob.length();
byte[] buffer = new byte[(int) blobLength];
try (InputStream is = blob.getBinaryStream()) {
int offset = 0;
int bytesRead;
// Read in bulk rather than one byte at a time, until the buffer is full or the stream ends
while (offset < buffer.length && (bytesRead = is.read(buffer, offset, buffer.length - offset)) != -1) {
offset += bytesRead;
}
}
ByteBuffer bb = ByteBuffer.wrap(buffer);
rec.put(i - 1, bb);
try {
blob.free();
} catch (SQLFeatureNotSupportedException sfnse) {
// The driver doesn't support free(); allow processing to continue
logger.debug("Database Driver does not support freeing blob objects");
}
} else {
rec.put(i - 1, null);
}
continue;
}
Object value;
// If a Timestamp type, try getTimestamp() rather than getObject()
if (javaSqlType == TIMESTAMP
|| javaSqlType == TIMESTAMP_WITH_TIMEZONE
// The following are Oracle-specific codes for TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE. This would be better
// located in the DatabaseAdapter interfaces, but some processors (like ExecuteSQL) use this method but don't specify a DatabaseAdapter.
|| javaSqlType == -101
|| javaSqlType == -102) {
try {
value = rs.getTimestamp(i);
// Some drivers (like Derby) return null for getTimestamp() but return a Timestamp object in getObject()
if (value == null) {
value = rs.getObject(i);
}
} catch (Exception e) {
// The cause of the exception is unknown, so fall back to getObject() and let any "real" exception surface there
value = rs.getObject(i);
}
} else {
value = rs.getObject(i);
}
if (value == null) {
rec.put(i - 1, null);
} else if (javaSqlType == BINARY || javaSqlType == VARBINARY || javaSqlType == LONGVARBINARY || javaSqlType == ARRAY) {
// Binary types require slightly different handling: wrap the raw bytes in a ByteBuffer
byte[] bytes = rs.getBytes(i);
ByteBuffer bb = ByteBuffer.wrap(bytes);
rec.put(i - 1, bb);
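// Note: ARRAY columns are also serialized via getBytes() above, i.e. as opaque bytes rather than as Avro arrays.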
} else if (javaSqlType == 100) { // Handle Oracle BINARY_FLOAT data type
rec.put(i - 1, rs.getFloat(i));
} else if (javaSqlType == 101) { // Handle Oracle BINARY_DOUBLE data type
rec.put(i - 1, rs.getDouble(i));
} else if (value instanceof Byte) {
// tinyint(1) columns are reported as java.sql.Types.TINYINT, but the value arrives
// as a java.lang.Byte (at least with the H2 JDBC driver); a direct put of a Byte
// into the Avro record fails with:
// org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte
rec.put(i - 1, ((Byte) value).intValue());
} else if (value instanceof Short) {
// MS SQL Server returns TINYINT as a java.lang.Short, which Avro doesn't understand.
rec.put(i - 1, ((Short) value).intValue());
} else if (value instanceof BigDecimal) {
if (options.useLogicalTypes) {
// Delegate mapping to AvroTypeUtil in order to utilize logical types.
rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, fieldSchema));
} else {
// As string for backward compatibility.
rec.put(i - 1, value.toString());
}
} else if (value instanceof BigInteger) {
// Check the precision of the BIGINT. Some databases allow arbitrary precision (> 19), but Avro won't handle that.
// If the SQL type is BIGINT and the precision is between 0 and 19 (inclusive), the BigInteger is likely a
// long (and the schema says it will be), so try to get its value as a long.
// Otherwise, Avro can't handle BigInteger as a number - it will throw an AvroRuntimeException
// such as: "Unknown datum type: java.math.BigInteger: 38". In that case the schema expects a string.
if (javaSqlType == BIGINT) {
int precision = meta.getPrecision(i);
if (precision < 0 || precision > MAX_DIGITS_IN_BIGINT) {
rec.put(i - 1, value.toString());
} else {
try {
rec.put(i - 1, ((BigInteger) value).longValueExact());
} catch (ArithmeticException ae) {
// Since the value won't fit in a long, convert it to a string
rec.put(i - 1, value.toString());
}
}
} else {
rec.put(i - 1, value.toString());
}
} else if (value instanceof Number || value instanceof Boolean) {
if (javaSqlType == BIGINT) {
int precision = meta.getPrecision(i);
if (precision < 0 || precision > MAX_DIGITS_IN_BIGINT) {
rec.put(i - 1, value.toString());
} else {
rec.put(i - 1, value);
}
} else if ((value instanceof Long) && meta.getPrecision(i) < MAX_DIGITS_IN_INT) {
int intValue = ((Long) value).intValue();
rec.put(i - 1, intValue);
} else {
rec.put(i - 1, value);
}
} else if (javaSqlType == DATE) {
if (options.useLogicalTypes) {
// Handle SQL DATE fields using system default time zone without conversion
rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, fieldSchema));
} else {
// As string for backward compatibility.
rec.put(i - 1, value.toString());
}
} else if (value instanceof java.sql.Date) {
if (options.useLogicalTypes) {
// Delegate mapping to AvroTypeUtil in order to utilize logical types.
// AvroTypeUtil.convertToAvroObject() expects java.sql.Date object as a UTC normalized date (UTC 00:00:00)
// but it comes from the driver in JVM's local time zone 00:00:00 and needs to be converted.
java.sql.Date normalizedDate = DataTypeUtils.convertDateToUTC((java.sql.Date) value);
rec.put(i - 1, AvroTypeUtil.convertToAvroObject(normalizedDate, fieldSchema));
} else {
// As string for backward compatibility.
rec.put(i - 1, value.toString());
}
} else if (value instanceof Date) {
if (options.useLogicalTypes) {
// Delegate mapping to AvroTypeUtil in order to utilize logical types.
rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, fieldSchema));
} else {
// As string for backward compatibility.
rec.put(i - 1, value.toString());
}
} else if (value instanceof java.sql.SQLXML) {
rec.put(i - 1, ((SQLXML) value).getString());
} else {
// The types handled above are numbers (int, long, double, float), booleans, and Strings.
// Since Avro's base types don't include timestamps, those - and anything else that is not
// a number or boolean - are converted to Strings via toString().
rec.put(i - 1, value.toString());
}
}
try {
dataFileWriter.append(rec);
nrOfRows += 1;
} catch (DataFileWriter.AppendWriteException awe) {
Throwable rootCause = ExceptionUtils.getRootCause(awe);
if (rootCause instanceof UnresolvedUnionException) {
UnresolvedUnionException uue = (UnresolvedUnionException) rootCause;
throw new RuntimeException(
"Unable to resolve union for value " + uue.getUnresolvedDatum() +
" with type " + uue.getUnresolvedDatum().getClass().getCanonicalName() +
" while appending record " + rec,
awe);
} else {
throw awe;
}
}
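// Honor the configured row limit; a maxRows of 0 (or negative) means no limit.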
if (options.maxRows > 0 && nrOfRows == options.maxRows) {
break;
}
}
return nrOfRows;
}
}
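// Illustrative usage sketch (not part of the original source). The builder methods shown
// are assumptions about the AvroConversionOptions API referenced above; verify them against
// JdbcCommon before relying on them.
//
//   try (Statement st = conn.createStatement();
//        ResultSet rs = st.executeQuery("SELECT * FROM example_table");
//        OutputStream out = Files.newOutputStream(Path.of("rows.avro"))) {
//       AvroConversionOptions options = AvroConversionOptions.builder()
//               .useLogicalTypes(true)
//               .maxRows(0) // 0 disables the row limit, per the check above
//               .build();
//       long rowCount = JdbcCommon.convertToAvroStream(rs, out, options, null);
//   }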