in connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java [185:327]
private void convertStructSchema(io.openmessaging.connector.api.data.SchemaBuilder schemaBuilder, org.apache.kafka.connect.data.Schema originalSchema) {
for (Field field : originalSchema.fields()) {
try {
Schema schema = field.schema();
org.apache.kafka.connect.data.Schema.Type type = schema.type();
String schemaName = convertSchemaName(schema.name());
Map<String, String> parameters = schema.parameters() == null ? new HashMap<>() : schema.parameters();
switch (type) {
case INT8:
schemaBuilder.field(
field.name(),
SchemaBuilder
.int8()
.name(schemaName)
.doc(schema.doc())
.defaultValue(schema.defaultValue())
.parameters(parameters)
.optional()
.build()
);
break;
case INT16:
schemaBuilder.field(
field.name(),
SchemaBuilder
.int16()
.name(schemaName)
.doc(schema.doc())
.defaultValue(schema.defaultValue())
.parameters(parameters)
.optional()
.build()
);
break;
case INT32:
schemaBuilder.field(
field.name(),
SchemaBuilder
.int32()
.name(schemaName)
.doc(schema.doc())
.defaultValue(schema.defaultValue())
.parameters(parameters)
.optional()
.build()
);
break;
case INT64:
schemaBuilder.field(
field.name(),
SchemaBuilder
.int64()
.name(schemaName)
.doc(schema.doc())
.defaultValue(schema.defaultValue())
.parameters(parameters)
.optional()
.build()
);
break;
case FLOAT32:
schemaBuilder.field(
field.name(),
SchemaBuilder
.float32()
.name(schemaName)
.doc(schema.doc())
.defaultValue(schema.defaultValue())
.parameters(parameters)
.optional()
.build()
);
break;
case FLOAT64:
schemaBuilder.field(
field.name(),
SchemaBuilder
.float64()
.name(schemaName)
.doc(schema.doc())
.defaultValue(schema.defaultValue())
.parameters(parameters)
.optional()
.build()
);
break;
case BOOLEAN:
schemaBuilder.field(
field.name(),
SchemaBuilder
.bool()
.name(schemaName)
.doc(schema.doc())
.defaultValue(schema.defaultValue())
.parameters(parameters)
.optional()
.build()
);
break;
case STRING:
schemaBuilder.field(
field.name(),
SchemaBuilder
.string()
.name(schemaName)
.doc(schema.doc())
.defaultValue(schema.defaultValue())
.parameters(parameters)
.optional()
.build()
);
break;
case BYTES:
schemaBuilder.field(
field.name(),
SchemaBuilder
.bytes()
.name(schemaName)
.doc(schema.doc())
.defaultValue(schema.defaultValue())
.parameters(parameters)
.optional()
.build()
);
break;
case STRUCT:
case ARRAY:
case MAP:
schemaBuilder.field(
field.name(),
convertKafkaSchema(field.schema()).build()
);
break;
default:
break;
}
} catch (Exception ex) {
logger.error("Convert schema failure! ex {}", ex);
throw new ConnectException(ex);
}
}
}