in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs [29:74]
public KafkaProducer(
Handle producerHandle,
object valueSerializer,
object keySerializer,
ILogger logger)
{
this.ValueSerializer = valueSerializer;
this.KeySerializer = keySerializer;
this.logger = logger;
this.MessageBuilder = new KafkaMessageBuilder<TKey, TValue>();
var builder = new DependentProducerBuilder<TKey, TValue>(producerHandle);
if (valueSerializer != null)
{
if (valueSerializer is IAsyncSerializer<TValue> asyncSerializer)
{
builder.SetValueSerializer(asyncSerializer);
}
else if (valueSerializer is ISerializer<TValue> syncSerializer)
{
builder.SetValueSerializer(syncSerializer);
}
else
{
throw new ArgumentException($"Value serializer must implement either IAsyncSerializer or ISerializer. Type {valueSerializer.GetType().Name} does not", nameof(valueSerializer));
}
}
if (keySerializer != null)
{
if (keySerializer is IAsyncSerializer<TKey> asyncSerializer)
{
builder.SetKeySerializer(asyncSerializer);
}
else if (keySerializer is ISerializer<TKey> syncSerializer)
{
builder.SetKeySerializer(syncSerializer);
}
else
{
throw new ArgumentException($"Key serializer must implement either IAsyncSerializer or ISerializer. Type {keySerializer.GetType().Name} does not", nameof(keySerializer));
}
}
this.producer = builder.Build();
}