in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerAsyncCollector.cs [125:145]
private static object BuildKafkaEventDataForKeyValue(JObject dataObj)
{
string value = null;
if (dataObj["Value"] != null && dataObj["Value"].Type.ToString().Equals("Object"))
{
value = Newtonsoft.Json.JsonConvert.SerializeObject(dataObj["Value"]);
}
else
{
value = (string)dataObj["Value"];
}
KafkaEventData<string, string> messageToSend = new KafkaEventData<string, string>((string)dataObj["Key"], value);
messageToSend.Timestamp = (DateTime)dataObj["Timestamp"];
messageToSend.Partition = (int)dataObj["Partition"];
JArray headerList = (JArray)dataObj["Headers"];
foreach (JObject header in headerList)
{
messageToSend.Headers.Add((string)header["Key"], Encoding.UTF8.GetBytes((string)header["Value"]));
}
return messageToSend;
}