in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs [70:92]
private static ISchemaRegistryClient CreateSchemaRegistry(Type valueType, string specifiedAvroSchema, string schemaRegistryUrl, string schemaRegistryUsername, string schemaRegistryPassword, bool isSpecificRecord)
{
if (string.IsNullOrWhiteSpace(specifiedAvroSchema) && isSpecificRecord)
{
specifiedAvroSchema = ((ISpecificRecord)Activator.CreateInstance(valueType)).Schema.ToString();
}
if (!string.IsNullOrWhiteSpace(specifiedAvroSchema))
{
return new LocalSchemaRegistry(specifiedAvroSchema);
}
if (schemaRegistryUrl != null)
{
var schemaRegistryConfig = new List<KeyValuePair<string, string>>();
schemaRegistryConfig.Add(new KeyValuePair<string, string>("schema.registry.url", schemaRegistryUrl));
if (schemaRegistryUsername != null && schemaRegistryPassword != null) {
var authString = schemaRegistryUsername + ":" + schemaRegistryPassword;
schemaRegistryConfig.Add(new KeyValuePair<string, string>("schema.registry.basic.auth.user.info", authString));
}
return new CachedSchemaRegistryClient(schemaRegistryConfig.ToArray());
}
throw new ArgumentNullException(nameof(specifiedAvroSchema), $@"parameter is required when creating an generic avro serializer");
}