in core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala [102:121]
private def createReceiver(seqNo: SequenceNumber): PartitionReceiver = {
val taskId = EventHubsUtils.getTaskId
logInfo(
s"(TID $taskId) creating receiver for namespaceUri: $namespaceUri EventHubNameAndPartition: $nAndP " +
s"consumer group: $consumerGroup. seqNo: $seqNo")
val receiverOptions = new ReceiverOptions
receiverOptions.setReceiverRuntimeMetricEnabled(true)
receiverOptions.setPrefetchCount(ehConf.prefetchCount.getOrElse(DefaultPrefetchCount))
receiverOptions.setIdentifier(s"spark-${SparkEnv.get.executorId}-$taskId")
val consumer = retryJava(
EventHubsUtils.createReceiverInner(client,
ehConf.useExclusiveReceiver,
consumerGroup,
nAndP.partitionId.toString,
EventPosition.fromSequenceNumber(seqNo).convert,
receiverOptions),
"CachedReceiver creation."
)
Await.result(consumer, ehConf.internalOperationTimeout)
}