private static object BuildKafkaEventDataForKeyValue()

in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerAsyncCollector.cs [125:145]


            /// <summary>
            /// Builds a <c>KafkaEventData&lt;string, string&gt;</c> from a JSON object that carries the
            /// "Key", "Value", "Timestamp", "Partition" and "Headers" properties of an outgoing event.
            /// </summary>
            /// <param name="dataObj">JSON object describing the event to send.</param>
            /// <returns>The populated event, returned as <see cref="object"/>.</returns>
            /// <remarks>
            /// A "Value" that is a JSON object is serialized to its JSON text; any other value is
            /// converted to a string. "Timestamp", "Partition" and "Headers" are optional: when a
            /// property is missing or JSON null the corresponding member of the event is left as the
            /// constructor initialized it.
            /// </remarks>
            private static object BuildKafkaEventDataForKeyValue(JObject dataObj)
            {
                JToken valueToken = dataObj["Value"];

                // Nested JSON objects cannot be converted to string directly, so they are sent as JSON text.
                string value = valueToken != null && valueToken.Type == JTokenType.Object
                    ? Newtonsoft.Json.JsonConvert.SerializeObject(valueToken)
                    : (string)valueToken;

                KafkaEventData<string, string> messageToSend = new KafkaEventData<string, string>((string)dataObj["Key"], value);

                // The nullable conversions return null for a missing property or a JSON null,
                // whereas the non-nullable ones throw in both cases.
                DateTime? timestamp = (DateTime?)dataObj["Timestamp"];
                if (timestamp.HasValue)
                {
                    messageToSend.Timestamp = timestamp.Value;
                }

                int? partition = (int?)dataObj["Partition"];
                if (partition.HasValue)
                {
                    messageToSend.Partition = partition.Value;
                }

                // Headers are optional; a present but non-array token still fails the cast as before.
                JToken headersToken = dataObj["Headers"];
                if (headersToken != null && headersToken.Type != JTokenType.Null)
                {
                    foreach (JObject header in (JArray)headersToken)
                    {
                        messageToSend.Headers.Add((string)header["Key"], Encoding.UTF8.GetBytes((string)header["Value"]));
                    }
                }

                return messageToSend;
            }