in connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java [449:560]
public Schema asConnectSchema(JsonNode jsonSchema) {
if (jsonSchema.isNull())
return null;
Schema cached = toConnectSchemaCache.get(jsonSchema);
if (cached != null)
return cached;
JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
if (schemaTypeNode == null || !schemaTypeNode.isTextual())
throw new DataException("Schema must contain 'type' field");
final SchemaBuilder builder;
switch (schemaTypeNode.textValue()) {
case JsonSchema.BOOLEAN_TYPE_NAME:
builder = SchemaBuilder.bool();
break;
case JsonSchema.INT8_TYPE_NAME:
builder = SchemaBuilder.int8();
break;
case JsonSchema.INT16_TYPE_NAME:
builder = SchemaBuilder.int16();
break;
case JsonSchema.INT32_TYPE_NAME:
builder = SchemaBuilder.int32();
break;
case JsonSchema.INT64_TYPE_NAME:
builder = SchemaBuilder.int64();
break;
case JsonSchema.FLOAT_TYPE_NAME:
builder = SchemaBuilder.float32();
break;
case JsonSchema.DOUBLE_TYPE_NAME:
builder = SchemaBuilder.float64();
break;
case JsonSchema.BYTES_TYPE_NAME:
builder = SchemaBuilder.bytes();
break;
case JsonSchema.STRING_TYPE_NAME:
builder = SchemaBuilder.string();
break;
case JsonSchema.ARRAY_TYPE_NAME:
JsonNode elemSchema = jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME);
if (elemSchema == null || elemSchema.isNull())
throw new DataException("Array schema did not specify the element type");
builder = SchemaBuilder.array(asConnectSchema(elemSchema));
break;
case JsonSchema.MAP_TYPE_NAME:
JsonNode keySchema = jsonSchema.get(JsonSchema.MAP_KEY_FIELD_NAME);
if (keySchema == null)
throw new DataException("Map schema did not specify the key type");
JsonNode valueSchema = jsonSchema.get(JsonSchema.MAP_VALUE_FIELD_NAME);
if (valueSchema == null)
throw new DataException("Map schema did not specify the value type");
builder = SchemaBuilder.map(asConnectSchema(keySchema), asConnectSchema(valueSchema));
break;
case JsonSchema.STRUCT_TYPE_NAME:
builder = SchemaBuilder.struct();
JsonNode fields = jsonSchema.get(JsonSchema.STRUCT_FIELDS_FIELD_NAME);
if (fields == null || !fields.isArray())
throw new DataException("Struct schema's \"fields\" argument is not an array.");
for (JsonNode field : fields) {
JsonNode jsonFieldName = field.get(JsonSchema.STRUCT_FIELD_NAME_FIELD_NAME);
if (jsonFieldName == null || !jsonFieldName.isTextual())
throw new DataException("Struct schema's field name not specified properly");
builder.field(jsonFieldName.asText(), asConnectSchema(field));
}
break;
default:
throw new DataException("Unknown schema type: " + schemaTypeNode.textValue());
}
JsonNode schemaOptionalNode = jsonSchema.get(JsonSchema.SCHEMA_OPTIONAL_FIELD_NAME);
if (schemaOptionalNode != null && schemaOptionalNode.isBoolean() && schemaOptionalNode.booleanValue())
builder.optional();
else
builder.required();
JsonNode schemaNameNode = jsonSchema.get(JsonSchema.SCHEMA_NAME_FIELD_NAME);
if (schemaNameNode != null && schemaNameNode.isTextual())
builder.name(schemaNameNode.textValue());
JsonNode schemaVersionNode = jsonSchema.get(JsonSchema.SCHEMA_VERSION_FIELD_NAME);
if (schemaVersionNode != null && schemaVersionNode.isIntegralNumber()) {
builder.version(schemaVersionNode.intValue());
}
JsonNode schemaDocNode = jsonSchema.get(JsonSchema.SCHEMA_DOC_FIELD_NAME);
if (schemaDocNode != null && schemaDocNode.isTextual())
builder.doc(schemaDocNode.textValue());
JsonNode schemaParamsNode = jsonSchema.get(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME);
if (schemaParamsNode != null && schemaParamsNode.isObject()) {
Iterator<Map.Entry<String, JsonNode>> paramsIt = schemaParamsNode.fields();
while (paramsIt.hasNext()) {
Map.Entry<String, JsonNode> entry = paramsIt.next();
JsonNode paramValue = entry.getValue();
if (!paramValue.isTextual())
throw new DataException("Schema parameters must have string values.");
builder.parameter(entry.getKey(), paramValue.textValue());
}
}
JsonNode schemaDefaultNode = jsonSchema.get(JsonSchema.SCHEMA_DEFAULT_FIELD_NAME);
if (schemaDefaultNode != null)
builder.defaultValue(convertToConnect(builder, schemaDefaultNode, config));
Schema result = builder.build();
toConnectSchemaCache.put(jsonSchema, result);
return result;
}