in adapter/avro/src/main/java/org/apache/arrow/adapter/avro/AvroToArrowUtils.java [161:396]
/**
 * Creates the {@link Consumer} that reads values of the given Avro schema into an Arrow vector.
 *
 * <p>Dispatches on the Avro type (and, where relevant, its logical type) of {@code schema}.
 * Complex types (union, array, map, record, enum) are delegated to the dedicated
 * {@code createXxxConsumer} helpers; primitive and logical types build a {@link FieldType},
 * obtain a vector through {@code createVector} and wrap it in the matching consumer.
 *
 * @param schema Avro schema to create a consumer for; must not be null
 * @param name field name passed on to the vector / nested consumer that is created
 * @param nullable nullability recorded in the Arrow {@link FieldType} of the created field
 * @param config conversion settings (allocator, legacy mode, ...); must not be null
 * @param consumerVector vector handed to {@code createVector} and the nested helpers;
 *     NOTE(review): presumably an existing vector to reuse when non-null - confirm against
 *     {@code createVector}. It is not consulted for the Avro NULL type.
 * @return the consumer for {@code schema}
 * @throws UnsupportedOperationException if the Avro type is not handled by this method
 */
private static Consumer createConsumer(
    Schema schema,
    String name,
    boolean nullable,
    AvroToArrowConfig config,
    FieldVector consumerVector) {
  Preconditions.checkNotNull(schema, "Avro schema object can't be null");
  Preconditions.checkNotNull(config, "Config can't be null");
  final BufferAllocator allocator = config.getAllocator();
  final Schema.Type type = schema.getType();
  final LogicalType logicalType = schema.getLogicalType();
  final ArrowType arrowType;
  final FieldType fieldType;
  final FieldVector vector;
  final Consumer consumer;
  switch (type) {
    case UNION:
      boolean nullableUnion =
          schema.getTypes().stream().anyMatch(t -> t.getType() == Schema.Type.NULL);
      if (schema.getTypes().size() == 2 && nullableUnion && !config.isLegacyMode()) {
        // For a simple nullable (null | type), interpret the union as a single nullable field.
        // Not available in legacy mode, which uses the literal interpretation instead.
        int nullIndex = schema.getTypes().get(0).getType() == Schema.Type.NULL ? 0 : 1;
        int childIndex = nullIndex == 0 ? 1 : 0;
        Schema childSchema = schema.getTypes().get(childIndex);
        // The child is always created as nullable, regardless of the incoming flag.
        Consumer<?> childConsumer =
            createConsumer(childSchema, name, true, config, consumerVector);
        consumer = new AvroNullableConsumer<>(childConsumer, nullIndex);
      } else {
        // Literal interpretation of a union, which may or may not include a null element.
        consumer = createUnionConsumer(schema, name, nullableUnion, config, consumerVector);
      }
      break;
    case ARRAY:
      consumer = createArrayConsumer(schema, name, nullable, config, consumerVector);
      break;
    case MAP:
      consumer = createMapConsumer(schema, name, nullable, config, consumerVector);
      break;
    case RECORD:
      consumer = createStructConsumer(schema, name, nullable, config, consumerVector);
      break;
    case ENUM:
      consumer = createEnumConsumer(schema, name, nullable, config, consumerVector);
      break;
    case STRING:
      arrowType = new ArrowType.Utf8();
      fieldType =
          new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
      vector = createVector(consumerVector, fieldType, name, allocator);
      consumer = new AvroStringConsumer((VarCharVector) vector);
      break;
    case FIXED:
      // Fixed types carry extra (external) properties in their field metadata.
      Map<String, String> extProps = createExternalProps(schema, config);
      if (logicalType instanceof LogicalTypes.Decimal) {
        arrowType = createDecimalArrowType((LogicalTypes.Decimal) logicalType, schema);
        fieldType =
            new FieldType(
                nullable,
                arrowType,
                /* dictionary= */ null,
                getMetaData(schema, extProps, config));
        vector = createVector(consumerVector, fieldType, name, allocator);
        // NOTE(review): the consumer is selected by fixed size (<= 16 bytes), so the casts
        // below rely on createDecimalArrowType choosing the bit width by the same rule -
        // confirm against that helper.
        if (schema.getFixedSize() <= 16) {
          consumer =
              new AvroDecimalConsumer.FixedDecimalConsumer(
                  (DecimalVector) vector, schema.getFixedSize());
        } else {
          consumer =
              new AvroDecimal256Consumer.FixedDecimal256Consumer(
                  (Decimal256Vector) vector, schema.getFixedSize());
        }
      } else {
        arrowType = new ArrowType.FixedSizeBinary(schema.getFixedSize());
        fieldType =
            new FieldType(
                nullable,
                arrowType,
                /* dictionary= */ null,
                getMetaData(schema, extProps, config));
        vector = createVector(consumerVector, fieldType, name, allocator);
        consumer = new AvroFixedConsumer((FixedSizeBinaryVector) vector, schema.getFixedSize());
      }
      break;
    case INT:
      if (logicalType instanceof LogicalTypes.Date) {
        arrowType = new ArrowType.Date(DateUnit.DAY);
        fieldType =
            new FieldType(
                nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
        vector = createVector(consumerVector, fieldType, name, allocator);
        consumer = new AvroDateConsumer((DateDayVector) vector);
      } else if (logicalType instanceof LogicalTypes.TimeMillis) {
        arrowType = new ArrowType.Time(TimeUnit.MILLISECOND, 32);
        fieldType =
            new FieldType(
                nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
        vector = createVector(consumerVector, fieldType, name, allocator);
        consumer = new AvroTimeMillisConsumer((TimeMilliVector) vector);
      } else {
        // Plain int, or an int with a logical type not handled above.
        arrowType = new ArrowType.Int(32, /* isSigned= */ true);
        fieldType =
            new FieldType(
                nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
        vector = createVector(consumerVector, fieldType, name, allocator);
        consumer = new AvroIntConsumer((IntVector) vector);
      }
      break;
    case BOOLEAN:
      arrowType = new ArrowType.Bool();
      fieldType =
          new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
      vector = createVector(consumerVector, fieldType, name, allocator);
      consumer = new AvroBooleanConsumer((BitVector) vector);
      break;
    case LONG:
      if (logicalType instanceof LogicalTypes.TimeMicros) {
        arrowType = new ArrowType.Time(TimeUnit.MICROSECOND, 64);
        fieldType =
            new FieldType(
                nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
        vector = createVector(consumerVector, fieldType, name, allocator);
        consumer = new AvroTimeMicroConsumer((TimeMicroVector) vector);
      } else if (logicalType instanceof LogicalTypes.TimestampMillis && !config.isLegacyMode()) {
        // Outside legacy mode the timestamp-xxx types map to zone-aware (UTC) timestamps.
        // In legacy mode they are treated as local (handled further below), there is no
        // zone-aware type.
        arrowType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC");
        fieldType =
            new FieldType(
                nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
        vector = createVector(consumerVector, fieldType, name, allocator);
        consumer = new AvroTimestampMillisTzConsumer((TimeStampMilliTZVector) vector);
      } else if (logicalType instanceof LogicalTypes.TimestampMicros && !config.isLegacyMode()) {
        arrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC");
        fieldType =
            new FieldType(
                nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
        vector = createVector(consumerVector, fieldType, name, allocator);
        consumer = new AvroTimestampMicrosTzConsumer((TimeStampMicroTZVector) vector);
      } else if (logicalType instanceof LogicalTypes.TimestampNanos && !config.isLegacyMode()) {
        arrowType = new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC");
        fieldType =
            new FieldType(
                nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
        vector = createVector(consumerVector, fieldType, name, allocator);
        consumer = new AvroTimestampNanosTzConsumer((TimeStampNanoTZVector) vector);
      } else if (logicalType instanceof LogicalTypes.LocalTimestampMillis
          || (logicalType instanceof LogicalTypes.TimestampMillis && config.isLegacyMode())) {
        // local-timestamp-xxx, or timestamp-xxx in legacy mode: timestamp without a time zone.
        arrowType = new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
        fieldType =
            new FieldType(
                nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
        vector = createVector(consumerVector, fieldType, name, allocator);
        consumer = new AvroTimestampMillisConsumer((TimeStampMilliVector) vector);
      } else if (logicalType instanceof LogicalTypes.LocalTimestampMicros
          || (logicalType instanceof LogicalTypes.TimestampMicros && config.isLegacyMode())) {
        // In legacy mode the timestamp-xxx types are treated as local.
        arrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND, null);
        fieldType =
            new FieldType(
                nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
        vector = createVector(consumerVector, fieldType, name, allocator);
        consumer = new AvroTimestampMicrosConsumer((TimeStampMicroVector) vector);
      } else if (logicalType instanceof LogicalTypes.LocalTimestampNanos
          || (logicalType instanceof LogicalTypes.TimestampNanos && config.isLegacyMode())) {
        arrowType = new ArrowType.Timestamp(TimeUnit.NANOSECOND, null);
        fieldType =
            new FieldType(
                nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
        vector = createVector(consumerVector, fieldType, name, allocator);
        consumer = new AvroTimestampNanosConsumer((TimeStampNanoVector) vector);
      } else {
        // Plain long, or a long with a logical type not handled above.
        arrowType = new ArrowType.Int(64, /* isSigned= */ true);
        fieldType =
            new FieldType(
                nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
        vector = createVector(consumerVector, fieldType, name, allocator);
        consumer = new AvroLongConsumer((BigIntVector) vector);
      }
      break;
    case FLOAT:
      arrowType = new ArrowType.FloatingPoint(SINGLE);
      fieldType =
          new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
      vector = createVector(consumerVector, fieldType, name, allocator);
      consumer = new AvroFloatConsumer((Float4Vector) vector);
      break;
    case DOUBLE:
      arrowType = new ArrowType.FloatingPoint(DOUBLE);
      fieldType =
          new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
      vector = createVector(consumerVector, fieldType, name, allocator);
      consumer = new AvroDoubleConsumer((Float8Vector) vector);
      break;
    case BYTES:
      if (logicalType instanceof LogicalTypes.Decimal) {
        LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
        arrowType = createDecimalArrowType(decimalType, schema);
        fieldType =
            new FieldType(
                nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
        vector = createVector(consumerVector, fieldType, name, allocator);
        // NOTE(review): the consumer is selected by precision (<= 38 digits), so the casts
        // below rely on createDecimalArrowType choosing the bit width by the same rule -
        // confirm against that helper.
        if (decimalType.getPrecision() <= 38) {
          consumer = new AvroDecimalConsumer.BytesDecimalConsumer((DecimalVector) vector);
        } else {
          consumer =
              new AvroDecimal256Consumer.BytesDecimal256Consumer((Decimal256Vector) vector);
        }
      } else {
        arrowType = new ArrowType.Binary();
        fieldType =
            new FieldType(
                nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
        vector = createVector(consumerVector, fieldType, name, allocator);
        consumer = new AvroBytesConsumer((VarBinaryVector) vector);
      }
      break;
    case NULL:
      arrowType = new ArrowType.Null();
      fieldType =
          new FieldType(nullable, arrowType, /* dictionary= */ null, getMetaData(schema, config));
      // A fresh NullVector is always created here (consumerVector is not used).
      vector = new NullVector(name, fieldType); // Respect nullability defined in fieldType
      consumer = new AvroNullConsumer((NullVector) vector);
      break;
    default:
      // Every Avro type known to this method is handled above; anything else is unsupported.
      throw new UnsupportedOperationException(
          String.format("Can't convert avro type %s to arrow type.", type.getName()));
  }
  return consumer;
}