in rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java [185:279]
static {
LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new LogicalTypeConverter() {
@Override
public Object toJson(final Schema schema, final Object value, final JsonConverterConfig config) {
if (!(value instanceof BigDecimal)) {
throw new ConnectException("Invalid type for Decimal, expected BigDecimal but was " + value.getClass());
}
final BigDecimal decimal = (BigDecimal) value;
switch (config.decimalFormat()) {
case NUMERIC:
return decimal;
case BASE64:
return Decimal.fromLogical(schema, decimal);
default:
throw new ConnectException("Unexpected " + JsonConverterConfig.DECIMAL_FORMAT_CONFIG + ": " + config.decimalFormat());
}
}
@Override
public Object toConnect(final Schema schema, final Object value) {
if (value instanceof BigDecimal) {
return (BigDecimal) value;
}
if (value instanceof byte[]) {
try {
return Decimal.toLogical(schema, (byte[]) value);
} catch (Exception e) {
throw new ConnectException("Invalid bytes for Decimal field", e);
}
}
if (value instanceof String) {
try {
return Decimal.toLogical(schema, TypeUtils.castToBytes((String) value));
} catch (Exception e) {
throw new ConnectException("Invalid bytes for Decimal field", e);
}
}
throw new ConnectException("Invalid type for Decimal, underlying representation should be numeric or bytes but was " + value);
}
});
LOGICAL_CONVERTERS.put(Date.LOGICAL_NAME, new LogicalTypeConverter() {
@Override
public Object toJson(final Schema schema, final Object value, final JsonConverterConfig config) {
if (!(value instanceof java.util.Date)) {
throw new ConnectException("Invalid type for Date, expected Date but was " + value.getClass());
}
return Date.fromLogical(schema, (java.util.Date) value);
}
@Override
public Object toConnect(final Schema schema, final Object value) {
if (!(value instanceof Integer)) {
throw new ConnectException("Invalid type for Date, underlying representation should be integer but was " + value);
}
return Date.toLogical(schema, (Integer) value);
}
});
LOGICAL_CONVERTERS.put(Time.LOGICAL_NAME, new LogicalTypeConverter() {
@Override
public Object toJson(final Schema schema, final Object value, final JsonConverterConfig config) {
if (!(value instanceof java.util.Date)) {
throw new ConnectException("Invalid type for Time, expected Date but was " + value.getClass());
}
return Time.fromLogical(schema, (java.util.Date) value);
}
@Override
public Object toConnect(final Schema schema, final Object value) {
if (!(value instanceof Integer)) {
throw new ConnectException("Invalid type for Time, underlying representation should be integer but was " + value);
}
return Time.toLogical(schema, ((Integer) value).intValue());
}
});
LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new LogicalTypeConverter() {
@Override
public Object toJson(final Schema schema, final Object value, final JsonConverterConfig config) {
if (!(value instanceof java.util.Date)) {
throw new ConnectException("Invalid type for Timestamp, expected Date but was " + value.getClass());
}
return Timestamp.fromLogical(schema, (java.util.Date) value);
}
@Override
public Object toConnect(final Schema schema, final Object value) {
if (value instanceof Number) {
return Timestamp.toLogical(schema, Long.valueOf(value.toString()));
}
throw new ConnectException("Invalid type for Timestamp, underlying representation should be integral but was " + value);
}
});
}