in kinesis/kinesis.go [579:605]
func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interface{}) (string, bool) {
partitionKey := outputPlugin.partitionKey
if partitionKey != "" {
partitionKeys := strings.Split(partitionKey, "->")
num := len(partitionKeys)
for count, dataKey := range partitionKeys {
newRecord := getFromMap(dataKey, record)
if count == num-1 {
value := stringOrByteArray(newRecord)
if value != "" {
if len(value) > partitionKeyMaxLength {
value = value[0:partitionKeyMaxLength]
}
return value, true
}
}
_, ok := newRecord.(map[interface{}]interface{})
if ok {
record = newRecord.(map[interface{}]interface{})
} else {
logrus.Errorf("[kinesis %d] The partition key could not be found in the record, using a random string instead", outputPlugin.PluginID)
return "", false
}
}
}
return "", false
}