in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaMessageBuilder.cs [16:61]
public Message<TKey, TValue> BuildFrom(IKafkaEventData eventData)
{
var msg = new Message<TKey, TValue>()
{
Value = (TValue)eventData.Value,
};
if (eventData.Key != null)
{
try
{
if (eventData.Key is TKey keyValue)
{
msg.Key = keyValue;
}
else
{
// this case is possible in out of proc when KafkaMessageKeyType is set specifically
msg.Key = typeof(TKey) switch
{
// byte[], string are added as data types supported by KafkaMessageKeyType enum
Type t when t == typeof(byte[]) => (TKey)(object)System.Text.Encoding.UTF8.GetBytes(eventData.Key.ToString()),
Type t when t == typeof(string) => (TKey)(object)eventData.Key.ToString(),
Type t when t == typeof(int) => (TKey)(object)Convert.ToInt32(eventData.Key),
Type t when t == typeof(long) => (TKey)(object)Convert.ToInt64(eventData.Key),
_ => (TKey)eventData.Key
};
}
}
catch
{
throw new ArgumentException($"Could not cast actual key value to the expected type. Expected: {typeof(TKey).Name}. Actual: {eventData.Key.GetType().Name}");
}
}
if (eventData.Headers?.Count > 0)
{
msg.Headers = new Headers();
foreach (var header in eventData.Headers)
{
msg.Headers.Add(header.Key, header.Value);
}
}
return msg;
}