in parquet/src/main/scala/magnolify/parquet/ParquetField.scala [416:501]
/**
 * Derives a [[ParquetField]] for `Map[K, V]` from the instances of its key and value types.
 *
 * The schema is a required group named `"map"`, annotated with the Parquet map logical type,
 * wrapping a single repeated group (`KeyValueGroup`) that holds the key field followed by the
 * value field.
 *
 * @param pfKey
 *   field instance for the keys; its schema must have `REQUIRED` repetition, otherwise
 *   `buildSchema` fails its `require` check
 * @param pfValue
 *   field instance for the values
 * @tparam K
 *   key type
 * @tparam V
 *   value type
 */
implicit def pfMap[K, V](implicit
  pfKey: ParquetField[K],
  pfValue: ParquetField[V]
): ParquetField[Map[K, V]] =
  new ParquetField[Map[K, V]] {

    /** Builds the `"map"` group schema from the renamed key and value schemas. */
    override def buildSchema(cm: CaseMapper, properties: MagnolifyParquetProperties): Type = {
      val renamedKey = Schema.rename(pfKey.schema(cm, properties), KeyField)
      require(renamedKey.isRepetition(Repetition.REQUIRED), "Map key must be required")
      val renamedValue = Schema.rename(pfValue.schema(cm, properties), ValueField)

      // Inner repeated group: one element per map entry.
      val entryGroup = Types
        .repeatedGroup()
        .addField(renamedKey)
        .addField(renamedValue)
        .named(KeyValueGroup)

      // Outer required group carrying the map logical type annotation.
      Types
        .requiredGroup()
        .addField(entryGroup)
        .as(LogicalTypeAnnotation.mapType())
        .named("map")
    }

    override protected def isEmpty(v: Map[K, V]): Boolean = v.isEmpty

    // The map type itself contributes no field-level or type-level documentation.
    override def fieldDocs(cm: CaseMapper): Map[String, String] = Map.empty

    override val typeDoc: Option[String] = None

    /**
     * Writes `v` to the record consumer as one outer group containing the repeated entry field.
     * Nothing at all is emitted for an empty map.
     */
    override def write(c: RecordConsumer, v: Map[K, V])(
      cm: CaseMapper,
      properties: MagnolifyParquetProperties
    ): Unit = {
      // Emits a single entry: the key at field index 0, then the value at field index 1.
      // The value field is skipped entirely when `pfValue.nonEmpty` reports false for it.
      def writeEntry(key: K, value: V): Unit = {
        c.startGroup()
        c.startField(KeyField, 0)
        pfKey.writeGroup(c, key)(cm, properties)
        c.endField(KeyField, 0)
        if (pfValue.nonEmpty(value)) {
          c.startField(ValueField, 1)
          pfValue.writeGroup(c, value)(cm, properties)
          c.endField(ValueField, 1)
        }
        c.endGroup()
      }

      if (v.nonEmpty) {
        c.startGroup()
        c.startField(KeyValueGroup, 0)
        v.foreach { case (key, value) => writeEntry(key, value) }
        c.endField(KeyValueGroup, 0)
        c.endGroup()
      }
    }

    /**
     * Creates the converter used to read a map back.
     *
     * Three converters are layered: an entry converter that pairs up each key with its value,
     * a delegate that turns the collected pairs into a `Map`, and an outer group converter that
     * exposes that delegate as its only field (index 0).
     */
    override def newConverter(writerSchema: Type): TypeConverter[Map[K, V]] = {
      // Converter for the repeated key/value group; buffers one tuple per entry.
      val entryConverter = new GroupConverter with TypeConverter.Buffered[(K, V)] {
        private val keyPart = pfKey.newConverter(writerSchema)
        private val valuePart = pfValue.newConverter(writerSchema)
        // Looked up by field index: 0 -> key, 1 -> value.
        private val parts = Array(keyPart, valuePart)

        override def isPrimitive: Boolean = false

        override def getConverter(fieldIndex: Int): Converter = parts(fieldIndex)

        override def start(): Unit = ()

        // Key is read before value, then both are buffered together as one pair.
        override def end(): Unit = addValue((keyPart.get, valuePart.get))
      }.withRepetition(Repetition.REPEATED)

      // Turns the buffered pairs into a Map.
      val pairsToMap = new TypeConverter.Delegate[(K, V), Map[K, V]](entryConverter) {
        override def get: Map[K, V] = inner.get(_.toMap)
      }

      // Outer "map" group: a single child field, which is the repeated entry group.
      new GroupConverter with TypeConverter.Buffered[Map[K, V]] {
        override def getConverter(fieldIndex: Int): Converter = {
          require(fieldIndex == 0, "Map field index != 0")
          pairsToMap
        }

        override def start(): Unit = ()

        override def end(): Unit = addValue(pairsToMap.get)

        // Yields the first element handed over by the buffered `get` (presumably the buffered
        // maps - confirm against TypeConverter.Buffered), or an empty map when there is none.
        override def get: Map[K, V] = get(_.headOption.getOrElse(Map.empty))
      }
    }
  }