public void Produce()

in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs [101:131]


        public void Produce(string topic, object item)
        {
            ValidateItem(item);
            IKafkaEventData actualItem = GetItem(item);
            Message<TKey, TValue> msg = BuildMessage(item, actualItem);
            string topicUsed = FindTopic(topic, actualItem);

            try
            {
                logger.LogInformation("in Produce method");
                this.producer.Produce(topicUsed, msg, 
                    deliveryResult => {
                        if (deliveryResult.Error.Code != ErrorCode.NoError)
                        {
                            this.logger.LogError("msg failed to deliver on topic :: ", topicUsed + " error :: " + deliveryResult.Error.ToString());
                            return;
                        }
                        this.logger.LogDebug("Message delivered on {topic} / {partition} / {offset}", deliveryResult.Topic, (int)deliveryResult.Partition, (long)deliveryResult.Offset);
                    });
            }
            catch (ProduceException<TKey, TValue> produceException)
            {
                logger.LogError("Failed to delivery message to {topic} / {partition} / {offset}. Reason: {reason}. Full Error: {error}", produceException.DeliveryResult?.Topic, (int)produceException.DeliveryResult?.Partition, (long)produceException.DeliveryResult?.Offset, produceException.Error.Reason, produceException.Error.ToString());
                throw;
            }
            catch (Exception ex)
            {
                this.logger.LogError(ex, "Error producing into {topic}", topicUsed);
                throw;
            }
        }