in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs [76:99]
/// <summary>
/// Sends a single item to Kafka through the underlying producer and awaits the delivery result.
/// </summary>
/// <param name="topic">
/// The requested topic. The topic actually used is whatever <c>FindTopic</c> returns for this
/// value and the converted item.
/// </param>
/// <param name="item">
/// The item to send. It is passed to <c>ValidateItem</c>, converted to an
/// <see cref="IKafkaEventData"/> by <c>GetItem</c>, and turned into a message by <c>BuildMessage</c>.
/// </param>
/// <returns>A task that completes once the producer's <c>ProduceAsync</c> call has completed.</returns>
/// <exception cref="ProduceException{TKey, TValue}">
/// Logged and rethrown unchanged when the producer reports a delivery failure.
/// </exception>
/// <remarks>
/// Any other exception raised while producing is also logged and rethrown unchanged.
/// </remarks>
public async Task ProduceAsync(string topic, object item)
{
    ValidateItem(item);
    IKafkaEventData actualItem = GetItem(item);
    Message<TKey, TValue> msg = BuildMessage(item, actualItem);
    string topicUsed = FindTopic(topic, actualItem);

    try
    {
        var deliveryResult = await this.producer.ProduceAsync(topicUsed, msg);
        this.logger.LogDebug(
            "Message delivered on {topic} / {partition} / {offset}",
            deliveryResult.Topic,
            (int)deliveryResult.Partition,
            (long)deliveryResult.Offset);
    }
    catch (ProduceException<TKey, TValue> produceException)
    {
        // The delivery result may be absent. Casting the null-conditional result straight to
        // int/long would throw InvalidOperationException here and hide the real produce failure,
        // so convert to nullable primitives and let the logger render missing values as null.
        var failedResult = produceException.DeliveryResult;
        var error = produceException.Error;

        this.logger.LogError(
            produceException,
            "Failed to delivery message to {topic} / {partition} / {offset}. Reason: {reason}. Full Error: {error}",
            failedResult?.Topic ?? topicUsed, // fall back to the topic we attempted to write to
            (int?)failedResult?.Partition,
            (long?)failedResult?.Offset,
            error?.Reason,
            error?.ToString());
        throw;
    }
    catch (Exception ex)
    {
        this.logger.LogError(ex, "Error producing into {topic}", topicUsed);
        throw;
    }
}