in parquet/src/main/scala/magnolify/parquet/ParquetField.scala [337:411]
implicit def ptIterable[T, C[T]](implicit
t: ParquetField[T],
ti: C[T] => Iterable[T],
fc: FactoryCompat[T, C[T]],
pa: ParquetArray
): ParquetField[C[T]] = {
new ParquetField[C[T]] {
// Legacy compat with Magnolify <= 0.7; future versions will remove AvroCompat in favor of
// Configuration-based approach
@nowarn("cat=deprecation")
val avroCompatImported: Boolean = pa match {
case ParquetArray.default => false
case ParquetArray.AvroCompat.avroCompat => true
}
override def buildSchema(cm: CaseMapper, properties: MagnolifyParquetProperties): Type = {
val repeatedSchema =
Schema.setRepetition(t.schema(cm, properties), Repetition.REPEATED)
if (isGroup(properties)) {
Types
.requiredGroup()
.addField(Schema.rename(repeatedSchema, AvroArrayField))
.as(LogicalTypeAnnotation.listType())
.named("iterable")
} else {
repeatedSchema
}
}
override protected def isGroup(properties: MagnolifyParquetProperties): Boolean =
avroCompatImported || properties.WriteAvroCompatibleArrays
override protected def isEmpty(v: C[T]): Boolean = v.forall(t.isEmpty)
override def write(
c: RecordConsumer,
v: C[T]
)(cm: CaseMapper, properties: MagnolifyParquetProperties): Unit =
if (isGroup(properties)) {
c.startField(AvroArrayField, 0)
v.foreach(t.writeGroup(c, _)(cm, properties))
c.endField(AvroArrayField, 0)
} else {
v.foreach(t.writeGroup(c, _)(cm, properties))
}
override def newConverter(writerSchema: Type): TypeConverter[C[T]] = {
val buffered = t
.newConverter(writerSchema)
.asInstanceOf[TypeConverter.Buffered[T]]
.withRepetition(Repetition.REPEATED)
val arrayConverter = new TypeConverter.Delegate[T, C[T]](buffered) {
override def get: C[T] = inner.get(fc.fromSpecific)
}
if (Schema.hasGroupedArray(writerSchema)) {
new GroupConverter with TypeConverter.Buffered[C[T]] {
override def getConverter(fieldIndex: Int): Converter = {
require(fieldIndex == 0, "Avro array field index != 0")
arrayConverter
}
override def start(): Unit = ()
override def end(): Unit = addValue(arrayConverter.get)
override def get: C[T] = get(_.headOption.getOrElse(fc.newBuilder.result()))
}
} else {
arrayConverter
}
}
override def fieldDocs(cm: CaseMapper): Map[String, String] = t.fieldDocs(cm)
override def typeDoc: Option[String] = None
}
}