in core/src/main/scala/org/apache/spark/eventhubs/EventHubsUtils.scala [135:157]
/**
 * Asks `client` for a partition receiver, choosing between the epoch-based
 * and the plain variant according to `useExclusiveReceiver`.
 *
 * An info-level message containing the current task id, the partition, the
 * consumer group and the flag is logged before the receiver is requested.
 *
 * @param client               client the receiver is requested from
 * @param useExclusiveReceiver when true, `createEpochReceiver` is called with
 *                             `DefaultEpoch`; otherwise `createReceiver` is called
 * @param consumerGroup        consumer group forwarded to the client
 * @param partitionId          partition identifier forwarded to the client
 * @param eventPosition        starting position forwarded to the client
 * @param receiverOptions      receiver options forwarded to the client
 * @return the future handed back by the client call, unmodified
 */
def createReceiverInner(
    client: EventHubClient,
    useExclusiveReceiver: Boolean,
    consumerGroup: String,
    partitionId: String,
    eventPosition: ehep,
    receiverOptions: ReceiverOptions): CompletableFuture[PartitionReceiver] = {
  val taskId = EventHubsUtils.getTaskId

  logInfo(
    s"(TID $taskId) creating receiver for Event Hub partition $partitionId, consumer group $consumerGroup " +
      s"with epoch receiver option $useExclusiveReceiver")

  if (!useExclusiveReceiver) {
    // Plain receiver: no epoch is passed to the client.
    client.createReceiver(consumerGroup, partitionId, eventPosition, receiverOptions)
  } else {
    // Epoch receiver: same arguments plus the shared DefaultEpoch value.
    client.createEpochReceiver(
      consumerGroup,
      partitionId,
      eventPosition,
      DefaultEpoch,
      receiverOptions
    )
  }
}