in connectors/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/DefaultSource.scala [85:257]
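/**
 * Builds the executor-side read function: Spark invokes the returned closure once per
 * PartitionedFile, and the closure converts the TsFile data inside
 * [file.start, file.start + file.length) into an Iterator[InternalRow] matching requiredSchema.
 *
 * A minimal usage sketch (hedged: the option key is shown symbolically via DefaultSource.isNarrowForm,
 * and the format is assumed to resolve through this package's DefaultSource):
 *   spark.read
 *     .format("org.apache.iotdb.spark.tsfile")
 *     .option(DefaultSource.isNarrowForm, "narrow_form") // omit for the wide table form
 *     .load("hdfs://.../some.tsfile")
 */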
override def buildReader(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow]
= {
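// Broadcast the Hadoop configuration so it can be shipped to the executors together with the read function.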
val broadcastedConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
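// The closure below runs on the executors, once for every PartitionedFile assigned to a task.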
(file: PartitionedFile) => {
val log = LoggerFactory.getLogger(classOf[DefaultSource])
log.info("This partition starts from " + file.start.asInstanceOf[java.lang.Long]
+ " and ends at " + (file.start + file.length).asInstanceOf[java.lang.Long])
log.info(file.toString())
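// Open the TsFile through an HDFS-backed input and load its file-level metadata.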
val conf = broadcastedConf.value.value
val in = new HDFSInput(new Path(new URI(file.filePath)), conf)
val reader: TsFileSequenceReader = new TsFileSequenceReader(in)
val tsFileMetaData = reader.readFileMetadata
val readTsFile: TsFileReader = new TsFileReader(reader)
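// Ensure the TsFile reader is closed when the Spark task completes.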
Option(TaskContext.get()).foreach { taskContext => {
taskContext.addTaskCompletionListener { _ => readTsFile.close() }
log.info("task Id: " + taskContext.taskAttemptId() + " partition Id: " +
taskContext.partitionId())
}
}
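// Narrow form: rows are keyed by (time, device_name) and measurements of all devices share columns.
// Wide form (else branch): rows are keyed by time only and every series gets its own full-path column.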
if (options.getOrElse(DefaultSource.isNarrowForm, "").equals("narrow_form")) {
val deviceNames = reader.getAllDevices.map(deviceID => deviceID.asInstanceOf[PlainDeviceID].toStringID)
val measurementNames = new java.util.HashSet[String]()
requiredSchema.foreach((field: StructField) => {
if (field.name != QueryConstant.RESERVED_TIME
&& field.name != NarrowConverter.DEVICE_NAME) {
measurementNames += field.name
}
})
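// Collect the measurement columns Spark requested, skipping the reserved time and device_name columns.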
// construct QueryExpressions (one per device) from the required measurements and the pushed-down filters
val queryExpressions = NarrowConverter.toQueryExpression(dataSchema, deviceNames,
measurementNames, filters, reader, file.start.asInstanceOf[java.lang.Long],
(file.start + file.length).asInstanceOf[java.lang.Long])
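// Run the per-device expressions against this partition's byte range; Executor.query returns one
// QueryDataSet per device that has data in the range.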
val queryDataSets = Executor.query(readTsFile, queryExpressions,
file.start.asInstanceOf[java.lang.Long],
(file.start + file.length).asInstanceOf[java.lang.Long])
var queryDataSet: QueryDataSet = null
var deviceName: String = null
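// Advance to the next QueryDataSet that still has rows, remembering which device it belongs to.
// Returns false once every device's data set is exhausted.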
def queryNext(): Boolean = {
if (queryDataSet != null && queryDataSet.hasNext) {
return true
}
if (queryDataSets.isEmpty) {
return false
}
queryDataSet = queryDataSets.remove(queryDataSets.size() - 1)
while (!queryDataSet.hasNext) {
if (queryDataSets.isEmpty) {
return false
}
queryDataSet = queryDataSets.remove(queryDataSets.size() - 1)
}
deviceName = queryDataSet.getPaths.get(0).getDeviceString
true
}
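// Adapt the TsFile RowRecords to Spark InternalRows, filling the buffer in requiredSchema order.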
new Iterator[InternalRow] {
private val rowBuffer = Array.fill[Any](requiredSchema.length)(null)
private val safeDataRow = new GenericRow(rowBuffer)
// Used to convert `Row`s containing data columns into `InternalRow`s.
private val encoderForDataColumns = RowEncoder(requiredSchema)
override def hasNext: Boolean = {
queryNext()
}
override def next(): InternalRow = {
val curRecord = queryDataSet.next()
val fields = curRecord.getFields
val paths = queryDataSet.getPaths
// index of the current column within the required row
var index = 0
requiredSchema.foreach((field: StructField) => {
if (field.name == QueryConstant.RESERVED_TIME) {
rowBuffer(index) = curRecord.getTimestamp
}
else if (field.name == NarrowConverter.DEVICE_NAME) {
rowBuffer(index) = deviceName
}
else {
val pos = paths.indexOf(new org.apache.tsfile.read.common.Path(deviceName,
field.name, true))
var curField: Field = null
if (pos != -1) {
curField = fields.get(pos)
}
rowBuffer(index) = NarrowConverter.toSqlValue(curField)
}
index += 1
})
encoderForDataColumns.toRow(safeDataRow)
}
}
} else {
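// Wide form: each column is addressed by its full series path, so a single QueryExpression covers the file.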
// derive the schema that will actually be queried from requiredSchema and the file's metadata
val queriedSchema = WideConverter.prepSchema(requiredSchema, tsFileMetaData, reader)
// construct queryExpression based on queriedSchema and filters
val queryExpression = WideConverter.toQueryExpression(queriedSchema, filters)
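// Query only this partition's byte range so parallel tasks read disjoint parts of the file.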
val queryDataSet = readTsFile.query(queryExpression,
file.start.asInstanceOf[java.lang.Long],
(file.start + file.length).asInstanceOf[java.lang.Long])
new Iterator[InternalRow] {
private val rowBuffer = Array.fill[Any](requiredSchema.length)(null)
private val safeDataRow = new GenericRow(rowBuffer)
// Used to convert `Row`s containing data columns into `InternalRow`s.
private val encoderForDataColumns = RowEncoder(requiredSchema)
override def hasNext: Boolean = {
queryDataSet.hasNext
}
override def next(): InternalRow = {
val curRecord = queryDataSet.next()
val fields = curRecord.getFields
val paths = queryDataSet.getPaths
// index of the current column within the required row
var index = 0
requiredSchema.foreach((field: StructField) => {
if (field.name == QueryConstant.RESERVED_TIME) {
rowBuffer(index) = curRecord.getTimestamp
} else {
val pos = paths.indexOf(new org.apache.tsfile.read.common.Path(field.name, true))
var curField: Field = null
if (pos != -1) {
curField = fields.get(pos)
}
rowBuffer(index) = WideConverter.toSqlValue(curField)
}
index += 1
})
encoderForDataColumns.toRow(safeDataRow)
}
}
}
}
}