protected static Schema commonSchemaFor()

in connect/api/src/main/java/org/apache/kafka/connect/data/Values.java [869:936]


    /**
     * Determine a single schema that can describe both the schema accumulated so far and the schema
     * of the most recently seen value.
     *
     * <p>Rules, in order:
     * <ul>
     *   <li>If {@code latest} is null, or carries no schema, it adds no constraint and
     *       {@code previous} is returned unchanged.</li>
     *   <li>If {@code previous} is null, the schema of {@code latest} is returned.</li>
     *   <li>If the two schema types differ and both are numeric, the schema with the wider type is
     *       returned, using the order INT8 &lt; INT16 &lt; INT32 &lt; INT64 &lt; FLOAT32 &lt; FLOAT64.</li>
     *   <li>If the two schema types differ and either is not numeric, {@code null} is returned.</li>
     *   <li>If the types are the same, see the inline comments for how optionality is handled.</li>
     * </ul>
     *
     * @param previous the schema accumulated from earlier values; may be null if none has been seen
     * @param latest   the most recent value and its schema; may be null
     * @return the schema common to both inputs, or {@code null} if no common schema was found
     */
    protected static Schema commonSchemaFor(Schema previous, SchemaAndValue latest) {
        if (latest == null) {
            return previous;
        }
        Schema newSchema = latest.schema();
        if (previous == null) {
            return newSchema;
        }
        if (newSchema == null) {
            // A value without a schema already leaves the result untouched when it is seen first
            // (the branch above returns its null schema and the next value replaces it). Apply the
            // same rule when it is seen later instead of failing with a NullPointerException below.
            return previous;
        }
        Type previousType = previous.type();
        Type newType = newSchema.type();
        if (previousType != newType) {
            // Different types only have a common schema when both are numeric; pick the wider one.
            int previousRank = numericWideningRank(previousType);
            int newRank = numericWideningRank(newType);
            if (previousRank < 0 || newRank < 0) {
                return null;
            }
            return previousRank > newRank ? previous : newSchema;
        }
        if (previous.isOptional() == newSchema.isOptional()) {
            // Same type and same optionality: keep 'previous' when both are optional, otherwise take
            // the new schema. No further equality check is made on this path.
            // NOTE(review): this branch was originally commented "Use the optional one", which only
            // makes sense when optionality differs. The comparison may be inverted; confirm the
            // intended behavior against callers before changing it.
            return previous.isOptional() ? previous : newSchema;
        }
        // Reached only when the two schemas differ in optionality.
        // NOTE(review): if Schema.equals() also compares optionality, this check can never succeed
        // and the method always returns null here; verify.
        if (!previous.equals(newSchema)) {
            return null;
        }
        return previous;
    }

    /**
     * Rank the numeric schema types by how wide they are, so that two numeric types can be compared.
     *
     * @param type the schema type to rank; may be null
     * @return a value from 0 (INT8) to 5 (FLOAT64), or -1 if the type is null or not numeric
     */
    private static int numericWideningRank(Type type) {
        if (type == null) {
            return -1;
        }
        switch (type) {
            case INT8:
                return 0;
            case INT16:
                return 1;
            case INT32:
                return 2;
            case INT64:
                return 3;
            case FLOAT32:
                return 4;
            case FLOAT64:
                return 5;
            default:
                return -1;
        }
    }