in connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java [191:339]
/**
 * Copies every field of a RocketMQ Connect struct schema onto the given Kafka Connect struct builder.
 *
 * <p>Primitive fields (INT8..INT64, FLOAT32/64, BOOLEAN, STRING, BYTES) are rebuilt as optional
 * Kafka schemas carrying the converted schema name, doc, parameters and, when present, the default
 * value. STRUCT, ARRAY and MAP fields are delegated to {@code convertKafkaSchema}. Fields whose
 * type is none of the above are skipped without error.
 *
 * @param schemaBuilder  Kafka Connect builder that receives the converted fields
 * @param originalSchema RocketMQ Connect schema whose fields are converted
 * @throws ConnectException wrapping any exception raised while converting a field
 */
private void convertStructSchema(org.apache.kafka.connect.data.SchemaBuilder schemaBuilder, io.openmessaging.connector.api.data.Schema originalSchema) {
    for (Field field : originalSchema.getFields()) {
        try {
            // schema
            Schema schema = field.getSchema();
            String schemaName = convertSchemaName(schema.getName());
            // field name
            String fieldName = field.getName();
            FieldType type = schema.getFieldType();
            switch (type) {
                case STRUCT:
                case ARRAY:
                case MAP:
                    // Nested/complex types are converted recursively by the sibling converter.
                    schemaBuilder.field(fieldName, convertKafkaSchema(schema).build());
                    break;
                default:
                    SchemaBuilder primitiveBuilder = newPrimitiveFieldBuilder(type);
                    if (primitiveBuilder != null) {
                        schemaBuilder.field(
                            fieldName,
                            buildPrimitiveFieldSchema(primitiveBuilder, schemaName, schema)
                        );
                    }
                    // Unrecognised types are intentionally skipped.
                    break;
            }
        } catch (Exception ex) {
            logger.error("Convert schema failure! ex {}", ex);
            throw new ConnectException(ex);
        }
    }
}

/**
 * Returns a fresh Kafka Connect builder for a primitive field type, or {@code null} when the
 * type is not one of the primitive types handled here.
 */
private SchemaBuilder newPrimitiveFieldBuilder(FieldType type) {
    switch (type) {
        case INT8:
            return SchemaBuilder.int8();
        case INT16:
            return SchemaBuilder.int16();
        case INT32:
            return SchemaBuilder.int32();
        case INT64:
            return SchemaBuilder.int64();
        case FLOAT32:
            return SchemaBuilder.float32();
        case FLOAT64:
            return SchemaBuilder.float64();
        case BOOLEAN:
            return SchemaBuilder.bool();
        case STRING:
            return SchemaBuilder.string();
        case BYTES:
            return SchemaBuilder.bytes();
        default:
            return null;
    }
}

/**
 * Applies name, doc, parameters, optionality and default value from the source schema to a
 * primitive Kafka Connect builder and builds the resulting schema.
 */
private org.apache.kafka.connect.data.Schema buildPrimitiveFieldSchema(SchemaBuilder builder, String schemaName, Schema schema) {
    Map<String, String> parameters = schema.getParameters() == null ? new HashMap<>() : schema.getParameters();
    // optional() must come before defaultValue(): Kafka Connect validates the default against the
    // builder's current state and rejects a null default while the builder is still required.
    builder
        .name(schemaName)
        .doc(schema.getDoc())
        .parameters(parameters)
        .optional();
    Object defaultValue = schema.getDefaultValue();
    if (defaultValue != null) {
        // Only set a default when the source schema actually declares one.
        builder.defaultValue(defaultValue);
    }
    return builder.build();
}