def toNewForm()

in connectors/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/Transformer.scala [46:108]


  /**
   * Converts a TsFile DataFrame from the "old" (wide) form, in which every column other than
   * `QueryConstant.RESERVED_TIME` is named `<device>.<measurement>`, into the "new" form with one
   * row per (time, device) pair:
   *
   * {{{
   * +----+-----------+---+---+----+
   * |time|device_name| m1| m2|  m3|
   * +----+-----------+---+---+----+
   * |   1| root.ln.d2| 21| 22|  23|
   * |   1| root.ln.d1| 11| 12|null|
   * +----+-----------+---+---+----+
   * }}}
   * (illustrative; the first column is whatever `QueryConstant.RESERVED_TIME` names)
   *
   * For each device, a projection is built with these columns:
   *  - the time column;
   *  - the device name as a literal `device_name` column;
   *  - one column per measurement seen on any device (NULL when this device lacks it).
   *
   * The per-device projections are then combined with `union`, which matches columns by position.
   *
   * Columns are selected through the DataFrame API rather than a SQL string. This means device or
   * measurement names containing quotes, backticks or other special characters cannot corrupt the
   * query.
   *
   * @param spark the active session; not needed to build the result, kept for API compatibility
   * @param df    a DataFrame in the old form; every non-time column name must contain a '.'
   * @return the new-form DataFrame, or `null` if `df` has no columns besides the time column
   * @throws IllegalArgumentException if a non-time column name contains no '.'
   */
  def toNewForm(spark: SparkSession,
                df: DataFrame): DataFrame = {
    import org.apache.spark.sql.functions.{col, lit}

    // Backtick-quote a column name so the dots inside TsFile paths are not parsed as
    // struct-field access; an embedded backtick is escaped by doubling it.
    def quoted(name: String): String = "`" + name.replace("`", "``") + "`"

    // NOTE(review): the projection below no longer reads this view. It is still registered so
    // that any caller looking it up by name keeps working — remove if nothing depends on it.
    df.createOrReplaceTempView("tsfle_old_form")

    // device name -> its measurement names, in schema order
    val deviceMeasurements = new scala.collection.mutable.HashMap[String, List[String]]()
    // every measurement seen on any device; the key iteration order of this map fixes the
    // order of the measurement columns in the output
    val allMeasurements = scala.collection.mutable.HashMap[String, DataType]()

    df.schema.foreach { field =>
      if (!QueryConstant.RESERVED_TIME.equals(field.name)) {
        val pos = field.name.lastIndexOf('.')
        require(pos >= 0, s"column '${field.name}' is not of the form <device>.<measurement>")
        val deviceName = field.name.substring(0, pos)
        val measurement = field.name.substring(pos + 1)
        deviceMeasurements(deviceName) =
          deviceMeasurements.getOrElse(deviceName, Nil) :+ measurement
        allMeasurements += (measurement -> field.dataType)
      }
    }

    // Fix the measurement order once: union resolves columns by position, so every
    // per-device projection must list the measurements identically.
    val measurementOrder = allMeasurements.keys.toList

    val perDevice = deviceMeasurements.toList.map { case (deviceName, measurements) =>
      val present = measurements.toSet
      val measurementCols = measurementOrder.map { m =>
        // a device without this measurement contributes an untyped NULL column;
        // union widens it to the type supplied by the devices that do have it
        if (present.contains(m)) col(quoted(deviceName + "." + m)).as(m)
        else lit(null).as(m)
      }
      val columns =
        Seq(col(quoted(QueryConstant.RESERVED_TIME)), lit(deviceName).as("device_name")) ++
          measurementCols
      df.select(columns: _*)
    }

    // Left-to-right union in device iteration order; null when there were no devices.
    perDevice.reduceOption(_ union _).orNull
  }