private def getOffsetsOnAssign()

in kafka/src/main/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImpl.scala [139:159]


  /**
   * Builds the callback that resolves the offsets to start from for a set of newly assigned partitions.
   *
   * The returned function waits for `sourceProviderSettings.readOffsetDelay` (scheduled through
   * `pekko.pattern.after`) and only then invokes `readOffsets`:
   *
   *  - when `readOffsets` yields `Some(groupOffsets)`, the entries whose key maps (via `keyToPartition`)
   *    to one of the assigned partitions are kept, and each stored offset is incremented by one;
   *  - when it yields `None`, the offsets are requested from `metadataClient.getBeginningOffsets`
   *    for the assigned partitions.
   *
   * Any failure of the resulting future is rethrown as a `RuntimeException` with the message
   * "External offsets could not be retrieved", carrying the original failure as its cause.
   *
   * @param readOffsets    function producing the externally stored offsets, if any
   * @param metadataClient client used to look up beginning offsets when nothing is stored
   * @return a function from the assigned partitions to the future offsets for those partitions
   */
  private def getOffsetsOnAssign(
      readOffsets: ReadOffsets,
      metadataClient: MetadataClientAdapter): Set[TopicPartition] => Future[Map[TopicPartition, Long]] =
    (assignedTps: Set[TopicPartition]) => {
      val delay = sourceProviderSettings.readOffsetDelay
      pekko.pattern.after(delay, scheduler) {
        readOffsets()
          .flatMap {
            case Some(groupOffsets) =>
              // Parse every key exactly once, then keep only the partitions that were assigned.
              val filteredMap = groupOffsets.entries.iterator
                .map { case (topicPartitionKey, offset) => keyToPartition(topicPartitionKey) -> offset }
                .collect {
                  case (topicPartition, offset) if assignedTps.contains(topicPartition) =>
                    // NOTE(review): the +1 presumably turns "last processed offset" into
                    // "next offset to read" - confirm against the offset store contract.
                    topicPartition -> (offset.asInstanceOf[Long] + 1L)
                }
                .toMap
              Future.successful(filteredMap)
            case None => metadataClient.getBeginningOffsets(assignedTps)
          }
          .recover {
            case ex => throw new RuntimeException("External offsets could not be retrieved", ex)
          }
      }
    }