in flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java [129:194]
public ValueReader<?> primitive(Type partner, Schema primitive) {
LogicalType logicalType = primitive.getLogicalType();
if (logicalType != null) {
switch (logicalType.getName()) {
case "date":
// Flink uses the same representation
return ValueReaders.ints();
case "time-micros":
return FlinkValueReaders.timeMicros();
case "timestamp-millis":
return FlinkValueReaders.timestampMills();
case "timestamp-micros":
return FlinkValueReaders.timestampMicros();
case "timestamp-nanos":
return FlinkValueReaders.timestampNanos();
case "decimal":
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
return FlinkValueReaders.decimal(
ValueReaders.decimalBytesReader(primitive),
decimal.getPrecision(),
decimal.getScale());
case "uuid":
return FlinkValueReaders.uuids();
default:
throw new IllegalArgumentException("Unknown logical type: " + logicalType.getName());
}
}
switch (primitive.getType()) {
case NULL:
return ValueReaders.nulls();
case BOOLEAN:
return ValueReaders.booleans();
case INT:
if (partner != null && partner.typeId() == Type.TypeID.LONG) {
return ValueReaders.intsAsLongs();
}
return ValueReaders.ints();
case LONG:
return ValueReaders.longs();
case FLOAT:
if (partner != null && partner.typeId() == Type.TypeID.DOUBLE) {
return ValueReaders.floatsAsDoubles();
}
return ValueReaders.floats();
case DOUBLE:
return ValueReaders.doubles();
case STRING:
return FlinkValueReaders.strings();
case FIXED:
return ValueReaders.fixed(primitive.getFixedSize());
case BYTES:
return ValueReaders.bytes();
case ENUM:
return FlinkValueReaders.enums(primitive.getEnumSymbols());
default:
throw new IllegalArgumentException("Unsupported type: " + primitive);
}
}