def partitions()

in core/src/main/scala/org/apache/spark/sql/eventhubs/JsonUtils.scala [57:80]


  /**
   * Write NameAndPartitions, grouped by their ehName, as a string.
   *
   * @param partitions the NameAndPartitions to serialize
   * @return the result of `Serialization.write` applied to the grouping keyed by ehName
   */
  def partitions(partitions: Iterable[NameAndPartition]): String = {
    val byEventHubName = partitions.groupBy(nAndP => nAndP.ehName)
    Serialization.write(byEventHubName)
  }

  /**
   * Write per-NameAndPartition seqNos as json string
   *
   * The sequence numbers are regrouped into a two-level map (ehName first, then partitionId)
   * before being handed to `Serialization.write`.
   *
   * @param partitionSeqNos the sequence number recorded for each NameAndPartition
   * @return the serialized two-level map
   */
  def partitionSeqNos(partitionSeqNos: Map[NameAndPartition, SequenceNumber]): String = {
    // Visit the keys ordered by (ehName, partitionId) so that, for a given key set, entries are
    // always inserted in the same sequence (sort for more determinism).
    val orderedKeys = partitionSeqNos.keySet.toSeq.sortBy(key => (key.ehName, key.partitionId))
    val seqNosByName = new mutable.HashMap[String, mutable.HashMap[PartitionId, SequenceNumber]]()
    for (key <- orderedKeys) {
      // Fetch the inner map for this ehName, creating and registering it on first sight.
      val seqNosForName =
        seqNosByName.getOrElseUpdate(key.ehName, new mutable.HashMap[PartitionId, SequenceNumber])
      seqNosForName(key.partitionId) = partitionSeqNos(key)
    }
    Serialization.write(seqNosByName)
  }