in connect/api/src/main/java/org/apache/kafka/connect/data/Values.java [384:565]
/**
 * Convert the supplied value to the type described by {@code toSchema}.
 *
 * <p>The target is chosen from {@code toSchema.type()} and, for the {@link Decimal}, {@link Date},
 * {@link Time} and {@link Timestamp} logical types, from {@code toSchema.name()}. String values are
 * first run through {@code parseString} for the boolean, temporal, array and map targets; numeric
 * targets fall back to {@code asLong} / {@code asDouble}.
 *
 * @param toSchema   the schema describing the desired type; must not be null
 * @param fromSchema the schema describing the supplied value; may be null, in which case the
 *                   logical-type-to-logical-type shortcuts (e.g. Timestamp to Date) are skipped
 * @param value      the value to convert; may be null
 * @return the converted value; null when {@code value} is null and {@code toSchema} is optional
 * @throws DataException if {@code value} is null but {@code toSchema} is not optional, or if the
 *                       value cannot be converted to the requested type
 */
protected static Object convertTo(Schema toSchema, Schema fromSchema, Object value) throws DataException {
    if (value == null) {
        if (toSchema.isOptional()) {
            return null;
        }
        throw new DataException("Unable to convert a null value to a schema that requires a value");
    }
    switch (toSchema.type()) {
        case BYTES:
            if (Decimal.LOGICAL_NAME.equals(toSchema.name())) {
                if (value instanceof ByteBuffer) {
                    value = Utils.toArray((ByteBuffer) value);
                }
                if (value instanceof byte[]) {
                    return Decimal.toLogical(toSchema, (byte[]) value);
                }
                if (value instanceof BigDecimal) {
                    return value;
                }
                if (value instanceof Number) {
                    // Not already a decimal, so treat it as a double ...
                    // (new BigDecimal(double) keeps the exact binary expansion of the double)
                    double converted = ((Number) value).doubleValue();
                    return new BigDecimal(converted);
                }
                if (value instanceof String) {
                    // The Decimal logical type is represented by BigDecimal, so return the parsed
                    // BigDecimal itself rather than collapsing it to a (lossy) Double.
                    return new BigDecimal(value.toString());
                }
            }
            // Plain BYTES (or a Decimal-named schema whose value matched none of the above)
            if (value instanceof ByteBuffer) {
                return Utils.toArray((ByteBuffer) value);
            }
            if (value instanceof byte[]) {
                return value;
            }
            if (value instanceof BigDecimal) {
                return Decimal.fromLogical(toSchema, (BigDecimal) value);
            }
            break;
        case STRING:
            StringBuilder sb = new StringBuilder();
            append(sb, value, false);
            return sb.toString();
        case BOOLEAN:
            if (value instanceof Boolean) {
                return value;
            }
            if (value instanceof String) {
                SchemaAndValue parsed = parseString(value.toString());
                if (parsed.value() instanceof Boolean) {
                    return parsed.value();
                }
            }
            // Anything else is treated numerically: zero is false, non-zero is true
            return asLong(value, fromSchema, null) == 0L ? Boolean.FALSE : Boolean.TRUE;
        case INT8:
            if (value instanceof Byte) {
                return value;
            }
            return (byte) asLong(value, fromSchema, null);
        case INT16:
            if (value instanceof Short) {
                return value;
            }
            return (short) asLong(value, fromSchema, null);
        case INT32:
            if (Date.LOGICAL_NAME.equals(toSchema.name())) {
                if (value instanceof String) {
                    SchemaAndValue parsed = parseString(value.toString());
                    value = parsed.value();
                }
                if (value instanceof java.util.Date) {
                    if (fromSchema != null) {
                        String fromSchemaName = fromSchema.name();
                        if (Date.LOGICAL_NAME.equals(fromSchemaName)) {
                            return value;
                        }
                        if (Timestamp.LOGICAL_NAME.equals(fromSchemaName)) {
                            // Just get the number of days from this timestamp
                            long millis = ((java.util.Date) value).getTime();
                            int days = (int) (millis / MILLIS_PER_DAY); // truncates
                            return Date.toLogical(toSchema, days);
                        }
                    }
                }
                long numeric = asLong(value, fromSchema, null);
                return Date.toLogical(toSchema, (int) numeric);
            }
            if (Time.LOGICAL_NAME.equals(toSchema.name())) {
                if (value instanceof String) {
                    SchemaAndValue parsed = parseString(value.toString());
                    value = parsed.value();
                }
                if (value instanceof java.util.Date) {
                    if (fromSchema != null) {
                        String fromSchemaName = fromSchema.name();
                        if (Time.LOGICAL_NAME.equals(fromSchemaName)) {
                            return value;
                        }
                        if (Timestamp.LOGICAL_NAME.equals(fromSchemaName)) {
                            // Just get the time portion of this timestamp by moving the date
                            // to 1970-01-01 (UTC) and keeping the time-of-day fields.
                            Calendar calendar = Calendar.getInstance(UTC);
                            calendar.setTime((java.util.Date) value);
                            calendar.set(Calendar.YEAR, 1970);
                            // Calendar months are zero-based: JANUARY is 0. Using 1 (February)
                            // would add 31 days of millis and overflow the int cast below.
                            calendar.set(Calendar.MONTH, Calendar.JANUARY);
                            calendar.set(Calendar.DAY_OF_MONTH, 1);
                            return Time.toLogical(toSchema, (int) calendar.getTimeInMillis());
                        }
                    }
                }
                long numeric = asLong(value, fromSchema, null);
                return Time.toLogical(toSchema, (int) numeric);
            }
            if (value instanceof Integer) {
                return value;
            }
            return (int) asLong(value, fromSchema, null);
        case INT64:
            if (Timestamp.LOGICAL_NAME.equals(toSchema.name())) {
                if (value instanceof String) {
                    SchemaAndValue parsed = parseString(value.toString());
                    value = parsed.value();
                }
                if (value instanceof java.util.Date) {
                    java.util.Date date = (java.util.Date) value;
                    if (fromSchema != null) {
                        String fromSchemaName = fromSchema.name();
                        if (Date.LOGICAL_NAME.equals(fromSchemaName)) {
                            int days = Date.fromLogical(fromSchema, date);
                            // Widen before multiplying so the product cannot overflow int
                            // regardless of how MILLIS_PER_DAY is declared.
                            long millis = (long) days * MILLIS_PER_DAY;
                            return Timestamp.toLogical(toSchema, millis);
                        }
                        if (Time.LOGICAL_NAME.equals(fromSchemaName)) {
                            long millis = Time.fromLogical(fromSchema, date);
                            return Timestamp.toLogical(toSchema, millis);
                        }
                        if (Timestamp.LOGICAL_NAME.equals(fromSchemaName)) {
                            return value;
                        }
                    }
                }
                long numeric = asLong(value, fromSchema, null);
                return Timestamp.toLogical(toSchema, numeric);
            }
            if (value instanceof Long) {
                return value;
            }
            return asLong(value, fromSchema, null);
        case FLOAT32:
            if (value instanceof Float) {
                return value;
            }
            return (float) asDouble(value, fromSchema, null);
        case FLOAT64:
            if (value instanceof Double) {
                return value;
            }
            return asDouble(value, fromSchema, null);
        case ARRAY:
            if (value instanceof String) {
                SchemaAndValue schemaAndValue = parseString(value.toString());
                value = schemaAndValue.value();
            }
            if (value instanceof List) {
                return value;
            }
            break;
        case MAP:
            if (value instanceof String) {
                SchemaAndValue schemaAndValue = parseString(value.toString());
                value = schemaAndValue.value();
            }
            if (value instanceof Map) {
                return value;
            }
            break;
        case STRUCT:
            if (value instanceof Struct) {
                Struct struct = (Struct) value;
                return struct;
            }
    }
    // 'value' may have been replaced above by the result of parseString(...).
    // NOTE(review): if that result can be null, calling getClass() directly would raise a
    // NullPointerException instead of the intended DataException, so guard it here.
    Object valueType = value == null ? null : value.getClass();
    throw new DataException("Unable to convert " + value + " (" + valueType + ") to " + toSchema);
}