in core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSourceProvider.scala [167:231]
/**
 * Converts the events of an [[EventHubsRDD]] into Spark SQL [[InternalRow]]s.
 *
 * Each row contains, in order:
 *  - the event body
 *  - the RDD partition index (as a string)
 *  - the offset
 *  - the sequence number
 *  - the enqueued time (converted with `DateTimeUtils.fromJavaTimestamp`)
 *  - the publisher
 *  - the partition key
 *  - the application properties
 *  - the system properties, minus the offset, sequence-number and enqueued-time annotations
 *    (those already have their own columns)
 *
 * Both property maps become `UTF8String -> UTF8String` maps built by the same rules:
 *  1. AMQP-specific value types are normalized to plain JVM values.
 *  2. String values are kept verbatim.
 *  3. Every other value (including null) is rendered with `Serialization.write`.
 *
 * @param rdd the received events
 * @return one row per event, preserving the partitioning of `rdd`
 */
def toInternalRow(rdd: EventHubsRDD): RDD[InternalRow] = {
  rdd.mapPartitionsWithIndex { (partitionIndex, iter) =>
    // The helpers are defined inside the partition closure, so they are instantiated on the
    // executor and do not add anything to what the task has to serialize.

    // Maps AMQP-specific value types to plain JVM values (byte arrays, boxed numbers,
    // strings). Values of any other type, including null, pass through unchanged.
    val normalizeAmqpValue: AnyRef => AnyRef = {
      case b: Binary =>
        val buf = b.asByteBuffer()
        val arr = new Array[Byte](buf.remaining)
        buf.get(arr)
        arr
      case d128: Decimal128 => d128.asBytes
      case d32: Decimal32 => d32.getBits.asInstanceOf[AnyRef]
      case d64: Decimal64 => d64.getBits.asInstanceOf[AnyRef]
      case s: Symbol => s.toString
      case ub: UnsignedByte => ub.toString
      case ui: UnsignedInteger => ui.toString
      case ul: UnsignedLong => ul.toString
      case us: UnsignedShort => us.toString
      case c: Character => c.toString
      case d: DescribedType => d.getDescribed
      case other => other
    }

    // Builds a Catalyst map with UTF8String keys and values. Applied to BOTH property maps:
    // system properties may carry AMQP-typed values too, and previously only `Binary` was
    // unwrapped there, so e.g. a `Symbol` went through reflective JSON serialization.
    val toUtf8StringMap: scala.collection.Map[String, AnyRef] => ArrayBasedMapData = props =>
      ArrayBasedMapData(props.map {
        case (key, value) =>
          val rendered = normalizeAmqpValue(value) match {
            case s: String => s
            case other => Serialization.write(other)
          }
          UTF8String.fromString(key) -> UTF8String.fromString(rendered)
      })

    iter.map { ed =>
      val sysProps = ed.getSystemProperties
      InternalRow(
        ed.getBytes,
        // NOTE(review): this is the Spark RDD partition index, not a value read from the event.
        // It only equals the Event Hubs partition id if EventHubsRDD creates one RDD partition
        // per Event Hubs partition, in id order — confirm.
        UTF8String.fromString(partitionIndex.toString),
        UTF8String.fromString(sysProps.getOffset),
        sysProps.getSequenceNumber,
        DateTimeUtils.fromJavaTimestamp(
          new java.sql.Timestamp(sysProps.getEnqueuedTime.toEpochMilli)),
        UTF8String.fromString(sysProps.getPublisher),
        UTF8String.fromString(sysProps.getPartitionKey),
        toUtf8StringMap(ed.getProperties.asScala),
        // Don't duplicate offset, enqueued time, and seqNo: each already has its own column.
        toUtf8StringMap(
          sysProps.asScala --
            Seq(OffsetAnnotation, SequenceNumberAnnotation, EnqueuedTimeAnnotation))
      )
    }
  }
}