in core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala [727:815]
private def prepareOffsetsToReset(groupId: String, partitionsToReset: Seq[TopicPartition]): Map[TopicPartition, OffsetAndMetadata] = {
if (opts.options.has(opts.resetToOffsetOpt)) {
val offset = opts.options.valueOf(opts.resetToOffsetOpt)
checkOffsetsRange(partitionsToReset.map((_, offset)).toMap).map {
case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
}
} else if (opts.options.has(opts.resetToEarliestOpt)) {
val logStartOffsets = getLogStartOffsets(partitionsToReset)
partitionsToReset.map { topicPartition =>
logStartOffsets.get(topicPartition) match {
case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting starting offset of topic partition: $topicPartition")
}
}.toMap
} else if (opts.options.has(opts.resetToLatestOpt)) {
val logEndOffsets = getLogEndOffsets(partitionsToReset)
partitionsToReset.map { topicPartition =>
logEndOffsets.get(topicPartition) match {
case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting ending offset of topic partition: $topicPartition")
}
}.toMap
} else if (opts.options.has(opts.resetShiftByOpt)) {
val currentCommittedOffsets = adminClient.listGroupOffsets(groupId)
val requestedOffsets = partitionsToReset.map { topicPartition =>
val shiftBy = opts.options.valueOf(opts.resetShiftByOpt)
val currentOffset = currentCommittedOffsets.getOrElse(topicPartition,
throw new IllegalArgumentException(s"Cannot shift offset for partition $topicPartition since there is no current committed offset"))
(topicPartition, currentOffset + shiftBy)
}.toMap
checkOffsetsRange(requestedOffsets).map {
case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
}
} else if (opts.options.has(opts.resetToDatetimeOpt)) {
val timestamp = convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt))
val logTimestampOffsets = getLogTimestampOffsets(partitionsToReset, timestamp)
partitionsToReset.map { topicPartition =>
val logTimestampOffset = logTimestampOffsets.get(topicPartition)
logTimestampOffset match {
case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting offset by timestamp of topic partition: $topicPartition")
}
}.toMap
} else if (opts.options.has(opts.resetByDurationOpt)) {
val duration = opts.options.valueOf(opts.resetByDurationOpt)
val durationParsed = DatatypeFactory.newInstance().newDuration(duration)
val now = new Date()
durationParsed.negate().addTo(now)
val timestamp = now.getTime
val logTimestampOffsets = getLogTimestampOffsets(partitionsToReset, timestamp)
partitionsToReset.map { topicPartition =>
val logTimestampOffset = logTimestampOffsets.get(topicPartition)
logTimestampOffset match {
case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting offset by timestamp of topic partition: $topicPartition")
}
}.toMap
} else if (opts.options.has(opts.resetFromFileOpt)) {
val resetPlanPath = opts.options.valueOf(opts.resetFromFileOpt)
val resetPlanCsv = Utils.readFileAsString(resetPlanPath)
val resetPlan = parseResetPlan(resetPlanCsv)
val requestedOffsets = resetPlan.keySet.map { topicPartition =>
(topicPartition, resetPlan(topicPartition).offset())
}.toMap
checkOffsetsRange(requestedOffsets).map {
case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
}
} else if (opts.options.has(opts.resetToCurrentOpt)) {
val currentCommittedOffsets = adminClient.listGroupOffsets(groupId)
val (partitionsToResetWithCommittedOffset, partitionsToResetWithoutCommittedOffset) =
partitionsToReset.partition(currentCommittedOffsets.keySet.contains(_))
val preparedOffsetsForParititionsWithCommittedOffset = partitionsToResetWithCommittedOffset.map { topicPartition =>
(topicPartition, new OffsetAndMetadata(currentCommittedOffsets.get(topicPartition) match {
case Some(offset) => offset
case _ => throw new IllegalStateException(s"Expected a valid current offset for topic partition: $topicPartition")
}))
}.toMap
val preparedOffsetsForPartitionsWithoutCommittedOffset = getLogEndOffsets(partitionsToResetWithoutCommittedOffset).map {
case (topicPartition, LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
case (topicPartition, _) => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting ending offset of topic partition: $topicPartition")
}
preparedOffsetsForParititionsWithCommittedOffset ++ preparedOffsetsForPartitionsWithoutCommittedOffset
} else {
CommandLineUtils.printUsageAndDie(opts.parser, "Option '%s' requires one of the following scenarios: %s".format(opts.resetOffsetsOpt, opts.allResetOffsetScenarioOpts) )
}
}