public static long convertToAvroStream()

in nifi-extension-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java [261:486]


    /**
     * Converts the rows of the given {@link ResultSet} to Avro records and writes them as an Avro data file to the given stream.
     * <p>
     * The Avro schema is derived from the result set via {@code createSchema(rs, options)}. Each column value is mapped to a datum
     * Avro can serialize: CLOB/NCLOB values are read fully into a String, BLOB and binary values are wrapped in a {@link ByteBuffer},
     * Byte/Short values are widened to int, and BigDecimal/Date values are converted to logical types or strings depending on
     * {@code options.useLogicalTypes}. Any type not handled explicitly is written using its {@code toString()} representation.
     *
     * @param rs the result set to read; this method advances it with {@code next()}
     * @param outStream the stream the Avro data file is written to
     * @param options conversion options; {@code codec}, {@code useLogicalTypes} and {@code maxRows} are read by this method
     * @param callback invoked with the result set for every row before its columns are read; may be {@code null}
     * @return the number of rows appended to the Avro data file
     * @throws SQLException if reading from the result set fails
     * @throws IOException if writing to the stream fails, or if a BLOB is larger than a Java array can hold or returns more
     *                     data than its reported length
     * @throws RuntimeException if a value cannot be resolved against the union type of its field
     */
    public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, final AvroConversionOptions options, final ResultSetRowCallback callback)
            throws SQLException, IOException {
        final Schema schema = createSchema(rs, options);
        // A single record instance is reused for every row; each column is overwritten before the record is appended
        final GenericRecord rec = new GenericData.Record(schema);

        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
            dataFileWriter.setCodec(options.codec);
            dataFileWriter.create(schema, outStream);

            final ResultSetMetaData meta = rs.getMetaData();
            final int nrOfColumns = meta.getColumnCount();
            long nrOfRows = 0;
            while (rs.next()) {
                if (callback != null) {
                    callback.processRow(rs);
                }
                // JDBC columns are 1-based, Avro record positions are 0-based
                for (int i = 1; i <= nrOfColumns; i++) {
                    final int javaSqlType = meta.getColumnType(i);
                    final Schema fieldSchema = schema.getFields().get(i - 1).schema();

                    // Need to handle CLOB and NCLOB before getObject() is called, due to ResultSet's maximum portability statement
                    if (javaSqlType == CLOB || javaSqlType == NCLOB) {
                        Clob clob = rs.getClob(i);
                        if (clob != null) {
                            StringBuilder sb = new StringBuilder();
                            char[] buffer = new char[32 * 1024]; // 32K default buffer
                            try (Reader reader = clob.getCharacterStream()) {
                                int charsRead;
                                while ((charsRead = reader.read(buffer)) != -1) {
                                    sb.append(buffer, 0, charsRead);
                                }
                            }
                            rec.put(i - 1, sb.toString());
                            try {
                                clob.free();
                            } catch (SQLFeatureNotSupportedException sfnse) {
                                // The driver doesn't support free, but allow processing to continue
                                logger.debug("Database Driver does not support freeing clob objects");
                            }
                        } else {
                            rec.put(i - 1, null);
                        }
                        continue;
                    }

                    if (javaSqlType == BLOB) {
                        Blob blob = rs.getBlob(i);
                        if (blob != null) {
                            final long blobLength = blob.length();
                            if (blobLength > Integer.MAX_VALUE) {
                                // A Java array cannot hold this value; fail clearly instead of relying on a narrowing cast
                                throw new IOException("BLOB in column " + i + " is " + blobLength
                                        + " bytes, which exceeds the maximum supported size of " + Integer.MAX_VALUE + " bytes");
                            }
                            final byte[] buffer = new byte[(int) blobLength];
                            int bytesFilled = 0;
                            try (InputStream is = blob.getBinaryStream()) {
                                // Bulk reads, bounded by the buffer size, instead of one read() call per byte
                                int bytesRead;
                                while (bytesFilled < buffer.length
                                        && (bytesRead = is.read(buffer, bytesFilled, buffer.length - bytesFilled)) != -1) {
                                    bytesFilled += bytesRead;
                                }
                                if (bytesFilled == buffer.length && is.read() != -1) {
                                    // Do not silently truncate the value if the driver under-reported the length
                                    throw new IOException("BLOB in column " + i + " returned more data than its reported length of "
                                            + blobLength + " bytes");
                                }
                            }
                            // Expose only the bytes actually read, so a short stream is not padded with zero bytes
                            ByteBuffer bb = ByteBuffer.wrap(buffer, 0, bytesFilled);
                            rec.put(i - 1, bb);
                            try {
                                blob.free();
                            } catch (SQLFeatureNotSupportedException sfnse) {
                                // The driver doesn't support free, but allow processing to continue
                                logger.debug("Database Driver does not support freeing blob objects");
                            }
                        } else {
                            rec.put(i - 1, null);
                        }
                        continue;
                    }

                    Object value;

                    // If a Timestamp type, try getTimestamp() rather than getObject()
                    if (javaSqlType == TIMESTAMP
                            || javaSqlType == TIMESTAMP_WITH_TIMEZONE
                            // The following are Oracle-specific codes for TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE. This would be better
                            // located in the DatabaseAdapter interfaces, but some processors (like ExecuteSQL) use this method but don't specify a DatabaseAdapter.
                            || javaSqlType == -101
                            || javaSqlType == -102) {
                        try {
                            value = rs.getTimestamp(i);
                            // Some drivers (like Derby) return null for getTimestamp() but return a Timestamp object in getObject()
                            if (value == null) {
                                value = rs.getObject(i);
                            }
                        } catch (Exception e) {
                            // The cause of the exception is not known, but we'll fall back to call getObject() and handle any "real" exception there
                            value = rs.getObject(i);
                        }
                    } else {
                        value = rs.getObject(i);
                    }

                    if (value == null) {
                        rec.put(i - 1, null);

                    } else if (javaSqlType == BINARY || javaSqlType == VARBINARY || javaSqlType == LONGVARBINARY || javaSqlType == ARRAY) {
                        // bytes requires little bit different handling
                        byte[] bytes = rs.getBytes(i);
                        ByteBuffer bb = ByteBuffer.wrap(bytes);
                        rec.put(i - 1, bb);
                    } else if (javaSqlType == 100) { // Handle Oracle BINARY_FLOAT data type
                        rec.put(i - 1, rs.getFloat(i));
                    } else if (javaSqlType == 101) { // Handle Oracle BINARY_DOUBLE data type
                        rec.put(i - 1, rs.getDouble(i));
                    } else if (value instanceof Byte) {
                        // tinyint(1) type is returned by JDBC driver as java.sql.Types.TINYINT
                        // But value is returned by JDBC as java.lang.Byte
                        // (at least H2 JDBC works this way)
                        // direct put to avro record results:
                        // org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte
                        rec.put(i - 1, ((Byte) value).intValue());
                    } else if (value instanceof Short) {
                        //MS SQL returns TINYINT as a Java Short, which Avro doesn't understand.
                        rec.put(i - 1, ((Short) value).intValue());
                    } else if (value instanceof BigDecimal) {
                        if (options.useLogicalTypes) {
                            // Delegate mapping to AvroTypeUtil in order to utilize logical types.
                            rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, fieldSchema));
                        } else {
                            // As string for backward compatibility.
                            rec.put(i - 1, value.toString());
                        }

                    } else if (value instanceof BigInteger) {
                        // Check the precision of the BIGINT. Some databases allow arbitrary precision (> 19), but Avro won't handle that.
                        // If the SQL type is BIGINT and the precision is between 0 and 19 (inclusive), the BigInteger is likely a
                        // long (and the schema says it will be), so try to get its value as a long.
                        // Otherwise, Avro can't handle BigInteger as a number - it will throw an AvroRuntimeException
                        // such as: "Unknown datum type: java.math.BigInteger: 38". In this case the schema is expecting a string.
                        if (javaSqlType == BIGINT) {
                            int precision = meta.getPrecision(i);
                            if (precision < 0 || precision > MAX_DIGITS_IN_BIGINT) {
                                rec.put(i - 1, value.toString());
                            } else {
                                try {
                                    rec.put(i - 1, ((BigInteger) value).longValueExact());
                                } catch (ArithmeticException ae) {
                                    // Since the value won't fit in a long, convert it to a string
                                    rec.put(i - 1, value.toString());
                                }
                            }
                        } else {
                            rec.put(i - 1, value.toString());
                        }

                    } else if (value instanceof Number || value instanceof Boolean) {
                        if (javaSqlType == BIGINT) {
                            int precision = meta.getPrecision(i);
                            if (precision < 0 || precision > MAX_DIGITS_IN_BIGINT) {
                                rec.put(i - 1, value.toString());
                            } else {
                                rec.put(i - 1, value);
                            }
                        } else if ((value instanceof Long) && meta.getPrecision(i) < MAX_DIGITS_IN_INT) {
                            // A Long in a column whose precision is below MAX_DIGITS_IN_INT is narrowed to an int
                            int intValue = ((Long) value).intValue();
                            rec.put(i - 1, intValue);
                        } else {
                            rec.put(i - 1, value);
                        }
                    } else if (javaSqlType == DATE) {
                        if (options.useLogicalTypes) {
                            // Handle SQL DATE fields using system default time zone without conversion
                            rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, fieldSchema));
                        } else {
                            // As string for backward compatibility.
                            rec.put(i - 1, value.toString());
                        }
                    } else if (value instanceof java.sql.Date) {
                        if (options.useLogicalTypes) {
                            // Delegate mapping to AvroTypeUtil in order to utilize logical types.
                            // AvroTypeUtil.convertToAvroObject() expects java.sql.Date object as a UTC normalized date (UTC 00:00:00)
                            // but it comes from the driver in JVM's local time zone 00:00:00 and needs to be converted.
                            java.sql.Date normalizedDate = DataTypeUtils.convertDateToUTC((java.sql.Date) value);
                            rec.put(i - 1, AvroTypeUtil.convertToAvroObject(normalizedDate, fieldSchema));
                        } else {
                            // As string for backward compatibility.
                            rec.put(i - 1, value.toString());
                        }

                    } else if (value instanceof Date) {
                        if (options.useLogicalTypes) {
                            // Delegate mapping to AvroTypeUtil in order to utilize logical types.
                            rec.put(i - 1, AvroTypeUtil.convertToAvroObject(value, fieldSchema));
                        } else {
                            // As string for backward compatibility.
                            rec.put(i - 1, value.toString());
                        }

                    } else if (value instanceof java.sql.SQLXML) {
                        rec.put(i - 1, ((SQLXML) value).getString());
                    } else {
                        // The different types that we support are numbers (int, long, double, float),
                        // as well as boolean values and Strings. Since Avro doesn't provide
                        // timestamp types, we want to convert those to Strings. So we will cast anything other
                        // than numbers or booleans to strings by using the toString() method.
                        rec.put(i - 1, value.toString());
                    }
                }
                try {
                    dataFileWriter.append(rec);
                    nrOfRows += 1;
                } catch (DataFileWriter.AppendWriteException awe) {
                    Throwable rootCause = ExceptionUtils.getRootCause(awe);
                    if (rootCause instanceof UnresolvedUnionException) {
                        UnresolvedUnionException uue = (UnresolvedUnionException) rootCause;
                        // The unresolved datum may itself be null (e.g. a null value for a union without a null branch),
                        // so its type must be resolved null-safely to avoid masking the real failure with a NullPointerException
                        final Object unresolvedDatum = uue.getUnresolvedDatum();
                        final String unresolvedType = unresolvedDatum == null ? "null" : unresolvedDatum.getClass().getCanonicalName();
                        throw new RuntimeException(
                                "Unable to resolve union for value " + unresolvedDatum +
                                " with type " + unresolvedType +
                                " while appending record " + rec,
                                awe);
                    } else {
                        throw awe;
                    }
                }

                // A maxRows of zero or less means no limit
                if (options.maxRows > 0 && nrOfRows == options.maxRows) {
                    break;
                }
            }

            return nrOfRows;
        }
    }