override def translate()

in core/src/main/scala/org/apache/spark/eventhubs/client/EventHubsClient.scala [264:353]


  /**
   * Resolves the configured starting (or ending) position of every partition
   * to a concrete sequence number.
   *
   * For each partition id in `0 until partitionCount` the per-partition
   * position from `ehConf` is used if present, otherwise the default position.
   * Resolution then proceeds as follows:
   *
   *  - a position whose `seqNo` is non-negative is already a sequence number
   *    and is returned as is;
   *  - `StartOfStream` / `EndOfStream` offsets are resolved through
   *    `earliestSeqNoF` / `latestSeqNoF`;
   *  - any other position (offset or enqueued time) is resolved by looking at
   *    the partition's runtime info: if the partition is empty, or the
   *    requested enqueued time is later than the last enqueued event, the
   *    result is `lastEnqueuedSequenceNumber + 1`; otherwise a short-lived
   *    receiver is opened at that position, a single event is received, and
   *    its sequence number is used. The receiver is closed afterwards.
   *
   * This call blocks until every partition has been resolved.
   *
   * @param ehConf         configuration holding the positions, consumer group
   *                       and timeouts used for the translation
   * @param partitionCount number of partitions; ids `0 until partitionCount`
   *                       are translated
   * @param useStart       `true` to translate the starting positions,
   *                       `false` to translate the ending positions
   * @return a map from partition id to its resolved sequence number
   * @throws java.util.concurrent.TimeoutException if a blocking wait exceeds
   *                                               `ehConf.internalOperationTimeout`
   */
  override def translate(ehConf: EventHubsConf,
                         partitionCount: Int,
                         useStart: Boolean = true): Map[PartitionId, SequenceNumber] = {

    val nsAndEhName: String = ehConf.namespaceUri + ":" + ehConf.name

    logInfo(s"translate: NsAndEhName: $nsAndEhName useStart is set to $useStart.")
    val positions = if (useStart) {
      ehConf.startingPositions.getOrElse(Map.empty)
    } else {
      ehConf.endingPositions.getOrElse(Map.empty)
    }
    val defaultPos = if (useStart) {
      ehConf.startingPosition.getOrElse(DefaultEventPosition)
    } else {
      ehConf.endingPosition.getOrElse(DefaultEndingPosition)
    }
    logInfo(s"translate: NsAndEhName: $nsAndEhName PerPartitionPositions = $positions")
    logInfo(s"translate: NsAndEhName: $nsAndEhName Default position = $defaultPos")

    // Pure in-memory classification: no parallelism or locking is needed, and a
    // sequential pass keeps the order of `needsTranslation` deterministic.
    val (alreadyResolved, needsTranslation) = (0 until partitionCount)
      .map { id =>
        val nAndP = NameAndPartition(ehConf.name, id)
        (nAndP, positions.getOrElse(nAndP, defaultPos))
      }
      .partition { case (_, position) => position.seqNo >= 0L }

    // Positions that already carry a sequence number go straight into the result.
    val completed: Map[PartitionId, SequenceNumber] =
      alreadyResolved.map { case (nAndP, position) => nAndP.partitionId -> position.seqNo }.toMap
    logInfo(s"translate: NsAndEhName: $nsAndEhName needsTranslation = $needsTranslation")

    val consumerGroup = ehConf.consumerGroup.getOrElse(DefaultConsumerGroup)
    val futures = needsTranslation.map {
      case (nAndP, pos) =>
        val partitionId = nAndP.partitionId
        val seqNoF: Future[SequenceNumber] = pos.offset match {
          case StartOfStream => earliestSeqNoF(partitionId)
          case EndOfStream   => latestSeqNoF(partitionId)
          case _ =>
            // Blocks per partition; kept synchronous so the existing
            // per-call timeout behaviour is unchanged.
            val runtimeInfo =
              Await.result(getRunTimeInfoF(partitionId), ehConf.internalOperationTimeout)
            if (runtimeInfo.getIsEmpty || (pos.enqueuedTime != null &&
                runtimeInfo.getLastEnqueuedTimeUtc.isBefore(pos.enqueuedTime.toInstant))) {
              // Nothing at or after the requested position exists yet, so the
              // next event to arrive is the one we want.
              Future.successful(runtimeInfo.getLastEnqueuedSequenceNumber + 1)
            } else {
              logInfo(
                s"translate: creating receiver for Event Hub ${nAndP.ehName} on partition ${nAndP.partitionId}. filter: ${pos.convert}")
              val receiverOptions = new ReceiverOptions
              receiverOptions.setPrefetchCount(1)
              receiverOptions.setIdentifier(s"spark-${SparkEnv.get.executorId}")

              retryJava(
                EventHubsUtils.createReceiverInner(client,
                                                   ehConf.useExclusiveReceiver,
                                                   consumerGroup,
                                                   nAndP.partitionId.toString,
                                                   pos.convert,
                                                   receiverOptions),
                "translate: receiver creation."
              ).flatMap { r =>
                r.setReceiveTimeout(ehConf.receiverTimeout.getOrElse(DefaultReceiverTimeout))
                retryNotNull(r.receive(1), "translate: receive call")
                  .map { e =>
                    e.iterator.next.getSystemProperties.getSequenceNumber
                  }
                  // The receiver is only needed for this single event; release
                  // it whether the receive succeeded or failed so it does not
                  // stay open on the partition. The close is fire-and-forget.
                  .andThen { case _ => r.close() }
              }
            }
        }
        (partitionId, seqNoF)
    }

    val translated = Future
      .traverse(futures) {
        case (p, f) =>
          f.map { seqNo =>
            (p, seqNo)
          }
      }
      .map(pairs => pairs.toMap ++ completed)
    Await.result(translated, ehConf.internalOperationTimeout)
  }