def insertDataFrame()

in connectors/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DataFrameTools.scala [31:91]


  /**
   * Writes the rows of `dataframe` into IoTDB, opening one native [[Session]] per Spark partition.
   *
   * Column layout is positional (cells are read by index, not by name):
   *   - column 0 is the timestamp, cast to `Long`;
   *   - column 1 is the device, converted with `toString`;
   *   - the remaining columns are sensors; each non-null cell is converted with `typeTrans` to
   *     the `TSDataType` that `getType` returns for that column's Spark type.
   * NOTE(review): the sensor list is built by removing the names "Time" and "Device" from
   * `dataframe.dtypes` and is then indexed as `i - 2`. It only lines up with the row cells when
   * those two columns are the first two. Confirm callers guarantee that order.
   *
   * The data is repartitioned into `options.numPartition` partitions and sorted by `Device`
   * within each partition, so rows of one device arrive consecutively. Buffered rows are handed
   * to `insertAndEmptyDataSet` (defined elsewhere; presumably writes the buffer for `device` and
   * clears it) in three cases: when the device changes, when `batchSize` rows are buffered, and
   * at the end of the partition. Rows whose sensor cells are all null are not buffered.
   *
   * @param options   connection settings: `url` (host and port are taken from the text after
   *                  `//`, with `/` removed and split on `:`), `user`, `password` and
   *                  `numPartition`
   * @param dataframe rows to write
   */
  def insertDataFrame(options: IoTDBOptions, dataframe: DataFrame): Unit = {
    val filteredColumns = Array[String]("Time", "Device")
    val sensorTypes = dataframe.dtypes.filter(x => !filteredColumns.contains(x._1))

    dataframe
      .repartition(options.numPartition.toInt)
      .sortWithinPartitions(dataframe.col("Device"))
      .foreachPartition { (partition: Iterator[Row]) =>
        val hostPort = options.url.split("//")(1).replace("/", "").split(":")
        val session = new Session(
          hostPort(0),
          hostPort(1).toInt,
          options.user,
          options.password
        )
        session.open()

        // Release the connection even if a conversion or a write fails mid-partition.
        try {
          var device: lang.String = ""
          val times = new util.ArrayList[lang.Long]()
          val measurementsList = new util.ArrayList[util.List[lang.String]]()
          val typesList = new util.ArrayList[util.List[TSDataType]]()
          val valuesList = new util.ArrayList[util.List[Object]]()

          val batchSize = 1000
          var currentSize = 0

          // Write the buffered rows for the current device, but only if there are any.
          // Two cases buffer nothing and must not issue a write:
          //   - an empty partition, where `device` is still "";
          //   - a device whose rows were all-null.
          def flush(): Unit = {
            if (!times.isEmpty) {
              insertAndEmptyDataSet(session, device, times, measurementsList, typesList, valuesList)
            }
            currentSize = 0
          }

          partition.foreach { record =>
            val recordDevice = record.get(1).toString
            if ("".equals(device)) device = recordDevice
            else if (!device.equals(recordDevice)) {
              // Rows are sorted by device, so a change means the previous device is complete.
              flush()
              device = recordDevice
            }

            val measurements = new util.ArrayList[lang.String]()
            val types = new util.ArrayList[TSDataType]()
            val values = new util.ArrayList[Object]()
            for (i <- 2 until record.length if record.get(i) != null) {
              val (sensorName, sparkType) = sensorTypes(i - 2)
              val tsType = getType(sparkType)
              values.add(typeTrans(record.get(i).toString, tsType))
              measurements.add(sensorName)
              types.add(tsType)
            }

            if (!values.isEmpty) {
              times.add(record.get(0).asInstanceOf[Long])
              measurementsList.add(measurements)
              typesList.add(types)
              valuesList.add(values)
              currentSize += 1
            }
            if (currentSize >= batchSize) flush()
          }

          flush()
        } finally {
          session.close()
        }
      }

  }