in integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala [461:606]
override def internalCompute(split: Partition, context: TaskContext): Iterator[T] = {
val queryStartTime = System.currentTimeMillis
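// make sure carbon.properties can be located on the executor, defaulting to <user.dir>/conf/carbon.properties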
val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
if (null == carbonPropertiesFilePath) {
System.setProperty("carbon.properties.filepath",
System.getProperty("user.dir") + "/conf/carbon.properties")
}
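// identify this task and build a Hadoop task attempt context so the input format can create a record reader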
val executionId = context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
val taskId = split.index
val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
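// register callbacks so bytes read by this task are reported to Spark's input metrics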
TaskMetricsMap.getInstance().registerThreadCallback()
inputMetricsStats.initBytesReadCallback(context, inputSplit, inputMetricsInterval)
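// build the row iterator only when the partition actually contains block splits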
val iterator = if (inputSplit.getAllSplits.size() > 0) {
val model = format.createQueryModel(inputSplit, attemptContext, indexFilter)
// one query id per table
model.setQueryId(queryId)
val timeStampProjectionColumns = getTimeStampProjectionColumns(model.getProjectionColumns)
// get RecordReader by FileFormat
var reader: RecordReader[Void, Object] =
if (inputSplit.getFileFormat.equals(FileFormat.ROW_V1)) {
// create record reader for row format
DataTypeUtil.setDataTypeConverter(dataTypeConverterClz.newInstance())
val inputFormat = new CarbonStreamInputFormat
inputFormat.setIsVectorReader(vectorReader)
inputFormat.setInputMetricsStats(inputMetricsStats)
model.setStatisticsRecorder(
CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId))
inputFormat.setModel(model)
val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
.asInstanceOf[RecordReader[Void, Object]]
streamReader
} else {
// create record reader for CarbonData file format
if (vectorReader) {
model.setDirectVectorFill(directFill)
val carbonRecordReader = createVectorizedCarbonRecordReader(model,
inputMetricsStats,
"true")
if (carbonRecordReader == null) {
new CarbonRecordReader(model,
format.getReadSupportClass(attemptContext.getConfiguration),
inputMetricsStats,
attemptContext.getConfiguration)
} else {
carbonRecordReader
}
} else {
new CarbonRecordReader(model,
format.getReadSupportClass(attemptContext.getConfiguration),
inputMetricsStats, attemptContext.getConfiguration)
}
}
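// closes the reader at most once; failures while closing are only logged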
val closeReader = () => {
if (reader != null) {
try {
reader.close()
} catch {
case e: Exception =>
LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(e)
}
reader = null
}
}
// create a statistics recorder
val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId())
model.setStatisticsRecorder(recorder)
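// wrap the record reader in an iterator: the reader is initialized lazily on the first hasNext call and closed as soon as it is exhausted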
new Iterator[Any] {
private var havePair = false
private var finished = false
private var first = true
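// prefetches the next key/value pair; the first call also registers the completion listener and initializes the reader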
override def hasNext: Boolean = {
if (context.isInterrupted) {
throw new TaskKilledException
}
if (first) {
first = false
addTaskCompletionListener(
split,
context,
queryStartTime,
executionId,
taskId,
model,
reader)
// initialize the reader
reader.initialize(inputSplit, attemptContext)
}
if (!finished && !havePair) {
finished = !reader.nextKeyValue
havePair = !finished
}
if (finished) {
closeReader.apply()
}
!finished
}
override def next(): Any = {
if (!hasNext) {
throw new java.util.NoSuchElementException("End of stream")
}
havePair = false
val value = reader.getCurrentValue
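// on Spark 3.x, timestamp values read from legacy files may need rebasing to the Proleptic Gregorian calendar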
if (CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_SPARK_VERSION_SPARK3,
CarbonCommonConstants.CARBON_SPARK_VERSION_SPARK3_DEFAULT).toBoolean &&
timeStampProjectionColumns.nonEmpty) {
value match {
case row: GenericInternalRow if needRebaseTimeValue(reader) =>
// rebase timestamp data by converting Julian to Gregorian time
timeStampProjectionColumns.foreach {
projectionColumnWithIndex =>
val timeStampData = row.get(projectionColumnWithIndex._2,
org.apache.spark.sql.types.DataTypes.TimestampType)
if (null != timeStampData) {
row.update(projectionColumnWithIndex._2,
CarbonToSparkAdapter.rebaseTime(timeStampData.asInstanceOf[Long]))
}
}
case _ =>
}
}
value
}
}
} else {
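// no splits for this partition: return an empty iterator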
new Iterator[Any] {
override def hasNext: Boolean = false
override def next(): Any = throw new java.util.NoSuchElementException("End of stream")
}
}
iterator.asInstanceOf[Iterator[T]]
}