private static DataType fromDebeziumAvroType()

in paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java [438:516]


    /**
     * Translates an Avro {@link Schema} into the matching {@link DataType}.
     *
     * <p>Resolution happens in two stages:
     *
     * <ol>
     *   <li>If the schema carries a logical type (date, time, timestamp, local timestamp or
     *       decimal), that logical type alone decides the result.
     *   <li>Otherwise the plain Avro type is mapped. Records, arrays and maps are converted
     *       recursively; map keys are always mapped to {@code STRING}.
     * </ol>
     *
     * <p>The only union shape accepted is a two-member union that contains the {@code NULL}
     * schema; it is converted to the nullable copy of its non-null member.
     *
     * @param schema the Avro schema to convert
     * @return the data type equivalent to {@code schema}
     * @throws UnsupportedOperationException if the logical type or Avro type has no mapping here,
     *     or if the schema is a union other than the two-member nullable form
     * @throws IllegalStateException if a two-member nullable union has no non-null member
     */
    private static DataType fromDebeziumAvroType(Schema schema) {
        // Stage 1: a logical type, when present, takes precedence over the physical type.
        // `instanceof` is false for null, so a schema without a logical type falls through
        // every guard below, including the final "unsupported" one.
        LogicalType logical = schema.getLogicalType();
        if (logical instanceof LogicalTypes.Date) {
            return DataTypes.DATE();
        }
        if (logical instanceof LogicalTypes.TimestampMillis) {
            return DataTypes.TIMESTAMP_MILLIS();
        }
        if (logical instanceof LogicalTypes.TimestampMicros) {
            return DataTypes.TIMESTAMP();
        }
        if (logical instanceof LogicalTypes.Decimal) {
            LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logical;
            int precision = decimal.getPrecision();
            int scale = decimal.getScale();
            return DataTypes.DECIMAL(precision, scale);
        }
        if (logical instanceof LogicalTypes.TimeMillis) {
            return DataTypes.TIME(3);
        }
        if (logical instanceof LogicalTypes.TimeMicros) {
            return DataTypes.TIME(6);
        }
        if (logical instanceof LogicalTypes.LocalTimestampMicros) {
            return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
        }
        if (logical instanceof LogicalTypes.LocalTimestampMillis) {
            return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3);
        }
        if (logical != null) {
            throw new UnsupportedOperationException(
                    String.format("Don't support logical avro type '%s' yet.", logical));
        }

        // Stage 2: no logical type, so map the plain Avro type.
        Schema.Type kind = schema.getType();
        switch (kind) {
            case BOOLEAN:
                return DataTypes.BOOLEAN();
            case BYTES:
            case FIXED:
                return DataTypes.BYTES();
            case DOUBLE:
                return DataTypes.DOUBLE();
            case FLOAT:
                return DataTypes.FLOAT();
            case INT:
                return DataTypes.INT();
            case LONG:
                return DataTypes.BIGINT();
            case STRING:
                return DataTypes.STRING();
            case RECORD:
                {
                    // Convert each Avro field in declaration order, keeping its position,
                    // name and doc string.
                    List<Schema.Field> avroFields = schema.getFields();
                    DataField[] rowFields = new DataField[avroFields.size()];
                    int next = 0;
                    for (Schema.Field avroField : avroFields) {
                        DataType converted = fromDebeziumAvroType(avroField.schema());
                        rowFields[next++] =
                                DataTypes.FIELD(
                                        avroField.pos(),
                                        avroField.name(),
                                        converted,
                                        avroField.doc());
                    }
                    return DataTypes.ROW(rowFields);
                }
            case ARRAY:
                return DataTypes.ARRAY(fromDebeziumAvroType(schema.getElementType()));
            case MAP:
                {
                    DataType mapValue = fromDebeziumAvroType(schema.getValueType());
                    return DataTypes.MAP(DataTypes.STRING(), mapValue);
                }
            case UNION:
                {
                    List<Schema> members = schema.getTypes();
                    boolean nullableUnion =
                            members.size() == 2
                                    && members.contains(Schema.create(Schema.Type.NULL));
                    if (!nullableUnion) {
                        // Anything other than a two-member union containing NULL is rejected.
                        throw new UnsupportedOperationException(
                                "Generic unions are not supported");
                    }
                    for (Schema member : members) {
                        if (member.getType() != Schema.Type.NULL) {
                            // Return nullable version of the non-null type.
                            return fromDebeziumAvroType(member).copy(true);
                        }
                    }
                    throw new IllegalStateException(
                            "Union type does not contain a non-null type");
                }
            default:
                throw new UnsupportedOperationException(
                        String.format("Don't support avro type '%s' yet.", kind));
        }
    }