in connectors/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DataFrameTools.scala [31:91]
/**
 * Writes every row of `dataframe` into IoTDB.
 *
 * The frame is expected to carry a "Time" column (epoch millis, Long) at index 0,
 * a "Device" column at index 1, and one column per sensor after that. Rows are
 * repartitioned and sorted by device so that each partition can buffer records
 * per device and flush them in batches of up to 1000 through a single Session.
 *
 * @param options   connection settings (url, user, password, numPartition)
 * @param dataframe rows to insert; null sensor cells are skipped
 */
def insertDataFrame(options: IoTDBOptions, dataframe: DataFrame): Unit = {
  // Everything except Time/Device is a sensor; keep (columnName, sparkTypeName) pairs.
  val filteredColumns = Array[String]("Time", "Device")
  val sensorTypes = dataframe.dtypes.filter(x => !filteredColumns.contains(x._1))
  dataframe
    .repartition(options.numPartition.toInt)
    .sortWithinPartitions(dataframe.col("Device"))
    .foreachPartition { (partition: Iterator[Row]) =>
      // url is expected to look like "...//host:port/..."; pull out host and port.
      val hostPort = options.url.split("//")(1).replace("/", "").split(":")
      val session = new Session(
        hostPort(0),
        hostPort(1).toInt,
        options.user,
        options.password
      )
      session.open()
      try {
        var device: lang.String = ""
        val times = new util.ArrayList[lang.Long]()
        val measurementsList = new util.ArrayList[util.List[lang.String]]()
        val typesList = new util.ArrayList[util.List[TSDataType]]()
        val valuesList = new util.ArrayList[util.List[Object]]()
        val batchSize = 1000
        var currentSize = 0
        partition.foreach { record =>
          // Rows arrive sorted by device, so a device change means the buffered
          // batch for the previous device is complete — flush it first.
          // (insertAndEmptyDataSet is assumed to clear the passed lists.)
          if ("".equals(device)) device = record.get(1).toString
          else if (!device.equals(record.get(1).toString)) {
            insertAndEmptyDataSet(session, device, times, measurementsList, typesList, valuesList)
            device = record.get(1).toString
            currentSize = 0
          }
          val measurements = new util.ArrayList[lang.String]()
          val types = new util.ArrayList[TSDataType]()
          val values = new util.ArrayList[Object]()
          // Collect only non-null sensor cells for this row.
          for (i <- 2 until record.length if !(record.get(i) == null)) {
            // Hoist the type lookup: it was previously computed twice per cell.
            val dataType = getType(sensorTypes(i - 2)._2)
            values.add(typeTrans(record.get(i).toString, dataType))
            measurements.add(sensorTypes(i - 2)._1)
            types.add(dataType)
          }
          // Rows whose sensor cells were all null contribute nothing.
          if (!values.isEmpty) {
            times.add(record.get(0).asInstanceOf[Long])
            measurementsList.add(measurements)
            typesList.add(types)
            valuesList.add(values)
            currentSize += 1
          }
          if (currentSize >= batchSize) {
            insertAndEmptyDataSet(session, device, times, measurementsList, typesList, valuesList)
            currentSize = 0
          }
        }
        // Final flush — but only if something is actually buffered; an empty
        // partition would otherwise insert with device "" and empty lists.
        if (!times.isEmpty) {
          insertAndEmptyDataSet(session, device, times, measurementsList, typesList, valuesList)
        }
      } finally {
        // Always release the session, even when an insert throws (fixes a leak:
        // close() was previously skipped on any exception in the loop).
        session.close()
      }
    }
}