in connectors/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/WideConverter.scala [468:503]
/**
 * Converts one wide-layout row into TSRecords, one record per device.
 *
 * Column 0 of `row` is read as the timestamp. Every schema field whose name
 * is not `QueryConstant.RESERVED_TIME` is then read from row positions
 * 1, 2, ... in schema order, so the row is assumed to carry the time column
 * first, followed by the remaining fields in the same order as `dataSchema`.
 *
 * Each field name is parsed as a `Path` to obtain its device and measurement.
 * Fields that share a device are added to the same TSRecord. Null cells
 * produce no data point, but a record is still created for their device.
 *
 * @param row        the row to convert; position 0 must hold the timestamp as a long
 * @param dataSchema schema describing the row's columns
 * @return one TSRecord per distinct device, in the iteration order of the
 *         backing mutable map (not guaranteed to follow schema order)
 * @throws UnsupportedOperationException if a non-null cell has a Spark type
 *                                       other than Boolean, Integer, Long,
 *                                       Float, Double or String
 */
def toTsRecord(row: InternalRow, dataSchema: StructType): List[TSRecord] = {
  val time = row.getLong(0)
  val deviceToRecord = scala.collection.mutable.Map[String, TSRecord]()

  dataSchema.fields
    .filterNot(f => QueryConstant.RESERVED_TIME.equals(f.name))
    .zipWithIndex
    .foreach { case (field, position) =>
      // Position 0 of the row is the timestamp, so the data columns start at 1.
      val index = position + 1

      val fullPath = new Path(field.name, true)
      val device = fullPath.getDeviceString
      val measurement = fullPath.getMeasurement

      // Reuse the record of a device seen earlier in this row, or register a new one.
      val tsRecord: TSRecord =
        deviceToRecord.getOrElseUpdate(device, new TSRecord(device, time))

      // Resolved before the null check on purpose: this keeps the original
      // behaviour of invoking getTsDataType for every field, null or not.
      val dataType = getTsDataType(field.dataType)

      if (!row.isNullAt(index)) {
        val value = field.dataType match {
          case BooleanType => row.getBoolean(index)
          case IntegerType => row.getInt(index)
          case LongType => row.getLong(index)
          case FloatType => row.getFloat(index)
          case DoubleType => row.getDouble(index)
          case StringType => row.getString(index)
          case other => throw new UnsupportedOperationException(s"Unsupported type $other")
        }
        // DataPoint.getDataPoint takes the value in its string form.
        tsRecord.addTuple(DataPoint.getDataPoint(dataType, measurement, value.toString))
      }
    }

  deviceToRecord.values.toList
}