public SchemaAndValue toConnectData()

in java/avro-converter/src/main/java/com/microsoft/azure/schemaregistry/kafka/connect/avro/AvroConverter.java [141:176]


    /**
     * Converts a serialized Avro record value into Kafka Connect data.
     *
     * <p>The payload is first deserialized with the configured deserializer. The schema ID is then
     * taken from the record's {@code content-type} header (the segment following the first
     * {@code '+'}), the matching schema is fetched from the schema registry client, and its
     * definition is parsed and passed, together with the deserialized object, to
     * {@code AvroConverterUtils.toConnectData}.
     *
     * @param topic   the topic the record belongs to; not used by this method
     * @param headers the record headers; the last {@code content-type} header is read
     * @param value   the serialized record value; {@code null} yields {@link SchemaAndValue#NULL}
     * @return the Connect schema and value produced from the deserialized payload
     * @throws DataException if deserialization fails, the content type carries no schema ID,
     *                       or the schema registry returns no schema for that ID
     */
    public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
        // Null values (e.g. tombstones) carry no payload to deserialize.
        if (value == null) {
            return SchemaAndValue.NULL;
        }

        String contentTypeString = "";

        try {
            MessageWithMetadata message = new MessageWithMetadata();
            message.setBodyAsBinaryData(BinaryData.fromBytes(value));

            Header contentTypeHeader = headers.lastHeader("content-type");
            if (contentTypeHeader != null && contentTypeHeader.value() != null) {
                // Decode explicitly as UTF-8 so the result does not depend on the platform charset.
                contentTypeString = new String(contentTypeHeader.value(),
                    java.nio.charset.StandardCharsets.UTF_8);
                message.setContentType(contentTypeString);
            }

            Object deserializedMessage = deserializer.deserializeMessageData(message,
                TypeReference.createInstance(this.avroConverterConfig.getAvroSpecificType()));

            // The schema ID is the segment after the first '+' in the content type.
            String[] contentTypeParts = contentTypeString.split("\\+");
            if (contentTypeParts.length < 2) {
                throw new DataException(
                    "Failed to parse schema id from content type: " + contentTypeString);
            }
            String schemaId = contentTypeParts[1];

            // block() returns null when the underlying Mono completes empty.
            SchemaRegistrySchema srSchema = schemaRegistryClient.getSchema(schemaId).block();
            if (srSchema == null) {
                throw new DataException("No schema returned for schema id " + schemaId);
            }

            // Convert Avro object to Connect SchemaAndValue
            AvroConverterUtils utils = new AvroConverterUtils();
            return utils.toConnectData(new Parser().parse(srSchema.getDefinition()), deserializedMessage);
        } catch (SchemaRegistryApacheAvroException e) {
            throw new DataException("Failed to deserialize Avro data: ", e);
        }
    }