public SchemaAndValue toConnectData()

in jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaConverter.java [158:207]


    public SchemaAndValue toConnectData(String topic,
                                        byte[] value) {
        Object deserialized;

        try {
            deserialized = deserializer.deserialize(topic, value);
        } catch (SerializationException | AWSSchemaRegistryException e) {
            throw new DataException("Converting byte[] to Kafka Connect data failed due to serialization error: ", e);
        }

        if (deserialized == null) {
            return SchemaAndValue.NULL;
        }

        String jsonSchemaString = deserializer.getGlueSchemaRegistryDeserializationFacade()
                .getSchemaDefinition(value);

        org.everit.json.schema.Schema jsonSchema = null;

        try {
            JSONObject jsonSchemaObject = new JSONObject(jsonSchemaString);
            jsonSchema = SchemaLoader.builder()
                    .schemaJson(jsonSchemaObject)
                    .build()
                    .load()
                    .build();
        } catch (Exception e) {
            throw new DataException("Failed to read JSON Schema : " + jsonSchemaString, e);
        }

        JsonNode jsonNode = null;

        if (deserialized instanceof JsonDataWithSchema) {
            JsonDataWithSchema envelope = (JsonDataWithSchema) deserialized;
            String payload = envelope.getPayload();
            try {
                jsonNode = objectMapper.readTree(payload);
            } catch (IOException e) {
                throw new DataException("Failed to read JSON Payload : " + payload, e);
            }
        } else {
            throw new DataException("JSON Deserialized data is not in envelope format.");
        }

        Schema connectSchema = jsonSchemaToConnectSchemaConverter.toConnectSchema(jsonSchema);
        Object connectValue = jsonNodeToConnectValueConverter.toConnectValue(connectSchema, jsonNode);

        SchemaAndValue schemaAndValue = new SchemaAndValue(connectSchema, connectValue);
        return schemaAndValue;
    }