in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs [51:68]
/// <summary>
/// Builds, via reflection, the value serializer instance to use for <paramref name="valueType"/>.
/// </summary>
/// <param name="valueType">Type of the message value; used as the generic argument of the serializer that is created.</param>
/// <param name="specifiedAvroSchema">Forwarded unchanged to <c>CreateSchemaRegistry</c>.</param>
/// <param name="schemaRegistryUrl">Forwarded to <c>CreateSchemaRegistry</c>; a non-null value also forces the Avro path for types that are not Avro records.</param>
/// <param name="schemaRegistryUsername">Forwarded unchanged to <c>CreateSchemaRegistry</c>.</param>
/// <param name="schemaRegistryPassword">Forwarded unchanged to <c>CreateSchemaRegistry</c>.</param>
/// <returns>
/// A <c>ProtobufSerializer&lt;T&gt;</c> when <paramref name="valueType"/> implements <c>IMessage</c>;
/// otherwise the result of calling <c>AsSyncOverAsync</c> on a new <c>AvroSerializer&lt;T&gt;</c> when the type is an
/// <c>ISpecificRecord</c> or <c>GenericRecord</c>, or when <paramref name="schemaRegistryUrl"/> is not null;
/// otherwise <c>null</c>.
/// </returns>
internal static object ResolveValueSerializer(Type valueType, string specifiedAvroSchema, string schemaRegistryUrl, string schemaRegistryUsername, string schemaRegistryPassword)
{
    // Protobuf messages take precedence over every other option.
    var isProtobufMessage = typeof(IMessage).IsAssignableFrom(valueType);
    if (isProtobufMessage)
    {
        var protobufSerializerType = typeof(ProtobufSerializer<>).MakeGenericType(valueType);
        return Activator.CreateInstance(protobufSerializerType);
    }

    var isSpecificRecord = typeof(ISpecificRecord).IsAssignableFrom(valueType);
    var isAvroRecord = isSpecificRecord || typeof(GenericRecord).IsAssignableFrom(valueType);
    var hasSchemaRegistryUrl = schemaRegistryUrl != null;

    // Neither an Avro record type nor a configured schema registry: no serializer is resolved here.
    if (!(isAvroRecord || hasSchemaRegistryUrl))
    {
        return null;
    }

    var registryClient = CreateSchemaRegistry(valueType, specifiedAvroSchema, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword, isSpecificRecord);

    var avroSerializerType = typeof(AvroSerializer<>).MakeGenericType(valueType);
    var avroSerializer = Activator.CreateInstance(avroSerializerType, registryClient, null /* config */);

    // AsSyncOverAsync is generic, so it has to be closed over valueType before it can be invoked.
    var openWrapMethod = typeof(SyncOverAsyncSerializerExtensionMethods).GetMethod("AsSyncOverAsync");
    var closedWrapMethod = openWrapMethod.MakeGenericMethod(valueType);

    // Static method: no target instance, the Avro serializer is the single argument.
    return closedWrapMethod.Invoke(null, new object[] { avroSerializer });
}