in transforms/src/main/java/org/apache/rocketmq/connect/transforms/BaseTransformation.java [111:174]
protected SchemaAndValue process(R record, Schema inputSchema, Object input) {
final SchemaAndValue result;
if (null == inputSchema && null == input) {
return new SchemaAndValue(
null,
null
);
}
if (input instanceof Map) {
log.trace("process() - Processing as map");
result = processMap(record, (Map<String, Object>) input);
return result;
}
if (null == inputSchema) {
log.trace("process() - Determining schema");
inputSchema = SchemaHelper.schema(input);
}
String schemaName = inputSchema.getName();
FieldType schemaType = inputSchema.getFieldType();
log.trace("process() - Input has as schema. schema = {}", inputSchema);
if (FieldType.STRUCT == schemaType) {
result = processStruct(record, inputSchema, (Struct) input);
} else if (Timestamp.LOGICAL_NAME.equals(schemaName)) {
result = processTimestamp(record, inputSchema, (Date) input);
} else if (io.openmessaging.connector.api.data.logical.Date.LOGICAL_NAME.equals(schemaName)) {
result = processDate(record, inputSchema, (Date) input);
} else if (Time.LOGICAL_NAME.equals(schemaName)) {
result = processTime(record, inputSchema, (Date) input);
} else if (Decimal.LOGICAL_NAME.equals(schemaName)) {
result = processDecimal(record, inputSchema, (BigDecimal) input);
} else if (FieldType.STRING == schemaType) {
result = processString(record, inputSchema, (String) input);
} else if (FieldType.BYTES == schemaType) {
result = processBytes(record, inputSchema, (byte[]) input);
} else if (FieldType.INT8 == schemaType) {
result = processInt8(record, inputSchema, (byte) input);
} else if (FieldType.INT16 == schemaType) {
result = processInt16(record, inputSchema, (short) input);
} else if (FieldType.INT32 == schemaType) {
result = processInt32(record, inputSchema, (int) input);
} else if (FieldType.INT64 == schemaType) {
result = processInt64(record, inputSchema, (long) input);
} else if (FieldType.FLOAT32 == schemaType) {
result = processFloat32(record, inputSchema, (float) input);
} else if (FieldType.FLOAT64 == schemaType) {
result = processFloat64(record, inputSchema, (double) input);
} else if (FieldType.ARRAY == schemaType) {
result = processArray(record, inputSchema, (List<Object>) input);
} else if (FieldType.MAP == schemaType) {
result = processMap(record, inputSchema, (Map<Object, Object>) input);
} else {
throw new UnsupportedOperationException(
String.format(
"Schema is not supported. type='%s' name='%s'",
schemaType,
schemaName
)
);
}
return result;
}