in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/JsonUtils.scala [71:85]
/**
 * Parses a JSON string of per-queue offsets into a map keyed by [[MessageQueue]].
 *
 * The expected layout is three nested objects, topic -> broker -> queue id -> offset, e.g.
 * `{"topicA":{"broker1":{"0":23,"1":-1},"broker2":{"0":23}},"topicB":{"broker3":{"0":-2}}}`.
 *
 * @param str JSON text to parse
 * @return one entry per (topic, broker, queue id) triple found in `str`
 * @throws IllegalArgumentException if `str` cannot be read in the expected layout; the
 *                                  underlying failure is attached as the cause
 */
def partitionOffsets(str: String): Map[MessageQueue, Long] = {
  try {
    // Outer key: topic name; middle key: broker name; inner key: queue id -> offset.
    val parsed = Serialization.read[Map[String, Map[String, Map[Int, Long]]]](str)
    parsed.flatMap { case (topic, brokers) =>
      brokers.flatMap { case (broker, queues) =>
        queues.map { case (queue, offset) =>
          new MessageQueue(topic, broker, queue) -> offset
        }
      }
    }.toMap
  } catch {
    case NonFatal(cause) =>
      // Keep the original exception as the cause so the actual parse failure
      // remains visible in stack traces instead of being discarded.
      throw new IllegalArgumentException(
        s"""Expected e.g. {"topicA":{"broker1":{"0":23,"1":-1},"broker2":{"0":23}},"topicB":{"broker3":{"0":-2}}}, got $str""",
        cause)
  }
}