override def internalCompute()

in integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala [461:606]


  override def internalCompute(split: Partition, context: TaskContext): Iterator[T] = {
    val queryStartTime = System.currentTimeMillis
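    // fall back to <working dir>/conf/carbon.properties when no explicit properties file path is set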
    val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
    if (null == carbonPropertiesFilePath) {
      System.setProperty("carbon.properties.filepath",
        System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties"
      )
    }
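    // build the Hadoop task attempt context and the CarbonData input format used to read this partition's split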
    val executionId = context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    val taskId = split.index
    val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
    val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
    val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
    val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
    TaskMetricsMap.getInstance().registerThreadCallback()
    inputMetricsStats.initBytesReadCallback(context, inputSplit, inputMetricsInterval)
    val iterator = if (inputSplit.getAllSplits.size() > 0) {
      val model = format.createQueryModel(inputSplit, attemptContext, indexFilter)
      // one query id per table
      model.setQueryId(queryId)

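      // collect the indexes of projected timestamp columns; they may need calendar rebasing on Spark 3 (see next())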
      val timeStampProjectionColumns = getTimeStampProjectionColumns(model.getProjectionColumns)
      // choose the RecordReader implementation based on the file format of this split
      var reader: RecordReader[Void, Object] =
        if (inputSplit.getFileFormat.equals(FileFormat.ROW_V1)) {
          // create record reader for row format
          DataTypeUtil.setDataTypeConverter(dataTypeConverterClz.newInstance())
          val inputFormat = new CarbonStreamInputFormat
          inputFormat.setIsVectorReader(vectorReader)
          inputFormat.setInputMetricsStats(inputMetricsStats)
          model.setStatisticsRecorder(
            CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId))
          inputFormat.setModel(model)
          val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
            .asInstanceOf[RecordReader[Void, Object]]
          streamReader
        } else {
          // create record reader for CarbonData file format
          if (vectorReader) {
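            // try the vectorized reader first; fall back to the row-wise CarbonRecordReader if it cannot be created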
            model.setDirectVectorFill(directFill)
            val carbonRecordReader = createVectorizedCarbonRecordReader(model,
              inputMetricsStats,
              "true")
            if (carbonRecordReader == null) {
              new CarbonRecordReader(model,
                format.getReadSupportClass(attemptContext.getConfiguration),
                inputMetricsStats,
                attemptContext.getConfiguration)
            } else {
              carbonRecordReader
            }
          } else {
            new CarbonRecordReader(model,
              format.getReadSupportClass(attemptContext.getConfiguration),
              inputMetricsStats, attemptContext.getConfiguration)
          }
        }

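      // closes the reader at most once and logs (rather than propagates) any failure during close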
      val closeReader = () => {
        if (reader != null) {
          try {
            reader.close()
          } catch {
            case e: Exception =>
              LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(e)
          }
          reader = null
        }
      }

      // create a statistics recorder
      val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId())
      model.setStatisticsRecorder(recorder)

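      // iterator that lazily initializes the reader on the first hasNext call, registers a task
      // completion listener, and closes the reader as soon as the last record has been read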
      new Iterator[Any] {
        private var havePair = false
        private var finished = false
        private var first = true

        override def hasNext: Boolean = {
          if (context.isInterrupted) {
            throw new TaskKilledException
          }
          if (first) {
            first = false
            addTaskCompletionListener(
              split,
              context,
              queryStartTime,
              executionId,
              taskId,
              model,
              reader)
            // initialize the reader
            reader.initialize(inputSplit, attemptContext)
          }
          if (!finished && !havePair) {
            finished = !reader.nextKeyValue
            havePair = !finished
          }
          if (finished) {
            closeReader.apply()
          }
          !finished
        }

        override def next(): Any = {
          if (!hasNext) {
            throw new java.util.NoSuchElementException("End of stream")
          }
          havePair = false
          val value = reader.getCurrentValue
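          // on Spark 3, projected timestamp columns may need rebasing from the Julian to the Gregorian calendar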
          if (CarbonProperties.getInstance()
                .getProperty(CarbonCommonConstants.CARBON_SPARK_VERSION_SPARK3,
                  CarbonCommonConstants.CARBON_SPARK_VERSION_SPARK3_DEFAULT).toBoolean &&
              timeStampProjectionColumns.nonEmpty) {
            value match {
              case row: GenericInternalRow if needRebaseTimeValue(reader) =>
                // rebase timestamp data by converting Julian time to Gregorian time
                timeStampProjectionColumns.foreach {
                  projectionColumnWithIndex =>
                    val timeStampData = row.get(projectionColumnWithIndex._2,
                      org.apache.spark.sql.types.DataTypes.TimestampType)
                    if (null != timeStampData) {
                      row.update(projectionColumnWithIndex._2,
                        CarbonToSparkAdapter.rebaseTime(timeStampData.asInstanceOf[Long]))
                    }
                }
              case _ =>
            }
          }
          value
        }

      }
    } else {
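      // the split contains no blocks to read, so return an empty iterator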
      new Iterator[Any] {
        override def hasNext: Boolean = false

        override def next(): Any = throw new java.util.NoSuchElementException("End of stream")
      }
    }

    iterator.asInstanceOf[Iterator[T]]
  }