in core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala [374:418]
private[eventhubs] override def receive(ehConf: EventHubsConf,
nAndP: NameAndPartition,
requestSeqNo: SequenceNumber,
batchSize: Int): Iterator[EventData] = {
val taskId = EventHubsUtils.getTaskId
logInfo(s"(TID $taskId) EventHubsCachedReceiver look up. For namespaceUri ${ehConf.namespaceUri} " +
s"EventHubNameAndPartition $nAndP consumer group ${ehConf.consumerGroup.getOrElse(DefaultConsumerGroup)}. " +
s"requestSeqNo: $requestSeqNo, batchSize: $batchSize")
var receiver: CachedEventHubsReceiver = null
receivers.synchronized {
receiver = receivers.getOrElseUpdate(key(ehConf, nAndP), {
CachedEventHubsReceiver(ehConf, nAndP, requestSeqNo)
})
}
try {
receiver.receive(requestSeqNo, batchSize)
} catch {
case completionExecution: CompletionException =>
val exceptionCause = completionExecution.getCause
if (exceptionCause != null && exceptionCause.isInstanceOf[RejectedExecutionException] && exceptionCause.getMessage.contains("ReactorDispatcher instance is closed")) {
// reactor dispatcher closed case
logInfo(s"(TID $taskId) EventHubsCachedReceiver receive execution for namespaceUri ${ehConf.namespaceUri} " +
s"EventHubNameAndPartition $nAndP consumer group ${ehConf.consumerGroup.getOrElse(DefaultConsumerGroup)} " +
s"failed with $completionExecution. Try to recreate the entire CachedEventHubsReceiver instance in order to " +
s"use a fresh EventHubClient from the underlying java SDK, then try receiving events again.")
receiver.client.close();
receiver = CachedEventHubsReceiver(ehConf, nAndP, requestSeqNo)
receivers.synchronized {
receivers.update(key(ehConf, nAndP), receiver)
}
receiver.receive(requestSeqNo, batchSize)
} else if (exceptionCause != null && exceptionCause.isInstanceOf[ReceiverDisconnectedException]) {
logInfo(s"(TID $taskId) EventHubsCachedReceiver receive execution for namespaceUri ${ehConf.namespaceUri} " +
s"EventHubNameAndPartition $nAndP consumer group ${ehConf.consumerGroup.getOrElse(DefaultConsumerGroup)} " +
s"failed because another receiver for the same <NS-EH-CG-Part> combo has been created and caused this one " +
s"to get disconnected. The full error is: $completionExecution. Throw the exception so that the driver can " +
s"retry the task.")
throw completionExecution
} else {
throw completionExecution
}
}
}