private def checkCursor()

in core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala [179:243]


  /**
   * Ensures the receiver cursor is positioned at `requestSeqNo` and returns the
   * first event read from that position.
   *
   * Steps, in order:
   *  1. Waits for `lastReceivedOffset()` and recreates the receiver when the last
   *     received sequence number is not immediately before `requestSeqNo`, or when
   *     the receiver reports it is not open.
   *  2. Receives one event. If its sequence number equals `requestSeqNo`, that
   *     event is returned.
   *  3. Otherwise recreates the receiver at `requestSeqNo` and receives again. If
   *     the sequence number now matches, the newly received event is returned.
   *  4. Otherwise queries the partition runtime information. When `requestSeqNo`
   *     is lower than the partition's begin sequence number and the received
   *     event is exactly that begin event, the received event is returned.
   *
   * The work above is performed synchronously (via `Await.result` and
   * `awaitReceiveMessage`) before this method returns; the returned future is
   * already completed.
   *
   * @param requestSeqNo sequence number of the event the caller wants next
   * @return an already-completed future holding the received event(s)
   * @throws IllegalStateException if, after recreating the receiver, the received
   *                               sequence number still differs from `requestSeqNo`
   *                               and the begin-sequence-number condition of step 4
   *                               does not hold
   */
  private def checkCursor(requestSeqNo: SequenceNumber): Future[Iterable[EventData]] = {
    val taskId = EventHubsUtils.getTaskId

    // Receives a single event using the configured (or default) receiver timeout.
    // `tag` is forwarded to `receiveOne` unchanged.
    // NOTE(review): callers below use `.head` on the result, which assumes
    // `awaitReceiveMessage` never yields an empty collection - TODO confirm.
    def receiveSingle(tag: String): Iterable[EventData] =
      awaitReceiveMessage(
        receiveOne(ehConf.receiverTimeout.getOrElse(DefaultReceiverTimeout), tag),
        requestSeqNo)

    val lastReceivedSeqNo =
      Await.result(lastReceivedOffset(), ehConf.internalOperationTimeout)

    // A value of -1 or lower is treated as "nothing received yet", so only a
    // real, non-contiguous position triggers a recreate here.
    val cursorOutOfPlace = lastReceivedSeqNo > -1 && lastReceivedSeqNo + 1 != requestSeqNo
    if (cursorOutOfPlace || !receiver.getIsOpen) {
      logInfo(s"(TID $taskId) checkCursor. Recreating a receiver for namespaceUri: $namespaceUri " +
        s"EventHubNameAndPartition: $nAndP consumer group: $consumerGroup. requestSeqNo: $requestSeqNo, " +
        s"lastReceivedSeqNo: $lastReceivedSeqNo, isOpen: ${receiver.getIsOpen}")

      recreateReceiver(requestSeqNo)
    }

    val event = receiveSingle("checkCursor initial")
    val receivedSeqNo = event.head.getSystemProperties.getSequenceNumber

    if (receivedSeqNo == requestSeqNo) {
      // The value is already computed; no need to schedule work on an executor.
      Future.successful(event)
    } else {
      // This can happen in two cases:
      // 1) The desired event is still in the service, but the receiver
      //    cursor is in the wrong spot.
      // 2) The desired event has expired from the service.
      // Check for case (1) first by recreating the receiver at the requested position.
      logInfo(
        s"(TID $taskId) checkCursor. Recreating a receiver for namespaceUri: $namespaceUri EventHubNameAndPartition:" +
          s" $nAndP consumer group: $consumerGroup. requestSeqNo: $requestSeqNo, receivedSeqNo: $receivedSeqNo")
      recreateReceiver(requestSeqNo)

      val movedEvent = receiveSingle("checkCursor move")
      val movedSeqNo = movedEvent.head.getSystemProperties.getSequenceNumber

      if (movedSeqNo == requestSeqNo) {
        Future.successful(movedEvent)
      } else {
        // The event still isn't present. It must be case (2).
        val info = Await.result(
          retryJava(client.getPartitionRuntimeInformation(nAndP.partitionId.toString),
                    "partitionRuntime"),
          ehConf.internalOperationTimeout)

        val beginSeqNo = info.getBeginSequenceNumber
        if (requestSeqNo < beginSeqNo && movedSeqNo == beginSeqNo) {
          // The request points before the partition's begin sequence number and
          // the begin event itself was received: hand that event back.
          Future.successful(movedEvent)
        } else {
          throw new IllegalStateException(
            s"In partition ${info.getPartitionId} of ${info.getEventHubPath}, with consumer group $consumerGroup, " +
              s"request seqNo $requestSeqNo is less than the received seqNo $receivedSeqNo. The earliest seqNo is " +
              s"${info.getBeginSequenceNumber}, the last seqNo is ${info.getLastEnqueuedSequenceNumber}, and " +
              s"received seqNo $movedSeqNo")
        }
      }
    }
  }