in core/src/main/scala/org/apache/spark/eventhubs/client/EventHubsClient.scala [264:353]
/**
 * Resolves a sequence number for every partition in `0 until partitionCount`.
 *
 * For each partition the configured position is looked up (per-partition
 * position first, then the default position). Positions that already carry a
 * non-negative sequence number are used as-is. The remaining positions are
 * translated:
 *
 *  - `StartOfStream` is resolved through `earliestSeqNoF`.
 *  - `EndOfStream` is resolved through `latestSeqNoF`.
 *  - Anything else: the partition's runtime info is fetched. If the partition
 *    is empty, or the position has an enqueued time later than the last
 *    enqueued event, the result is the last enqueued sequence number plus one.
 *    Otherwise a short-lived receiver is created with the position as its
 *    filter, a single event is received, and that event's sequence number is
 *    the result. The receiver is closed once the receive has completed.
 *
 * This method blocks the calling thread: it awaits each runtime-info lookup and
 * the combined result, each for at most `ehConf.internalOperationTimeout`.
 *
 * @param ehConf         configuration supplying the positions, consumer group,
 *                       receiver settings and timeouts
 * @param partitionCount number of partitions; ids `0 until partitionCount` are resolved
 * @param useStart       when true the starting position(s) are translated,
 *                       otherwise the ending position(s)
 * @return partition id -> resolved sequence number
 * @throws java.util.concurrent.TimeoutException if an awaited operation does not
 *         complete within `ehConf.internalOperationTimeout`
 */
override def translate(ehConf: EventHubsConf,
                       partitionCount: Int,
                       useStart: Boolean = true): Map[PartitionId, SequenceNumber] = {
  // Partitions whose position is already a sequence number.
  val completed = mutable.Map[PartitionId, SequenceNumber]()
  // Partitions whose position still has to be turned into a sequence number.
  val needsTranslation = ArrayBuffer[(NameAndPartition, EventPosition)]()

  val NamespaceAndEhName: String = ehConf.namespaceUri + ":" + ehConf.name
  logInfo(s"translate: NsAndEhName: $NamespaceAndEhName useStart is set to $useStart.")

  // Per-partition overrides; partitions without an entry fall back to defaultPos.
  val positions = if (useStart) {
    ehConf.startingPositions.getOrElse(Map.empty).par
  } else {
    ehConf.endingPositions.getOrElse(Map.empty).par
  }
  val defaultPos = if (useStart) {
    ehConf.startingPosition.getOrElse(DefaultEventPosition)
  } else {
    ehConf.endingPosition.getOrElse(DefaultEndingPosition)
  }
  logInfo(s"translate: NsAndEhName: $NamespaceAndEhName PerPartitionPositions = $positions")
  logInfo(s"translate: NsAndEhName: $NamespaceAndEhName Default position = $defaultPos")

  // The loop body runs on a parallel collection, so every write to the two
  // shared mutable collections is guarded by `synchronized`.
  (0 until partitionCount).par.foreach { id =>
    val nAndP = NameAndPartition(ehConf.name, id)
    val position = positions.getOrElse(nAndP, defaultPos)
    if (position.seqNo >= 0L) {
      // We don't need to translate a sequence number.
      // Put it straight into the results.
      synchronized(completed.put(id, position.seqNo))
    } else {
      val tuple = (nAndP, position)
      synchronized(needsTranslation += tuple)
    }
  }
  logInfo(s"translate: NsAndEhName: $NamespaceAndEhName needsTranslation = $needsTranslation")

  val consumerGroup = ehConf.consumerGroup.getOrElse(DefaultConsumerGroup)

  // One (partitionId, Future[sequence number]) pair per partition needing translation.
  val futures = for ((nAndP, pos) <- needsTranslation)
    yield
      pos.offset match {
        case StartOfStream => (nAndP.partitionId, earliestSeqNoF(nAndP.partitionId))
        case EndOfStream   => (nAndP.partitionId, latestSeqNoF(nAndP.partitionId))
        case _ =>
          // NOTE: this blocks inside the loop, so runtime-info lookups for
          // these partitions happen one after another.
          val runtimeInfo =
            Await.result(getRunTimeInfoF(nAndP.partitionId), ehConf.internalOperationTimeout)
          val seqNo =
            if (runtimeInfo.getIsEmpty || (pos.enqueuedTime != null &&
                runtimeInfo.getLastEnqueuedTimeUtc.isBefore(pos.enqueuedTime.toInstant))) {
              // Nothing to receive for this position: the next event to arrive
              // is the one after the last enqueued event.
              Future.successful(runtimeInfo.getLastEnqueuedSequenceNumber + 1)
            } else {
              logInfo(
                s"translate: creating receiver for Event Hub ${nAndP.ehName} on partition ${nAndP.partitionId}. filter: ${pos.convert}")
              val receiverOptions = new ReceiverOptions
              receiverOptions.setPrefetchCount(1)
              receiverOptions.setIdentifier(s"spark-${SparkEnv.get.executorId}")
              val receiverF: Future[PartitionReceiver] = retryJava(
                EventHubsUtils.createReceiverInner(client,
                                                   ehConf.useExclusiveReceiver,
                                                   consumerGroup,
                                                   nAndP.partitionId.toString,
                                                   pos.convert,
                                                   receiverOptions),
                "translate: receiver creation."
              )
              receiverF.flatMap { r =>
                r.setReceiveTimeout(ehConf.receiverTimeout.getOrElse(DefaultReceiverTimeout))
                val receivedSeqNo = retryNotNull(r.receive(1), "translate: receive call")
                  .map { e =>
                    e.iterator.next.getSystemProperties.getSequenceNumber
                  }
                // The receiver exists only for this single receive. Close it once
                // the receive has finished (successfully or not) so it is not
                // leaked. A failure to close is logged and does not affect the
                // translated value.
                receivedSeqNo.onComplete { _ =>
                  try {
                    scala.concurrent.blocking(r.closeSync())
                  } catch {
                    case scala.util.control.NonFatal(e) =>
                      logWarning(
                        s"translate: failed to close receiver for Event Hub ${nAndP.ehName} on partition ${nAndP.partitionId}.",
                        e)
                  }
                }
                receivedSeqNo
              }
            }
          (nAndP.partitionId, seqNo)
      }

  // Wait for every translation, then merge with the positions that needed none.
  val future = Future
    .traverse(futures) {
      case (p, f) =>
        f.map { seqNo =>
          (p, seqNo)
        }
    }
    .map(x => x.toMap ++ completed)

  Await.result(future, ehConf.internalOperationTimeout)
}