in connectors/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/Transformer.scala [46:108]
/**
  * Converts a wide-form DataFrame (one column per `device.measurement`) into
  * narrow form (one row per timestamp and device, one column per measurement).
  *
  * Every column of `df` other than `QueryConstant.RESERVED_TIME` is split at its
  * last '.' into a device name (prefix) and a measurement name (suffix). For each
  * device a projection of `df` is built that contains the time column, a literal
  * `device_name` column and one column per distinct measurement name; measurements
  * the device does not have are filled with null. The per-device projections are
  * then unioned, e.g.:
  *
  * {{{
  * +---------+-----------+---+---+----+
  * |timestamp|device_name| m1| m2|  m3|
  * +---------+-----------+---+---+----+
  * |        1| root.ln.d2| 21| 22|  23|
  * |        1| root.ln.d1| 11| 12|null|
  * +---------+-----------+---+---+----+
  * }}}
  *
  * The order of the measurement columns and of the unioned devices follows the
  * iteration order of the internal hash maps, not the order of `df`'s schema.
  *
  * The projections are built with the Column API directly on `df` instead of
  * concatenated SQL text, so measurement and device names are never parsed as
  * SQL and the result does not depend on what a shared temp view currently
  * points to.
  *
  * @param spark session handle; kept for interface compatibility, the projections
  *              are built on `df` itself
  * @param df    wide-form frame whose non-time columns are named `device.measurement`
  * @return the narrow-form frame, or `null` when `df` has no column besides the
  *         time column (NOTE(review): callers must be prepared for `null`)
  * @throws StringIndexOutOfBoundsException if a non-time column name contains no '.'
  */
def toNewForm(spark: SparkSession,
              df: DataFrame): DataFrame = {
  import org.apache.spark.sql.functions.{col, lit}

  // Still registered so the session-visible side effect of this method is
  // unchanged; the result below no longer reads through this view.
  df.createOrReplaceTempView("tsfle_old_form")

  // Mutable hash maps are kept on purpose: their iteration order decides the
  // column order and the union order of the result.
  // device name -> names of the measurements that device has
  val measurementsByDevice = new scala.collection.mutable.HashMap[String, Set[String]]()
  // measurement name -> data type of the (last seen) column carrying it
  val measurementTypes = scala.collection.mutable.HashMap[String, DataType]()

  df.schema.foreach { field =>
    if (!QueryConstant.RESERVED_TIME.equals(field.name)) {
      val pos = field.name.lastIndexOf('.')
      val deviceName = field.name.substring(0, pos)
      val measurementName = field.name.substring(pos + 1)
      measurementsByDevice(deviceName) =
        measurementsByDevice.getOrElse(deviceName, Set.empty[String]) + measurementName
      measurementTypes += (measurementName -> field.dataType)
    }
  }

  // Fixed once so every per-device projection has its columns in the same
  // position; union matches columns by position.
  val orderedMeasurements: Seq[(String, DataType)] =
    measurementTypes.keySet.toSeq.map(name => name -> measurementTypes(name))

  val timeColumn = col(QueryConstant.RESERVED_TIME)

  val perDevice: Seq[DataFrame] = measurementsByDevice.toSeq.map { case (deviceName, owned) =>
    val measurementColumns = orderedMeasurements.map { case (name, dataType) =>
      if (owned.contains(name)) {
        // Backticks keep the dots in the column name from being read as
        // nested-field access.
        col("`" + deviceName + "." + name + "`").as(name)
      } else {
        // Typed null so the filler column already carries the measurement's type.
        lit(null).cast(dataType).as(name)
      }
    }
    df.select(Seq(timeColumn, lit(deviceName).as("device_name")) ++ measurementColumns: _*)
  }

  // No devices at all: keep returning null, as before.
  perDevice.reduceOption((left, right) => left.union(right)).orNull
}