in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs [73:94]
private ITriggerBinding CreateBindingStrategy<TKey, TValue>(ParameterInfo parameter, KafkaListenerConfiguration listenerConfiguration, bool requiresKey, IDeserializer<TValue> valueDeserializer, IDeserializer<TKey> keySerializer)
{
// TODO: reuse connections if they match with others in same function app
Task<IListener> listenerCreator(ListenerFactoryContext factoryContext, bool singleDispatch)
{
var listener = new KafkaListener<TKey, TValue>(
factoryContext.Executor,
singleDispatch,
this.options.Value,
listenerConfiguration,
requiresKey,
valueDeserializer,
keySerializer,
this.logger,
factoryContext.Descriptor.Id,
drainModeManager);
return Task.FromResult<IListener>(listener);
}
return BindingFactory.GetTriggerBinding(new KafkaTriggerBindingStrategy<TKey, TValue>(), parameter, new KafkaEventDataConvertManager(this.converterManager, this.logger), listenerCreator);
}