in core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala [179:243]
/**
 * Makes sure the receiver is positioned at `requestSeqNo` and returns the first
 * event read from that position.
 *
 * Steps, in order:
 *  1. Read the last received sequence number via `lastReceivedOffset()`. If it is
 *     known (greater than -1) and is not exactly `requestSeqNo - 1`, or if the
 *     receiver reports it is not open, recreate the receiver at `requestSeqNo`.
 *  2. Receive one event. If its sequence number equals `requestSeqNo`, return it.
 *  3. Otherwise recreate the receiver once more and receive again. If the sequence
 *     number now matches, return that event.
 *  4. Otherwise query the partition runtime information. If the requested sequence
 *     number is older than the partition's begin sequence number and the received
 *     event is exactly the begin event, return that begin event; in any other case
 *     the state is considered inconsistent and an exception is thrown.
 *
 * This method blocks on `Await.result` for its internal lookups, so the returned
 * future is already completed when the method returns.
 *
 * @param requestSeqNo the sequence number the caller wants to start reading from
 * @return an already-completed future holding the received event(s)
 * @throws IllegalStateException (synchronously) if, after recreating the receiver,
 *         the received sequence number neither matches the request nor corresponds
 *         to the earliest sequence number of the partition
 */
private def checkCursor(requestSeqNo: SequenceNumber): Future[Iterable[EventData]] = {
  val taskId = EventHubsUtils.getTaskId
  val lastReceivedSeqNo =
    Await.result(lastReceivedOffset(), ehConf.internalOperationTimeout)

  // The cursor is only usable as-is when the receiver is open and the next event
  // it would deliver is the requested one (or nothing has been received yet).
  if ((lastReceivedSeqNo > -1 && lastReceivedSeqNo + 1 != requestSeqNo) ||
      !receiver.getIsOpen) {
    logInfo(s"(TID $taskId) checkCursor. Recreating a receiver for namespaceUri: $namespaceUri " +
      s"EventHubNameAndPartition: $nAndP consumer group: $consumerGroup. requestSeqNo: $requestSeqNo, " +
      s"lastReceivedSeqNo: $lastReceivedSeqNo, isOpen: ${receiver.getIsOpen}")
    recreateReceiver(requestSeqNo)
  }

  val event = awaitReceiveMessage(
    receiveOne(ehConf.receiverTimeout.getOrElse(DefaultReceiverTimeout), "checkCursor initial"),
    requestSeqNo)
  // NOTE(review): `.head` assumes awaitReceiveMessage never yields an empty
  // collection - confirm against its implementation.
  val receivedSeqNo = event.head.getSystemProperties.getSequenceNumber

  if (receivedSeqNo == requestSeqNo) {
    // The value is already computed; no need to schedule work on an executor.
    Future.successful(event)
  } else {
    // This can happen in two cases:
    // 1) Your desired event is still in the service, but the receiver
    //    cursor is in the wrong spot.
    // 2) Your desired event has expired from the service.
    // First, we'll check for case (1).
    logInfo(
      s"(TID $taskId) checkCursor. Recreating a receiver for namespaceUri: $namespaceUri EventHubNameAndPartition:" +
        s" $nAndP consumer group: $consumerGroup. requestSeqNo: $requestSeqNo, receivedSeqNo: $receivedSeqNo")
    recreateReceiver(requestSeqNo)
    val movedEvent = awaitReceiveMessage(
      receiveOne(ehConf.receiverTimeout.getOrElse(DefaultReceiverTimeout), "checkCursor move"),
      requestSeqNo)
    val movedSeqNo = movedEvent.head.getSystemProperties.getSequenceNumber

    if (movedSeqNo == requestSeqNo) {
      // Case (1): repositioning the cursor fixed it.
      Future.successful(movedEvent)
    } else {
      // The event still isn't present. It must be (2).
      val info = Await.result(
        retryJava(client.getPartitionRuntimeInformation(nAndP.partitionId.toString),
                  "partitionRuntime"),
        ehConf.internalOperationTimeout)

      // Accept the substitute event only when the request is older than the
      // partition's begin sequence number and we received exactly that begin event.
      if (requestSeqNo < info.getBeginSequenceNumber &&
          movedSeqNo == info.getBeginSequenceNumber) {
        Future.successful(movedEvent)
      } else {
        throw new IllegalStateException(
          s"In partition ${info.getPartitionId} of ${info.getEventHubPath}, with consumer group $consumerGroup, " +
            s"request seqNo $requestSeqNo is less than the received seqNo $receivedSeqNo. The earliest seqNo is " +
            s"${info.getBeginSequenceNumber}, the last seqNo is ${info.getLastEnqueuedSequenceNumber}, and " +
            s"received seqNo $movedSeqNo")
      }
    }
  }
}