in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs [46:65]
/// <summary>
/// Tries to build a Kafka trigger binding for the parameter carried by <paramref name="context"/>.
/// </summary>
/// <param name="context">
/// Binding provider context; its <c>Parameter</c> is inspected for a <see cref="KafkaTriggerAttribute"/>.
/// </param>
/// <returns>
/// A completed task whose result is <c>null</c> when the parameter is not decorated with
/// <see cref="KafkaTriggerAttribute"/>; otherwise a completed task holding a
/// <see cref="KafkaTriggerBindingWrapper"/> around the binding produced by <c>CreateBindingStrategyFor</c>.
/// </returns>
public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext context)
{
    var targetParameter = context.Parameter;
    var triggerAttribute = targetParameter.GetCustomAttribute<KafkaTriggerAttribute>(inherit: false);

    // Parameters without the Kafka trigger attribute are not handled by this provider.
    if (triggerAttribute == null)
    {
        ITriggerBinding noBinding = null;
        return Task.FromResult(noBinding);
    }

    // Work out the key/value types (and their Avro schemas) from the attribute and the parameter type.
    var resolvedTypes = SerializationHelper.GetKeyAndValueTypes(
        triggerAttribute.AvroSchema,
        triggerAttribute.KeyAvroSchema,
        targetParameter.ParameterType,
        triggerAttribute.KeyDataType.GetDataType());

    // Schema registry connection settings are resolved through the configuration and the name resolver.
    var registryUrl = this.config.ResolveSecureSetting(nameResolver, triggerAttribute.SchemaRegistryUrl);
    var registryUsername = this.config.ResolveSecureSetting(nameResolver, triggerAttribute.SchemaRegistryUsername);
    var registryPassword = this.config.ResolveSecureSetting(nameResolver, triggerAttribute.SchemaRegistryPassword);

    // The value deserializer is resolved before the key deserializer; both share the registry settings.
    var deserializerForValue = SerializationHelper.ResolveDeserializer(
        resolvedTypes.ValueType,
        resolvedTypes.ValueAvroSchema,
        registryUrl,
        registryUsername,
        registryPassword);
    var deserializerForKey = SerializationHelper.ResolveDeserializer(
        resolvedTypes.KeyType,
        resolvedTypes.KeyAvroSchema,
        registryUrl,
        registryUsername,
        registryPassword);

    var kafkaConsumerConfig = CreateConsumerConfiguration(triggerAttribute);

    // When no key type was resolved, Ignore is used as the key type for the binding strategy.
    var effectiveKeyType = resolvedTypes.KeyType ?? typeof(Ignore);
    var triggerBinding = CreateBindingStrategyFor(
        effectiveKeyType,
        resolvedTypes.ValueType,
        resolvedTypes.RequiresKey,
        deserializerForValue,
        deserializerForKey,
        targetParameter,
        kafkaConsumerConfig);

    ITriggerBinding wrappedBinding = new KafkaTriggerBindingWrapper(triggerBinding);
    return Task.FromResult(wrappedBinding);
}