protected SchemaAndValue process()

in transforms/src/main/java/org/apache/rocketmq/connect/transforms/BaseTransformation.java [111:174]


    protected SchemaAndValue process(R record, Schema inputSchema, Object input) {
        final SchemaAndValue result;
        if (null == inputSchema && null == input) {
            return new SchemaAndValue(
                    null,
                    null
            );
        }
        if (input instanceof Map) {
            log.trace("process() - Processing as map");
            result = processMap(record, (Map<String, Object>) input);
            return result;
        }

        if (null == inputSchema) {
            log.trace("process() - Determining schema");
            inputSchema = SchemaHelper.schema(input);
        }

        String schemaName = inputSchema.getName();
        FieldType schemaType =  inputSchema.getFieldType();
        log.trace("process() - Input has as schema. schema = {}", inputSchema);
        if (FieldType.STRUCT == schemaType) {
            result = processStruct(record, inputSchema, (Struct) input);
        } else if (Timestamp.LOGICAL_NAME.equals(schemaName)) {
            result = processTimestamp(record, inputSchema, (Date) input);
        } else if (io.openmessaging.connector.api.data.logical.Date.LOGICAL_NAME.equals(schemaName)) {
            result = processDate(record, inputSchema, (Date) input);
        } else if (Time.LOGICAL_NAME.equals(schemaName)) {
            result = processTime(record, inputSchema, (Date) input);
        } else if (Decimal.LOGICAL_NAME.equals(schemaName)) {
            result = processDecimal(record, inputSchema, (BigDecimal) input);
        } else if (FieldType.STRING == schemaType) {
            result = processString(record, inputSchema, (String) input);
        } else if (FieldType.BYTES == schemaType) {
            result = processBytes(record, inputSchema, (byte[]) input);
        } else if (FieldType.INT8 == schemaType) {
            result = processInt8(record, inputSchema, (byte) input);
        } else if (FieldType.INT16 == schemaType) {
            result = processInt16(record, inputSchema, (short) input);
        } else if (FieldType.INT32 == schemaType) {
            result = processInt32(record, inputSchema, (int) input);
        } else if (FieldType.INT64 == schemaType) {
            result = processInt64(record, inputSchema, (long) input);
        } else if (FieldType.FLOAT32 == schemaType) {
            result = processFloat32(record, inputSchema, (float) input);
        } else if (FieldType.FLOAT64 == schemaType) {
            result = processFloat64(record, inputSchema, (double) input);
        } else if (FieldType.ARRAY == schemaType) {
            result = processArray(record, inputSchema, (List<Object>) input);
        } else if (FieldType.MAP == schemaType) {
            result = processMap(record, inputSchema, (Map<Object, Object>) input);
        } else {
            throw new UnsupportedOperationException(
                    String.format(
                            "Schema is not supported. type='%s' name='%s'",
                            schemaType,
                            schemaName
                    )
            );
        }

        return result;
    }