private def createReceiver()

in core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala [102:121]


  /**
   * Creates a [[PartitionReceiver]] for partition `nAndP.partitionId` in
   * `consumerGroup`, positioned at the given sequence number.
   *
   * The receiver options enable runtime metrics, take the prefetch count from
   * `ehConf` (falling back to `DefaultPrefetchCount` when none is configured) and
   * use an identifier of the form `spark-<executorId>-<taskId>`. Creation goes
   * through `EventHubsUtils.createReceiverInner` wrapped in `retryJava`, and this
   * method then blocks on the result for at most `ehConf.internalOperationTimeout`.
   *
   * @param seqNo sequence number used to build the receiver's starting event position
   * @return the receiver produced by the awaited creation call
   */
  private def createReceiver(seqNo: SequenceNumber): PartitionReceiver = {
    val taskId = EventHubsUtils.getTaskId
    logInfo(
      s"(TID $taskId) creating receiver for namespaceUri: $namespaceUri EventHubNameAndPartition: $nAndP " +
        s"consumer group: $consumerGroup. seqNo: $seqNo")

    val options = new ReceiverOptions
    options.setReceiverRuntimeMetricEnabled(true)
    options.setPrefetchCount(ehConf.prefetchCount.getOrElse(DefaultPrefetchCount))
    options.setIdentifier(s"spark-${SparkEnv.get.executorId}-$taskId")

    // Deliberately a method rather than a value: the creation call must only be
    // evaluated where (and as often as) retryJava evaluates its first argument.
    def openReceiver() =
      EventHubsUtils.createReceiverInner(
        client,
        ehConf.useExclusiveReceiver,
        consumerGroup,
        nAndP.partitionId.toString,
        EventPosition.fromSequenceNumber(seqNo).convert,
        options
      )

    val pendingReceiver = retryJava(openReceiver(), "CachedReceiver creation.")

    // Synchronous boundary: callers get a ready receiver, or the Await failure.
    Await.result(pendingReceiver, ehConf.internalOperationTimeout)
  }