in connect/api/src/main/java/org/apache/kafka/connect/data/Values.java [869:936]
/**
 * Determine a schema that can describe both the values seen so far and the latest value.
 *
 * <p>When the two schemas have different types, a common schema exists only if both types are
 * numeric; the schema with the wider type in the order INT8, INT16, INT32, INT64, FLOAT32, FLOAT64
 * is then returned. When the types are the same, the result depends on the optionality of the two
 * schemas (see the comments in the body).
 *
 * @param previous the schema common to the earlier values; may be null if there is none so far
 * @param latest the most recent schema and value; may be null, and may carry a null schema
 * @return {@code previous} if {@code latest} or its schema is null; the schema of {@code latest} if
 *         {@code previous} is null; otherwise the common schema, or null if the two schemas
 *         have no common schema
 */
protected static Schema commonSchemaFor(Schema previous, SchemaAndValue latest) {
    if (latest == null || latest.schema() == null) {
        // Nothing to merge. A value without a schema is treated like a missing value instead of
        // failing with a NullPointerException when the schema's type is read below.
        return previous;
    }
    Schema newSchema = latest.schema();
    if (previous == null) {
        return newSchema;
    }
    Type previousType = previous.type();
    Type newType = newSchema.type();
    if (previousType != newType) {
        int previousRank = numericWideningRank(previousType);
        int newRank = numericWideningRank(newType);
        if (previousRank < 0 || newRank < 0) {
            // At least one side is not numeric, so the differing types cannot be reconciled
            return null;
        }
        // The types differ, so the ranks differ: keep whichever schema has the wider type
        return previousRank > newRank ? previous : newSchema;
    }
    if (previous.isOptional() == newSchema.isOptional()) {
        // Same type and same optionality: previous is kept when optional, otherwise the latest
        // schema is used. The two schemas are not compared any further on this path.
        return previous.isOptional() ? previous : newSchema;
    }
    // NOTE(review): this point is only reached when the two schemas differ in optionality, so the
    // result rests entirely on Schema.equals(). The original comment here read "Use the optional
    // one", which suggests that rule may have been meant for this case - TODO confirm. Behavior is
    // deliberately left unchanged.
    if (!previous.equals(newSchema)) {
        return null;
    }
    return previous;
}

/**
 * Rank the numeric types by width so that two differing numeric types can be compared.
 *
 * @param type the schema type to rank
 * @return 0 for INT8 up to 5 for FLOAT64, or -1 if the type is not one of the six numeric types
 */
private static int numericWideningRank(Type type) {
    switch (type) {
        case INT8:
            return 0;
        case INT16:
            return 1;
        case INT32:
            return 2;
        case INT64:
            return 3;
        case FLOAT32:
            return 4;
        case FLOAT64:
            return 5;
        default:
            return -1;
    }
}