in core/src/main/scala/org/apache/spark/eventhubs/client/EventHubsClient.scala [84:106]
override def send(event: EventData,
partition: Option[Rate] = None,
partitionKey: Option[String] = None,
properties: Option[Map[String, String]] = None): Unit = {
if (properties.isDefined) {
val p = event.getProperties
p.putAll(properties.get.asJava)
}
val sendTask = if (partition.isDefined) {
if ((partitionSender == null) || (partitionSender.getPartitionId.toInt != partition.get)) {
logInfo("Recreating partition sender.")
createPartitionSender(partition.get)
}
partitionSender.send(event)
} else if (partitionKey.isDefined) {
client.send(event, partitionKey.get)
} else {
client.send(event)
}
pendingWorks += retryJava(sendTask, "send", 1)
}