in core/src/main/scala/org/apache/spark/eventhubs/EventHubsUtils.scala [54:72]
/**
 * Makes sure `partitionPerformanceReceiverRef` is populated.
 *
 * Nothing happens when the reference is already non-null. Otherwise the method first tries to
 * resolve an endpoint registered under `PartitionPerformanceReceiver.ENDPOINT_NAME` through
 * `RpcUtils.makeDriverRef`. If that lookup throws a non-fatal exception, a new
 * `PartitionPerformanceReceiver` is built around the tracker returned by
 * `PartitionsStatusTracker.getPartitionStatusTracker` and registered on the current
 * `SparkEnv`'s RPC environment under the same name.
 *
 * NOTE(review): the null check and the assignment are not synchronized here; confirm that
 * callers cannot reach this method concurrently.
 */
private def createRpcEndpoint(): Unit = {
  if (partitionPerformanceReceiverRef == null) {
    val env = SparkEnv.get
    try {
      partitionPerformanceReceiverRef = RpcUtils.makeDriverRef(
        PartitionPerformanceReceiver.ENDPOINT_NAME,
        env.conf,
        env.rpcEnv)
      logInfo(
        "There is an existing partitionPerformanceReceiverRef on the driver, use that one rather than creating a new one")
    } catch {
      // NonFatal (rather than Exception) lets InterruptedException propagate instead of being
      // swallowed by the fallback path.
      case scala.util.control.NonFatal(e) =>
        // Record why the lookup failed so an unexpected error is not hidden by the fallback.
        logInfo(
          s"Could not resolve an existing ${PartitionPerformanceReceiver.ENDPOINT_NAME} endpoint " +
            s"(${e.getClass.getName}: ${e.getMessage}); registering a new one")
        val statusTracker = PartitionsStatusTracker.getPartitionStatusTracker
        val receiver = new PartitionPerformanceReceiver(env.rpcEnv, statusTracker)
        partitionPerformanceReceiverRef =
          env.rpcEnv.setupEndpoint(PartitionPerformanceReceiver.ENDPOINT_NAME, receiver)
    }
  }
}