in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs [101:131]
public void Produce(string topic, object item)
{
ValidateItem(item);
IKafkaEventData actualItem = GetItem(item);
Message<TKey, TValue> msg = BuildMessage(item, actualItem);
string topicUsed = FindTopic(topic, actualItem);
try
{
logger.LogInformation("in Produce method");
this.producer.Produce(topicUsed, msg,
deliveryResult => {
if (deliveryResult.Error.Code != ErrorCode.NoError)
{
this.logger.LogError("msg failed to deliver on topic :: ", topicUsed + " error :: " + deliveryResult.Error.ToString());
return;
}
this.logger.LogDebug("Message delivered on {topic} / {partition} / {offset}", deliveryResult.Topic, (int)deliveryResult.Partition, (long)deliveryResult.Offset);
});
}
catch (ProduceException<TKey, TValue> produceException)
{
logger.LogError("Failed to delivery message to {topic} / {partition} / {offset}. Reason: {reason}. Full Error: {error}", produceException.DeliveryResult?.Topic, (int)produceException.DeliveryResult?.Partition, (long)produceException.DeliveryResult?.Offset, produceException.Error.Reason, produceException.Error.ToString());
throw;
}
catch (Exception ex)
{
this.logger.LogError(ex, "Error producing into {topic}", topicUsed);
throw;
}
}