override def buildReader()

in connectors/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/DefaultSource.scala [85:257]


  /**
   * Builds the per-partition read function used by Spark's FileFormat API.
   *
   * The returned closure opens the TsFile backing the given [[PartitionedFile]] over HDFS
   * and serves its rows as [[InternalRow]]s in one of two layouts, selected by the
   * `DefaultSource.isNarrowForm` option:
   *
   *  - narrow form (`"narrow_form"`): one row per (time, device_name, measurement...),
   *    produced by one query expression per device and drained device-by-device;
   *  - wide form (default): one row per timestamp, with one column per full
   *    (device, measurement) path.
   *
   * Only the byte range [file.start, file.start + file.length) of the TsFile is queried,
   * so concurrent partitions of the same file do not produce duplicate rows.
   *
   * @param sparkSession    active session; used only to broadcast the Hadoop configuration
   * @param dataSchema      full schema of the data source
   * @param partitionSchema partition columns (not used by this reader)
   * @param requiredSchema  the columns actually requested by the query (projection pushdown)
   * @param filters         pushed-down Spark filters, converted into TsFile query expressions
   * @param options         data source options map
   * @param hadoopConf      Hadoop configuration used to open the file
   * @return a function mapping a [[PartitionedFile]] to an iterator of [[InternalRow]]s
   */
  override def buildReader(
                            sparkSession: SparkSession,
                            dataSchema: StructType,
                            partitionSchema: StructType,
                            requiredSchema: StructType,
                            filters: Seq[Filter],
                            options: Map[String, String],
                            hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow]
  = {
    // Hadoop Configuration is not serializable; broadcast a serializable wrapper so the
    // closure below can be shipped to executors.
    val broadcastedConf =
      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

    (file: PartitionedFile) => {
      val log = LoggerFactory.getLogger(classOf[DefaultSource])
      log.info("This partition starts from " + file.start.asInstanceOf[java.lang.Long]
        + " and ends at " + (file.start + file.length).asInstanceOf[java.lang.Long])
      log.info(file.toString())

      val conf = broadcastedConf.value.value
      val in = new HDFSInput(new Path(new URI(file.filePath)), conf)

      val reader: TsFileSequenceReader = new TsFileSequenceReader(in)

      val tsFileMetaData = reader.readFileMetadata

      val readTsFile: TsFileReader = new TsFileReader(reader)

      // Close the reader when the Spark task finishes, whether it succeeded or failed.
      // NOTE(review): only readTsFile is closed here — presumably TsFileReader.close()
      // also closes the underlying TsFileSequenceReader/HDFSInput; confirm, otherwise
      // `reader` leaks when no TaskContext is available (e.g. driver-side execution).
      Option(TaskContext.get()).foreach { taskContext => {
        taskContext.addTaskCompletionListener { _ => readTsFile.close() }
        log.info("task Id: " + taskContext.taskAttemptId() + " partition Id: " +
          taskContext.partitionId())
      }
      }

      if (options.getOrElse(DefaultSource.isNarrowForm, "").equals("narrow_form")) {
        // ---- narrow form: rows are (time, device_name, measurement columns) ----
        val deviceNames = reader.getAllDevices.map(deviceID => deviceID.asInstanceOf[PlainDeviceID].toStringID)

        // Measurements the query actually needs: every required column except the
        // reserved time column and the synthetic device_name column.
        val measurementNames = new java.util.HashSet[String]()

        requiredSchema.foreach((field: StructField) => {
          if (field.name != QueryConstant.RESERVED_TIME
            && field.name != NarrowConverter.DEVICE_NAME) {
            measurementNames += field.name
          }
        })

        // construct queryExpression based on queriedSchema and filters
        // (one expression per device, restricted to this partition's byte range)
        val queryExpressions = NarrowConverter.toQueryExpression(dataSchema, deviceNames,
          measurementNames, filters, reader, file.start.asInstanceOf[java.lang.Long],
          (file.start + file.length).asInstanceOf[java.lang.Long])

        val queryDataSets = Executor.query(readTsFile, queryExpressions,
          file.start.asInstanceOf[java.lang.Long],
          (file.start + file.length).asInstanceOf[java.lang.Long])

        // Cursor state shared between hasNext/next: the data set currently being
        // drained and the device it belongs to.
        var queryDataSet: QueryDataSet = null
        var deviceName: String = null

        /**
         * Advances the cursor to a data set that still has rows, popping exhausted
         * (or empty) data sets from the tail of `queryDataSets`. Updates `deviceName`
         * whenever a new data set is selected. Returns false once everything is drained.
         * Idempotent when the current data set still has rows, so hasNext may be
         * called repeatedly.
         */
        def queryNext(): Boolean = {
          if (queryDataSet != null && queryDataSet.hasNext) {
            return true
          }

          if (queryDataSets.isEmpty) {
            return false
          }

          queryDataSet = queryDataSets.remove(queryDataSets.size() - 1)
          while (!queryDataSet.hasNext) {
            if (queryDataSets.isEmpty) {
              return false
            }
            queryDataSet = queryDataSets.remove(queryDataSets.size() - 1)
          }
          deviceName = queryDataSet.getPaths.get(0).getDeviceString
          true
        }

        new Iterator[InternalRow] {
          // Reused per-row buffer; safe because the encoder copies values into an
          // InternalRow before the next call mutates the buffer.
          private val rowBuffer = Array.fill[Any](requiredSchema.length)(null)

          private val safeDataRow = new GenericRow(rowBuffer)

          // Used to convert `Row`s containing data columns into `InternalRow`s.
          private val encoderForDataColumns = RowEncoder(requiredSchema)

          override def hasNext: Boolean = {
            queryNext()
          }

          override def next(): InternalRow = {
            val curRecord = queryDataSet.next()
            val fields = curRecord.getFields
            val paths = queryDataSet.getPaths

            //index in one required row
            var index = 0
            requiredSchema.foreach((field: StructField) => {
              if (field.name == QueryConstant.RESERVED_TIME) {
                rowBuffer(index) = curRecord.getTimestamp
              }
              else if (field.name == NarrowConverter.DEVICE_NAME) {
                rowBuffer(index) = deviceName
              }
              else {
                // Find the column of this measurement for the current device; a miss
                // (pos == -1) yields a null Field, which toSqlValue maps to SQL NULL.
                val pos = paths.indexOf(new org.apache.tsfile.read.common.Path(deviceName,
                  field.name, true))
                var curField: Field = null
                if (pos != -1) {
                  curField = fields.get(pos)
                }
                rowBuffer(index) = NarrowConverter.toSqlValue(curField)
              }
              index += 1
            })

            encoderForDataColumns.toRow(safeDataRow)
          }
        }
      }
      else {
        // ---- wide form: one column per full (device, measurement) path ----
        // get queriedSchema from requiredSchema
        // (was `var`, but it is never reassigned — use `val`)
        val queriedSchema = WideConverter.prepSchema(requiredSchema, tsFileMetaData, reader)

        // construct queryExpression based on queriedSchema and filters
        val queryExpression = WideConverter.toQueryExpression(queriedSchema, filters)


        val queryDataSet = readTsFile.query(queryExpression,
          file.start.asInstanceOf[java.lang.Long],
          (file.start + file.length).asInstanceOf[java.lang.Long])

        new Iterator[InternalRow] {
          // Reused per-row buffer; safe because the encoder copies values into an
          // InternalRow before the next call mutates the buffer.
          private val rowBuffer = Array.fill[Any](requiredSchema.length)(null)

          private val safeDataRow = new GenericRow(rowBuffer)

          // Used to convert `Row`s containing data columns into `InternalRow`s.
          private val encoderForDataColumns = RowEncoder(requiredSchema)

          override def hasNext: Boolean = {
            val hasNext = queryDataSet.hasNext
            hasNext
          }

          override def next(): InternalRow = {

            val curRecord = queryDataSet.next()
            val fields = curRecord.getFields
            val paths = queryDataSet.getPaths

            //index in one required row
            var index = 0
            requiredSchema.foreach((field: StructField) => {
              if (field.name == QueryConstant.RESERVED_TIME) {
                rowBuffer(index) = curRecord.getTimestamp
              } else {
                // Wide form addresses columns by full path (field.name already contains
                // the device prefix); a miss yields SQL NULL via toSqlValue(null).
                val pos = paths.indexOf(new org.apache.tsfile.read.common.Path(field.name, true))
                var curField: Field = null
                if (pos != -1) {
                  curField = fields.get(pos)
                }
                rowBuffer(index) = WideConverter.toSqlValue(curField)
              }
              index += 1
            })

            encoderForDataColumns.toRow(safeDataRow)
          }
        }
      }
    }
  }