in core/src/main/scala/org/apache/spark/sql/eventhubs/JsonUtils.scala [57:80]
/**
 * Serialize a collection of partitions to a JSON string, grouped by
 * event hub name.
 *
 * @param partitions the NameAndPartition values to serialize
 * @return JSON mapping each event hub name to its NameAndPartitions
 */
def partitions(partitions: Iterable[NameAndPartition]): String = {
  val byEhName = partitions.groupBy(_.ehName)
  Serialization.write(byEhName)
}
/**
 * Write per-NameAndPartition sequence numbers as a JSON string.
 *
 * Entries are sorted by (event hub name, partition id) and accumulated in
 * insertion-order-preserving maps so the emitted JSON has a deterministic
 * key order. (A plain HashMap would discard the sort at serialization time,
 * since its iteration order is unspecified.)
 *
 * @param partitionSeqNos sequence number for each NameAndPartition
 * @return JSON of the form {"ehName":{"partitionId":seqNo, ...}, ...}
 */
def partitionSeqNos(partitionSeqNos: Map[NameAndPartition, SequenceNumber]): String = {
  // Order by event hub name first, then by partition id within a hub.
  implicit val ordering: Ordering[NameAndPartition] =
    Ordering.by(nAndP => (nAndP.ehName, nAndP.partitionId))
  // LinkedHashMap preserves insertion order, so the sorted traversal below
  // actually produces deterministic JSON output.
  val result =
    new mutable.LinkedHashMap[String, mutable.LinkedHashMap[PartitionId, SequenceNumber]]()
  val partitions = partitionSeqNos.keySet.toSeq.sorted // sort for determinism
  partitions.foreach { nAndP =>
    val parts = result.getOrElseUpdate(
      nAndP.ehName,
      new mutable.LinkedHashMap[PartitionId, SequenceNumber])
    parts += nAndP.partitionId -> partitionSeqNos(nAndP)
  }
  Serialization.write(result)
}