in core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala [791:884]
/**
 * Computes the offsets to commit for `groupId` on each of `partitionsToReset`, according to the
 * reset scenario selected on the command line.
 *
 * The scenarios are tested in the order below and the first match wins:
 *  - `resetToOffsetOpt`: the given offset for every partition, passed through `checkOffsetsRange`.
 *  - `resetToEarliestOpt`: the value returned by `getLogStartOffsets` for each partition.
 *  - `resetToLatestOpt`: the value returned by `getLogEndOffsets` for each partition.
 *  - `resetShiftByOpt`: the current committed offset plus the given shift, passed through
 *    `checkOffsetsRange`.
 *  - `resetToDatetimeOpt`: the value returned by `getLogTimestampOffsets` for the given datetime.
 *  - `resetByDurationOpt`: the value returned by `getLogTimestampOffsets` for "now minus duration".
 *  - a reset plan loaded from file (`resetPlanFromFile`): the plan's offsets for this group, passed
 *    through `checkOffsetsRange`; the `partitionsToReset` argument is not consulted in this case.
 *  - `resetToCurrentOpt`: the current committed offset where one exists, otherwise the value
 *    returned by `getLogEndOffsets`.
 *
 * If no scenario was selected, or an offset lookup does not yield a `LogOffsetResult.LogOffset`,
 * the usage message is printed via `CommandLineUtils.printUsageAndDie`.
 *
 * @param groupId           the consumer group whose offsets are being reset
 * @param partitionsToReset the partitions for which new offsets are requested
 * @return the new offset per partition; empty when a reset plan is in use but has no entry
 *         for `groupId`
 * @throws IllegalArgumentException in the shift-by scenario when a partition has no committed offset
 */
private def prepareOffsetsToReset(groupId: String,
                                  partitionsToReset: Seq[TopicPartition]): Map[TopicPartition, OffsetAndMetadata] = {

  // Wraps plain numeric offsets into OffsetAndMetadata values (no metadata string attached).
  def withMetadata(offsets: Map[TopicPartition, Long]): Map[TopicPartition, OffsetAndMetadata] =
    offsets.map { case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset)) }

  // Picks the looked-up offset for every partition to reset; anything other than a concrete
  // LogOffset (including a missing entry) aborts with the usage message. `what` names the kind of
  // offset in that message.
  def resolvedOrDie(lookedUp: Map[TopicPartition, LogOffsetResult], what: String): Map[TopicPartition, OffsetAndMetadata] =
    partitionsToReset.map { topicPartition =>
      lookedUp.get(topicPartition) match {
        case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
        case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting $what of topic partition: $topicPartition")
      }
    }.toMap

  if (opts.options.has(opts.resetToOffsetOpt)) {
    val offset = opts.options.valueOf(opts.resetToOffsetOpt)
    withMetadata(checkOffsetsRange(groupId, partitionsToReset.map((_, offset)).toMap))
  } else if (opts.options.has(opts.resetToEarliestOpt)) {
    resolvedOrDie(getLogStartOffsets(groupId, partitionsToReset), "starting offset")
  } else if (opts.options.has(opts.resetToLatestOpt)) {
    resolvedOrDie(getLogEndOffsets(groupId, partitionsToReset), "ending offset")
  } else if (opts.options.has(opts.resetShiftByOpt)) {
    val currentCommittedOffsets = getCommittedOffsets(groupId)
    // The shift is the same for every partition, so read the option once.
    val shiftBy = opts.options.valueOf(opts.resetShiftByOpt)
    val requestedOffsets = partitionsToReset.map { topicPartition =>
      val currentOffset = currentCommittedOffsets.getOrElse(topicPartition,
        throw new IllegalArgumentException(s"Cannot shift offset for partition $topicPartition since there is no current committed offset")).offset
      (topicPartition, currentOffset + shiftBy)
    }.toMap
    withMetadata(checkOffsetsRange(groupId, requestedOffsets))
  } else if (opts.options.has(opts.resetToDatetimeOpt)) {
    val timestamp = convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt))
    resolvedOrDie(getLogTimestampOffsets(groupId, partitionsToReset, timestamp), "offset by timestamp")
  } else if (opts.options.has(opts.resetByDurationOpt)) {
    val durationParsed = Duration.parse(opts.options.valueOf(opts.resetByDurationOpt))
    // Instant is immutable: minus() returns the shifted instant, which is what we convert.
    val timestamp = Instant.now().minus(durationParsed).toEpochMilli
    resolvedOrDie(getLogTimestampOffsets(groupId, partitionsToReset, timestamp), "offset by timestamp")
  } else if (resetPlanFromFile.isDefined) {
    // A plan exists; the only way to end up with nothing is that it has no entry for this group.
    resetPlanFromFile.flatMap(_.get(groupId)) match {
      case Some(resetPlanForGroup) =>
        val requestedOffsets = resetPlanForGroup.keySet.map { topicPartition =>
          topicPartition -> resetPlanForGroup(topicPartition).offset
        }.toMap
        withMetadata(checkOffsetsRange(groupId, requestedOffsets))
      case None =>
        printError(s"No reset plan for group $groupId found")
        Map[TopicPartition, OffsetAndMetadata]()
    }
  } else if (opts.options.has(opts.resetToCurrentOpt)) {
    val currentCommittedOffsets = getCommittedOffsets(groupId)
    val (partitionsWithCommittedOffset, partitionsWithoutCommittedOffset) =
      partitionsToReset.partition(currentCommittedOffsets.keySet.contains(_))
    // Partitions that already have a committed offset keep it (only the numeric offset is carried over).
    val keptOffsets = partitionsWithCommittedOffset.map { topicPartition =>
      val committed = currentCommittedOffsets.getOrElse(topicPartition,
        throw new IllegalStateException(s"Expected a valid current offset for topic partition: $topicPartition"))
      (topicPartition, new OffsetAndMetadata(committed.offset))
    }.toMap
    // Partitions without a committed offset take whatever getLogEndOffsets reports for them.
    val endOffsets = getLogEndOffsets(groupId, partitionsWithoutCommittedOffset).map {
      case (topicPartition, LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
      case (topicPartition, _) => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting ending offset of topic partition: $topicPartition")
    }
    keptOffsets ++ endOffsets
  } else {
    CommandLineUtils.printUsageAndDie(opts.parser, "Option '%s' requires one of the following scenarios: %s".format(opts.resetOffsetsOpt, opts.allResetOffsetScenarioOpts) )
  }
}