private[eventhubs] override def receive()

in core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala [374:418]


  /**
   * Receives events for one partition through a cached receiver.
   *
   * Looks up the [[CachedEventHubsReceiver]] stored under `key(ehConf, nAndP)`, creating and
   * caching one if none is present, and delegates to its instance-level `receive`.
   *
   * If that call fails with a `CompletionException` whose cause is a
   * `RejectedExecutionException` mentioning a closed ReactorDispatcher, the cached receiver's
   * client is closed, a new receiver is created and stored in the cache in its place, and the
   * receive is attempted once more on the new instance. Every other `CompletionException`
   * (including one caused by a `ReceiverDisconnectedException`, which is only logged) is rethrown
   * unchanged.
   *
   * @param ehConf       configuration used for the cache key, for logging and to build receivers
   * @param nAndP        Event Hub name and partition used for the cache key and to build receivers
   * @param requestSeqNo sequence number forwarded to receiver creation and to `receive`
   * @param batchSize    batch size forwarded to the receiver's `receive`
   * @return the iterator produced by the underlying receiver
   * @throws CompletionException if the receive fails for any reason other than the closed
   *                             ReactorDispatcher case, or if the single retry fails with it
   */
  private[eventhubs] override def receive(ehConf: EventHubsConf,
                                          nAndP: NameAndPartition,
                                          requestSeqNo: SequenceNumber,
                                          batchSize: Int): Iterator[EventData] = {
    val taskId = EventHubsUtils.getTaskId

    logInfo(s"(TID $taskId) EventHubsCachedReceiver look up. For namespaceUri ${ehConf.namespaceUri} " +
      s"EventHubNameAndPartition $nAndP consumer group ${ehConf.consumerGroup.getOrElse(DefaultConsumerGroup)}. " +
      s"requestSeqNo: $requestSeqNo, batchSize: $batchSize")

    // The lookup-or-create must be atomic with respect to other users of the shared cache.
    val cachedReceiver: CachedEventHubsReceiver = receivers.synchronized {
      receivers.getOrElseUpdate(key(ehConf, nAndP),
                                CachedEventHubsReceiver(ehConf, nAndP, requestSeqNo))
    }

    try {
      cachedReceiver.receive(requestSeqNo, batchSize)
    } catch {
      case completionExecution: CompletionException =>
        // A null cause matches none of the typed patterns and falls through to the default
        // rethrow below, so no explicit null check is needed.
        completionExecution.getCause match {
          // getMessage may legitimately be null; wrap it so that a message-less rejection is
          // rethrown as the original exception instead of failing with a NullPointerException.
          case rejected: RejectedExecutionException
              if Option(rejected.getMessage)
                .exists(_.contains("ReactorDispatcher instance is closed")) =>
            // reactor dispatcher closed case
            logInfo(s"(TID $taskId) EventHubsCachedReceiver receive execution for namespaceUri ${ehConf.namespaceUri} " +
              s"EventHubNameAndPartition $nAndP consumer group ${ehConf.consumerGroup.getOrElse(DefaultConsumerGroup)} " +
              s"failed with $completionExecution. Try to recreate the entire CachedEventHubsReceiver instance in order to " +
              s"use a fresh EventHubClient from the underlying java SDK, then try receiving events again.")
            cachedReceiver.client.close()
            val freshReceiver = CachedEventHubsReceiver(ehConf, nAndP, requestSeqNo)
            receivers.synchronized {
              receivers.update(key(ehConf, nAndP), freshReceiver)
            }
            // Single retry on the fresh instance; a second failure propagates to the caller.
            freshReceiver.receive(requestSeqNo, batchSize)

          case _: ReceiverDisconnectedException =>
            logInfo(s"(TID $taskId) EventHubsCachedReceiver receive execution for namespaceUri ${ehConf.namespaceUri} " +
              s"EventHubNameAndPartition $nAndP consumer group ${ehConf.consumerGroup.getOrElse(DefaultConsumerGroup)} " +
              s"failed because another receiver for the same <NS-EH-CG-Part> combo has been created and caused this one " +
              s"to get disconnected. The full error is: $completionExecution. Throw the exception so that the driver can " +
              s"retry the task.")
            throw completionExecution

          case _ =>
            throw completionExecution
        }
    }
  }