def partitionOffsets()

in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/JsonUtils.scala [71:85]


  def partitionOffsets(str: String): Map[MessageQueue, Long] = {
    try {
      Serialization.read[Map[String, Map[String, Map[Int, Long]]]](str).flatMap { case (topic, brokers) =>
        brokers.flatMap { case (broker, queues) =>
          queues.map { case (queue, offset) =>
            new MessageQueue(topic, broker, queue) -> offset
          }
        }
      }.toMap
    } catch {
      case NonFatal(x) =>
        throw new IllegalArgumentException(
          s"""Expected e.g. {"topicA":{"broker1":{"0":23,"1":-1},"broker2":{"0":23}},"topicB":{"broker3":{"0":-2}}}, got $str""")
    }
  }