def toInternalRow()

in core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSourceProvider.scala [167:231]


  /**
   * Converts every event of an [[EventHubsRDD]] into a Spark SQL [[InternalRow]].
   *
   * Each row carries, in this order:
   *  1. the event body (`ed.getBytes`)
   *  1. the index of the RDD partition the event was read from, as a string
   *  1. the offset, as a string
   *  1. the sequence number
   *  1. the enqueued time, converted with `DateTimeUtils.fromJavaTimestamp`
   *  1. the publisher
   *  1. the partition key
   *  1. the application properties as a string-to-string map
   *  1. the system properties (minus offset, sequence number and enqueued time,
   *     which already have their own columns) as a string-to-string map
   *
   * In both maps a value that is already a `String` is kept verbatim; any other
   * value is rendered with `Serialization.write`.
   *
   * @param rdd the events to convert
   * @return one row per event, produced lazily per partition
   */
  def toInternalRow(rdd: EventHubsRDD): RDD[InternalRow] = {
    rdd.mapPartitionsWithIndex { (partitionIndex, events) =>
      // The partition column is the same for every event of this partition,
      // so it is built once instead of once per row.
      val partition = UTF8String.fromString(partitionIndex.toString)

      // The helpers below are function values created inside the partition
      // closure, so the closure captures nothing beyond what it already did.

      // Copies the readable bytes of an AMQP Binary into a fresh array.
      val binaryToBytes: Binary => AnyRef = { binary =>
        val buf = binary.asByteBuffer()
        val arr = new Array[Byte](buf.remaining)
        buf.get(arr)
        arr
      }

      // Replaces AMQP-specific application property values with plain JVM values.
      val normalizeAppProperty: AnyRef => AnyRef = {
        case b: Binary           => binaryToBytes(b)
        case d128: Decimal128    => d128.asBytes.asInstanceOf[AnyRef]
        // getBits yields a primitive; the cast boxes it.
        case d32: Decimal32      => d32.getBits.asInstanceOf[AnyRef]
        case d64: Decimal64      => d64.getBits.asInstanceOf[AnyRef]
        case s: Symbol           => s.toString
        case ub: UnsignedByte    => ub.toString
        case ui: UnsignedInteger => ui.toString
        case ul: UnsignedLong    => ul.toString
        case us: UnsignedShort   => us.toString
        case c: Character        => c.toString
        case d: DescribedType    => d.getDescribed
        case other               => other
      }

      // System properties only get the Binary treatment; everything else passes through.
      val normalizeSystemProperty: AnyRef => AnyRef = {
        case b: Binary => binaryToBytes(b)
        case other     => other
      }

      // Turns a (name, value) entry into UTF8 strings: strings are kept as they
      // are, every other value is rendered with Serialization.write.
      val toUtf8Entry: ((String, AnyRef)) => (UTF8String, UTF8String) = {
        case (key, value) =>
          val rendered = value match {
            case s: String => s
            case other     => Serialization.write(other)
          }
          UTF8String.fromString(key) -> UTF8String.fromString(rendered)
      }

      events.map { ed =>
        val systemProperties = ed.getSystemProperties

        val applicationPropertyMap = ArrayBasedMapData(
          ed.getProperties.asScala
            .mapValues(normalizeAppProperty)
            .map(toUtf8Entry))

        val systemPropertyMap = ArrayBasedMapData(
          // Don't duplicate offset, enqueued time, and seqNo
          (systemProperties.asScala -- Seq(OffsetAnnotation,
                                           SequenceNumberAnnotation,
                                           EnqueuedTimeAnnotation))
            .mapValues(normalizeSystemProperty)
            .map(toUtf8Entry))

        InternalRow(
          ed.getBytes,
          partition,
          UTF8String.fromString(systemProperties.getOffset),
          systemProperties.getSequenceNumber,
          DateTimeUtils.fromJavaTimestamp(
            new java.sql.Timestamp(systemProperties.getEnqueuedTime.toEpochMilli)),
          UTF8String.fromString(systemProperties.getPublisher),
          UTF8String.fromString(systemProperties.getPartitionKey),
          applicationPropertyMap,
          systemPropertyMap
        )
      }
    }
  }