public ValueReader primitive()

in flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkPlannedAvroReader.java [129:194]


    /**
     * Selects the value reader for a primitive Avro schema.
     *
     * <p>If the schema carries a logical type, the reader is chosen by the logical type's name.
     * Otherwise the reader is chosen by the schema's Avro type. For INT and FLOAT schemas the
     * partner's type id is consulted: a LONG partner yields an int-as-long reader and a DOUBLE
     * partner yields a float-as-double reader.
     *
     * @param partner type whose id decides whether INT/FLOAT values are widened; may be null, in
     *     which case no widening is applied
     * @param primitive Avro schema of the primitive value to read
     * @return the reader matching the logical type name or, when there is none, the Avro type
     * @throws IllegalArgumentException if the logical type name or the Avro type is not handled
     */
    public ValueReader<?> primitive(Type partner, Schema primitive) {
      LogicalType logicalType = primitive.getLogicalType();

      if (logicalType == null) {
        // No logical annotation on the schema: dispatch on the plain Avro type.
        switch (primitive.getType()) {
          case NULL:
            return ValueReaders.nulls();

          case BOOLEAN:
            return ValueReaders.booleans();

          case INT:
            // Only a non-null partner whose type id is LONG widens the value.
            if (partner == null || partner.typeId() != Type.TypeID.LONG) {
              return ValueReaders.ints();
            }
            return ValueReaders.intsAsLongs();

          case LONG:
            return ValueReaders.longs();

          case FLOAT:
            // Only a non-null partner whose type id is DOUBLE widens the value.
            if (partner == null || partner.typeId() != Type.TypeID.DOUBLE) {
              return ValueReaders.floats();
            }
            return ValueReaders.floatsAsDoubles();

          case DOUBLE:
            return ValueReaders.doubles();

          case STRING:
            return FlinkValueReaders.strings();

          case BYTES:
            return ValueReaders.bytes();

          case FIXED:
            return ValueReaders.fixed(primitive.getFixedSize());

          case ENUM:
            return FlinkValueReaders.enums(primitive.getEnumSymbols());

          default:
            throw new IllegalArgumentException("Unsupported type: " + primitive);
        }
      }

      // A logical type is present and takes precedence over the underlying Avro type.
      String logicalName = logicalType.getName();
      switch (logicalName) {
        case "date":
          // Flink uses the same representation
          return ValueReaders.ints();

        case "time-micros":
          return FlinkValueReaders.timeMicros();

        case "timestamp-millis":
          return FlinkValueReaders.timestampMills();

        case "timestamp-micros":
          return FlinkValueReaders.timestampMicros();

        case "timestamp-nanos":
          return FlinkValueReaders.timestampNanos();

        case "uuid":
          return FlinkValueReaders.uuids();

        case "decimal":
          LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
          return FlinkValueReaders.decimal(
              ValueReaders.decimalBytesReader(primitive),
              decimalType.getPrecision(),
              decimalType.getScale());

        default:
          throw new IllegalArgumentException("Unknown logical type: " + logicalName);
      }
    }