in core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala [661:726]
private def handleMetadataRequest(req: Metadata.Request): Metadata.Response = req match {
case Metadata.ListTopics =>
Metadata.Topics(Try {
consumer
.listTopics(settings.getMetadataRequestTimeout)
.asScala
.map {
case (k, v) => k -> v.asScala.toList
}
.toMap
})
case Metadata.GetPartitionsFor(topic) =>
Metadata.PartitionsFor(Try {
consumer.partitionsFor(topic, settings.getMetadataRequestTimeout).asScala.toList
})
case Metadata.GetBeginningOffsets(partitions) =>
Metadata.BeginningOffsets(Try {
consumer
.beginningOffsets(partitions.asJava, settings.getMetadataRequestTimeout)
.asScala
.map {
case (k, v) => k -> (v: Long)
}
.toMap
})
case Metadata.GetEndOffsets(partitions) =>
Metadata.EndOffsets(Try {
consumer
.endOffsets(partitions.asJava, settings.getMetadataRequestTimeout)
.asScala
.map {
case (k, v) => k -> (v: Long)
}
.toMap
})
case Metadata.GetOffsetsForTimes(timestampsToSearch) =>
Metadata.OffsetsForTimes(Try {
val search = timestampsToSearch.map {
case (k, v) => k -> (v: java.lang.Long)
}.asJava
consumer.offsetsForTimes(search, settings.getMetadataRequestTimeout).asScala.toMap
})
case Metadata.GetCommittedOffsets(partitions) =>
Metadata.CommittedOffsets(
Try {
consumer
.committed(partitions.asJava, settings.getMetadataRequestTimeout)
.asScala
.filterNot(_._2 == null)
.toMap
})
case req: Metadata.GetCommittedOffset @nowarn("cat=deprecation") =>
@nowarn("cat=deprecation") val resp = Metadata.CommittedOffset(
Try {
@nowarn("cat=deprecation") val offset = consumer.committed(req.partition, settings.getMetadataRequestTimeout)
offset
},
req.partition)
resp
}