in kafka/src/main/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImpl.scala [139:159]
private def getOffsetsOnAssign(
readOffsets: ReadOffsets,
metadataClient: MetadataClientAdapter): Set[TopicPartition] => Future[Map[TopicPartition, Long]] =
(assignedTps: Set[TopicPartition]) => {
val delay = sourceProviderSettings.readOffsetDelay
pekko.pattern.after(delay, scheduler) {
readOffsets()
.flatMap {
case Some(groupOffsets) =>
val filteredMap = groupOffsets.entries.collect {
case (topicPartitionKey, offset) if assignedTps.contains(keyToPartition(topicPartitionKey)) =>
keyToPartition(topicPartitionKey) -> (offset.asInstanceOf[Long] + 1L)
}
Future.successful(filteredMap)
case None => metadataClient.getBeginningOffsets(assignedTps)
}
.recover {
case ex => throw new RuntimeException("External offsets could not be retrieved", ex)
}
}
}