public static RowDataToAvroConverter createConverter()

in hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java [83:270]


  /**
   * Builds a converter that turns a value of the given logical type into the object
   * handed to Avro.
   *
   * <p>The type root of {@code type} selects a type-specific converter; that converter is
   * then wrapped so that {@code null} values map to {@code null} and a two-branch
   * nullable union schema is unwrapped to its non-null branch before delegating.
   *
   * @param type        logical type whose values the returned converter accepts
   * @param utcTimezone only consulted for {@code TIMESTAMP_WITHOUT_TIME_ZONE}: when
   *                    {@code true} the epoch value is read via
   *                    {@code TimestampData#toInstant()}, otherwise via
   *                    {@code TimestampData#toTimestamp()} (presumably JVM-default-zone
   *                    based; confirm against the Flink documentation); the flag is also
   *                    forwarded to the array, row and map converter factories
   * @return a null-tolerant converter for {@code type}
   * @throws UnsupportedOperationException if the type root is not handled here, or a
   *                                       timestamp type has a precision above 6
   */
  public static RowDataToAvroConverter createConverter(LogicalType type, boolean utcTimezone) {
    final RowDataToAvroConverter typeConverter;
    switch (type.getTypeRoot()) {
      case NULL:
        typeConverter = new RowDataToAvroConverter() {
          private static final long serialVersionUID = 1L;

          @Override
          public Object convert(Schema avroSchema, Object value) {
            // A NULL-typed column never carries a value.
            return null;
          }
        };
        break;
      case TINYINT:
        typeConverter = new RowDataToAvroConverter() {
          private static final long serialVersionUID = 1L;

          @Override
          public Object convert(Schema avroSchema, Object value) {
            // Widen the byte to an int.
            final Byte tiny = (Byte) value;
            return tiny.intValue();
          }
        };
        break;
      case SMALLINT:
        typeConverter = new RowDataToAvroConverter() {
          private static final long serialVersionUID = 1L;

          @Override
          public Object convert(Schema avroSchema, Object value) {
            // Widen the short to an int.
            final Short small = (Short) value;
            return small.intValue();
          }
        };
        break;
      // The value objects of the following types are handed over unchanged.
      case BOOLEAN:
      case INTEGER:
      case INTERVAL_YEAR_MONTH:
      case BIGINT:
      case INTERVAL_DAY_TIME:
      case FLOAT:
      case DOUBLE:
      case TIME_WITHOUT_TIME_ZONE:
      case DATE:
        typeConverter = new RowDataToAvroConverter() {
          private static final long serialVersionUID = 1L;

          @Override
          public Object convert(Schema avroSchema, Object value) {
            return value;
          }
        };
        break;
      case CHAR:
      case VARCHAR:
        typeConverter = new RowDataToAvroConverter() {
          private static final long serialVersionUID = 1L;

          @Override
          public Object convert(Schema avroSchema, Object value) {
            final String text = value.toString();
            return new Utf8(text);
          }
        };
        break;
      case BINARY:
      case VARBINARY:
        typeConverter = new RowDataToAvroConverter() {
          private static final long serialVersionUID = 1L;

          @Override
          public Object convert(Schema avroSchema, Object value) {
            final byte[] bytes = (byte[]) value;
            return ByteBuffer.wrap(bytes);
          }
        };
        break;
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE: {
        final int ltzPrecision = RowDataUtils.precision(type);
        if (ltzPrecision > 6) {
          throw new UnsupportedOperationException("Unsupported timestamp precision: " + ltzPrecision);
        }
        if (ltzPrecision <= 3) {
          // Precision up to 3: emit epoch milliseconds.
          typeConverter = new RowDataToAvroConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Schema avroSchema, Object value) {
              final TimestampData timestamp = (TimestampData) value;
              return timestamp.toInstant().toEpochMilli();
            }
          };
        } else {
          // Precision 4 to 6: emit epoch microseconds, failing on long overflow.
          typeConverter = new RowDataToAvroConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Schema avroSchema, Object value) {
              final Instant instant = ((TimestampData) value).toInstant();
              final long secondsAsMicros = Math.multiplyExact(instant.getEpochSecond(), 1000_000);
              final int nanosAsMicros = instant.getNano() / 1000;
              return Math.addExact(secondsAsMicros, nanosAsMicros);
            }
          };
        }
        break;
      }
      case TIMESTAMP_WITHOUT_TIME_ZONE: {
        final int tsPrecision = RowDataUtils.precision(type);
        if (tsPrecision > 6) {
          throw new UnsupportedOperationException("Unsupported timestamp precision: " + tsPrecision);
        }
        if (tsPrecision <= 3) {
          // Precision up to 3: emit epoch milliseconds.
          typeConverter = new RowDataToAvroConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Schema avroSchema, Object value) {
              final TimestampData timestamp = (TimestampData) value;
              if (utcTimezone) {
                return timestamp.toInstant().toEpochMilli();
              }
              return timestamp.toTimestamp().getTime();
            }
          };
        } else {
          // Precision 4 to 6: emit epoch microseconds, failing on long overflow.
          typeConverter = new RowDataToAvroConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Schema avroSchema, Object value) {
              final TimestampData timestamp = (TimestampData) value;
              final Instant instant;
              if (utcTimezone) {
                instant = timestamp.toInstant();
              } else {
                instant = timestamp.toTimestamp().toInstant();
              }
              final long secondsAsMicros = Math.multiplyExact(instant.getEpochSecond(), 1000_000);
              final int nanosAsMicros = instant.getNano() / 1000;
              return Math.addExact(secondsAsMicros, nanosAsMicros);
            }
          };
        }
        break;
      }
      case DECIMAL:
        typeConverter = new RowDataToAvroConverter() {
          private static final long serialVersionUID = 1L;

          @Override
          public Object convert(Schema avroSchema, Object value) {
            // Encode the decimal as an Avro fixed using the schema's logical type.
            final DecimalData decimal = (DecimalData) value;
            return DECIMAL_CONVERSION.toFixed(decimal.toBigDecimal(), avroSchema, avroSchema.getLogicalType());
          }
        };
        break;
      case ARRAY:
        typeConverter = createArrayConverter((ArrayType) type, utcTimezone);
        break;
      case ROW:
        typeConverter = createRowConverter((RowType) type, utcTimezone);
        break;
      case MAP:
      case MULTISET:
        typeConverter = createMapConverter(type, utcTimezone);
        break;
      case RAW:
      default:
        throw new UnsupportedOperationException("Unsupported type: " + type);
    }

    // Decorate the type-specific converter with null and nullable-union handling.
    return new RowDataToAvroConverter() {
      private static final long serialVersionUID = 1L;

      @Override
      public Object convert(Schema avroSchema, Object value) {
        if (value == null) {
          return null;
        }
        if (avroSchema.getType() != Schema.Type.UNION) {
          // Plain schema: nothing to unwrap.
          return typeConverter.convert(avroSchema, value);
        }

        // A nullable schema is a union of exactly two branches, one of which is NULL;
        // delegate with the other branch.
        final List<Schema> branches = avroSchema.getTypes();
        if (branches.size() == 2) {
          final Schema first = branches.get(0);
          final Schema second = branches.get(1);
          if (second.getType() == Schema.Type.NULL) {
            return typeConverter.convert(first, value);
          }
          if (first.getType() == Schema.Type.NULL) {
            return typeConverter.convert(second, value);
          }
        }
        throw new IllegalArgumentException(
            "The Avro schema is not a nullable type: " + avroSchema);
      }
    };
  }