private void convertStructSchema()

in connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java [191:339]


    /**
     * Copies every field of an OpenMessaging struct schema onto a Kafka Connect struct builder.
     *
     * <p>Primitive fields (INT8..FLOAT64, BOOLEAN, STRING, BYTES) are added as optional schemas
     * carrying the source schema's converted name, doc, parameters and, when present, default
     * value. STRUCT, ARRAY and MAP fields are delegated to {@code convertKafkaSchema}. Any other
     * field type is skipped without adding a field.
     *
     * @param schemaBuilder  Kafka Connect builder that receives the converted fields
     * @param originalSchema OpenMessaging schema whose fields are converted
     * @throws ConnectException wrapping any exception raised while converting a field
     */
    private void convertStructSchema(org.apache.kafka.connect.data.SchemaBuilder schemaBuilder, io.openmessaging.connector.api.data.Schema originalSchema) {
        for (Field field : originalSchema.getFields()) {
            try {
                // Source-side description of the field.
                Schema schema = field.getSchema();
                String schemaName = convertSchemaName(schema.getName());
                String fieldName = field.getName();
                FieldType type = schema.getFieldType();

                // The Kafka builder is never handed a null parameter map.
                Map<String, String> parameters = schema.getParameters() == null ? new HashMap<>() : schema.getParameters();

                switch (type) {
                    case INT8:
                        schemaBuilder.field(fieldName, applyOptionalFieldAttributes(SchemaBuilder.int8(), schema, schemaName, parameters).build());
                        break;
                    case INT16:
                        schemaBuilder.field(fieldName, applyOptionalFieldAttributes(SchemaBuilder.int16(), schema, schemaName, parameters).build());
                        break;
                    case INT32:
                        schemaBuilder.field(fieldName, applyOptionalFieldAttributes(SchemaBuilder.int32(), schema, schemaName, parameters).build());
                        break;
                    case INT64:
                        schemaBuilder.field(fieldName, applyOptionalFieldAttributes(SchemaBuilder.int64(), schema, schemaName, parameters).build());
                        break;
                    case FLOAT32:
                        schemaBuilder.field(fieldName, applyOptionalFieldAttributes(SchemaBuilder.float32(), schema, schemaName, parameters).build());
                        break;
                    case FLOAT64:
                        schemaBuilder.field(fieldName, applyOptionalFieldAttributes(SchemaBuilder.float64(), schema, schemaName, parameters).build());
                        break;
                    case BOOLEAN:
                        schemaBuilder.field(fieldName, applyOptionalFieldAttributes(SchemaBuilder.bool(), schema, schemaName, parameters).build());
                        break;
                    case STRING:
                        schemaBuilder.field(fieldName, applyOptionalFieldAttributes(SchemaBuilder.string(), schema, schemaName, parameters).build());
                        break;
                    case BYTES:
                        schemaBuilder.field(fieldName, applyOptionalFieldAttributes(SchemaBuilder.bytes(), schema, schemaName, parameters).build());
                        break;
                    case STRUCT:
                    case ARRAY:
                    case MAP:
                        // Composite types are converted by the general schema converter.
                        schemaBuilder.field(fieldName, convertKafkaSchema(schema).build());
                        break;
                    default:
                        // Unrecognised field types are skipped; no field is added for them.
                        break;
                }
            } catch (Exception ex) {
                // The throwable is passed as the last argument so the logger records its stack trace.
                logger.error("Convert schema failure!", ex);
                throw new ConnectException(ex);
            }
        }
    }

    /**
     * Applies the attributes shared by all primitive struct fields to a freshly created builder.
     *
     * <p>The builder is marked optional before anything else. Kafka Connect's
     * {@code SchemaBuilder.defaultValue} validates the supplied value against the builder's
     * current state, so a null default on a builder that is not yet optional is rejected.
     * A default value is therefore only set when the source schema actually defines one.
     *
     * @param builder    freshly created primitive builder (for example {@code SchemaBuilder.int8()})
     * @param source     OpenMessaging schema providing doc and default value
     * @param schemaName converted schema name to assign to the builder
     * @param parameters non-null schema parameters to copy onto the builder
     * @return the same builder instance, ready for {@code build()}
     */
    private SchemaBuilder applyOptionalFieldAttributes(SchemaBuilder builder, Schema source, String schemaName, Map<String, String> parameters) {
        builder.optional()
                .name(schemaName)
                .doc(source.getDoc())
                .parameters(parameters);

        Object defaultValue = source.getDefaultValue();
        if (defaultValue != null) {
            builder.defaultValue(defaultValue);
        }
        return builder;
    }