private static Consumer createConsumer()

in adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrowUtils.java [161:396]


  private static Consumer createConsumer(
      Schema schema,
      String name,
      boolean nullable,
      AvroToArrowConfig config,
      FieldVector consumerVector) {

    Preconditions.checkNotNull(schema, "Avro schema object can't be null");
    Preconditions.checkNotNull(config, "Config can't be null");

    final BufferAllocator allocator = config.getAllocator();

    final Schema.Type type = schema.getType();
    final LogicalType logicalType = schema.getLogicalType();

    final ArrowType arrowType;
    final FieldType fieldType;
    final FieldVector vector;
    final Consumer consumer;

    switch (type) {
      case UNION:
        boolean nullableUnion =
            schema.getTypes().stream().anyMatch(t -> t.getType() == Schema.Type.NULL);
        if (schema.getTypes().size() == 2 && nullableUnion && !config.isLegacyMode()) {
          // For a simple nullable (null | type), interpret the union as a single nullable field.
          // Not available in legacy mode, which uses the literal interpretation instead
          int nullIndex = schema.getTypes().get(0).getType() == Schema.Type.NULL ? 0 : 1;
          int childIndex = nullIndex == 0 ? 1 : 0;
          Schema childSchema = schema.getTypes().get(childIndex);
          Consumer<?> childConsumer =
              createConsumer(childSchema, name, true, config, consumerVector);
          consumer = new AvroNullableConsumer<>(childConsumer, nullIndex);
        } else {
          // Literal interpretation of a union, which may or may not include a null element.
          consumer = createUnionConsumer(schema, name, nullableUnion, config, consumerVector);
        }
        break;
      case ARRAY:
        consumer = createArrayConsumer(schema, name, nullable, config, consumerVector);
        break;
      case MAP:
        consumer = createMapConsumer(schema, name, nullable, config, consumerVector);
        break;
      case RECORD:
        consumer = createStructConsumer(schema, name, nullable, config, consumerVector);
        break;
      case ENUM:
        consumer = createEnumConsumer(schema, name, nullable, config, consumerVector);
        break;
      case STRING:
        arrowType = new ArrowType.Utf8();
        fieldType =
            new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
        vector = createVector(consumerVector, fieldType, name, allocator);
        consumer = new AvroStringConsumer((VarCharVector) vector);
        break;
      case FIXED:
        Map<String, String> extProps = createExternalProps(schema, config);
        if (logicalType instanceof LogicalTypes.Decimal) {
          arrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType, schema);
          fieldType =
              new FieldType(
                  nullable,
                  arrowType,
                  /* dictionary= */ null,
                  getMetaData(schema, extProps, config));
          vector = createVector(consumerVector, fieldType, name, allocator);
          if (schema.getFixedSize() <= 16) {
            consumer =
                new AvroDecimalConsumer.FixedDecimalConsumer(
                    (DecimalVector) vector, schema.getFixedSize());
          } else {
            consumer =
                new AvroDecimal256Consumer.FixedDecimal256Consumer(
                    (Decimal256Vector) vector, schema.getFixedSize());
          }
        } else {
          arrowType = new ArrowType.FixedSizeBinary(schema.getFixedSize());
          fieldType =
              new FieldType(
                  nullable,
                  arrowType,
                  /* dictionary= */ null,
                  getMetaData(schema, extProps, config));
          vector = createVector(consumerVector, fieldType, name, allocator);
          consumer = new AvroFixedConsumer((FixedSizeBinaryVector) vector, schema.getFixedSize());
        }
        break;
      case INT:
        if (logicalType instanceof LogicalTypes.Date) {
          arrowType = new ArrowType.Date(DateUnit.DAY);
          fieldType =
              new FieldType(
                  nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
          vector = createVector(consumerVector, fieldType, name, allocator);
          consumer = new AvroDateConsumer((DateDayVector) vector);
        } else if (logicalType instanceof LogicalTypes.TimeMillis) {
          arrowType = new ArrowType.Time(TimeUnit.MILLISECOND, 32);
          fieldType =
              new FieldType(
                  nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
          vector = createVector(consumerVector, fieldType, name, allocator);
          consumer = new AvroTimeMillisConsumer((TimeMilliVector) vector);
        } else {
          arrowType = new ArrowType.Int(32, /* isSigned= */ true);
          fieldType =
              new FieldType(
                  nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
          vector = createVector(consumerVector, fieldType, name, allocator);
          consumer = new AvroIntConsumer((IntVector) vector);
        }
        break;
      case BOOLEAN:
        arrowType = new ArrowType.Bool();
        fieldType =
            new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
        vector = createVector(consumerVector, fieldType, name, allocator);
        consumer = new AvroBooleanConsumer((BitVector) vector);
        break;
      case LONG:
        if (logicalType instanceof LogicalTypes.TimeMicros) {
          arrowType = new ArrowType.Time(TimeUnit.MICROSECOND, 64);
          fieldType =
              new FieldType(
                  nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
          vector = createVector(consumerVector, fieldType, name, allocator);
          consumer = new AvroTimeMicroConsumer((TimeMicroVector) vector);
        } else if (logicalType instanceof LogicalTypes.TimestampMillis && !config.isLegacyMode()) {
          // In legacy mode the timestamp-xxx types are treated as local, there is no zone aware
          // type
          arrowType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC");
          fieldType =
              new FieldType(
                  nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
          vector = createVector(consumerVector, fieldType, name, allocator);
          consumer = new AvroTimestampMillisTzConsumer((TimeStampMilliTZVector) vector);
        } else if (logicalType instanceof LogicalTypes.TimestampMicros && !config.isLegacyMode()) {
          arrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC");
          fieldType =
              new FieldType(
                  nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
          vector = createVector(consumerVector, fieldType, name, allocator);
          consumer = new AvroTimestampMicrosTzConsumer((TimeStampMicroTZVector) vector);
        } else if (logicalType instanceof LogicalTypes.TimestampNanos && !config.isLegacyMode()) {
          arrowType = new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC");
          fieldType =
              new FieldType(
                  nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
          vector = createVector(consumerVector, fieldType, name, allocator);
          consumer = new AvroTimestampNanosTzConsumer((TimeStampNanoTZVector) vector);
        } else if (logicalType instanceof LogicalTypes.LocalTimestampMillis
            || (logicalType instanceof LogicalTypes.TimestampMillis && config.isLegacyMode())) {
          arrowType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
          fieldType =
              new FieldType(
                  nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
          vector = createVector(consumerVector, fieldType, name, allocator);
          consumer = new AvroTimestampMillisConsumer((TimeStampMilliVector) vector);
        } else if (logicalType instanceof LogicalTypes.LocalTimestampMicros
            || (logicalType instanceof LogicalTypes.TimestampMicros && config.isLegacyMode())) {
          // In legacy mode the timestamp-xxx types are treated as local
          arrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND, null);
          fieldType =
              new FieldType(
                  nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
          vector = createVector(consumerVector, fieldType, name, allocator);
          consumer = new AvroTimestampMicrosConsumer((TimeStampMicroVector) vector);
        } else if (logicalType instanceof LogicalTypes.LocalTimestampNanos
            || (logicalType instanceof LogicalTypes.TimestampNanos && config.isLegacyMode())) {
          arrowType = new ArrowType.Timestamp(TimeUnit.NANOSECOND, null);
          fieldType =
              new FieldType(
                  nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
          vector = createVector(consumerVector, fieldType, name, allocator);
          consumer = new AvroTimestampNanosConsumer((TimeStampNanoVector) vector);
        } else {
          arrowType = new ArrowType.Int(64, /* isSigned= */ true);
          fieldType =
              new FieldType(
                  nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
          vector = createVector(consumerVector, fieldType, name, allocator);
          consumer = new AvroLongConsumer((BigIntVector) vector);
        }
        break;
      case FLOAT:
        arrowType = new ArrowType.FloatingPoint(SINGLE);
        fieldType =
            new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
        vector = createVector(consumerVector, fieldType, name, allocator);
        consumer = new AvroFloatConsumer((Float4Vector) vector);
        break;
      case DOUBLE:
        arrowType = new ArrowType.FloatingPoint(DOUBLE);
        fieldType =
            new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
        vector = createVector(consumerVector, fieldType, name, allocator);
        consumer = new AvroDoubleConsumer((Float8Vector) vector);
        break;
      case BYTES:
        if (logicalType instanceof LogicalTypes.Decimal) {
          LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
          arrowType = createDecimalArrowType(decimalType, schema);
          fieldType =
              new FieldType(
                  nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
          vector = createVector(consumerVector, fieldType, name, allocator);
          if (decimalType.getPrecision() <= 38) {
            consumer = new AvroDecimalConsumer.BytesDecimalConsumer((DecimalVector) vector);
          } else {
            consumer =
                new AvroDecimal256Consumer.BytesDecimal256Consumer((Decimal256Vector) vector);
          }
        } else {
          arrowType = new ArrowType.Binary();
          fieldType =
              new FieldType(
                  nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
          vector = createVector(consumerVector, fieldType, name, allocator);
          consumer = new AvroBytesConsumer((VarBinaryVector) vector);
        }
        break;
      case NULL:
        arrowType = new ArrowType.Null();
        fieldType =
            new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
        vector = new NullVector(name, fieldType); // Respect nullability defined in fieldType
        consumer = new AvroNullConsumer((NullVector) vector);
        break;
      default:
        // no-op, shouldn't get here
        throw new UnsupportedOperationException(
            "Can't convert avro type %s to arrow type." + type.getName());
    }
    return consumer;
  }