in hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV1.scala [150:304]
override def buildScan(): RDD[Row] = {
if (usedSchema == StructType(Nil)) {
// if first commit in a table is an empty commit without schema, return empty RDD here
sqlContext.sparkContext.emptyRDD[Row]
} else {
val regularFileIdToFullPath = mutable.HashMap[String, String]()
var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
// create Replaced file group
val replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline
val replacedFile = replacedTimeline.getInstants.asScala.flatMap { instant =>
val replaceMetadata = metaClient.getActiveTimeline.readReplaceCommitMetadata(instant)
replaceMetadata.getPartitionToReplaceFileIds.entrySet().asScala.flatMap { entry =>
entry.getValue.asScala.map { e =>
val fullPath = FSUtils.constructAbsolutePath(basePath, entry.getKey).toString
(e, fullPath)
}
}
}.toMap
for (commit <- commitsToReturn) {
val metadata: HoodieCommitMetadata = commitTimeline.readCommitMetadata(commit)
if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.requestedTime) {
metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).asScala.filterNot { case (k, v) =>
replacedFile.contains(k) && v.startsWith(replacedFile(k))
}
} else {
regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).asScala.filterNot { case (k, v) =>
replacedFile.contains(k) && v.startsWith(replacedFile(k))
}
}
}
if (metaBootstrapFileIdToFullPath.nonEmpty) {
// filer out meta bootstrap files that have had more commits since metadata bootstrap
metaBootstrapFileIdToFullPath = metaBootstrapFileIdToFullPath
.filterNot(fileIdFullPath => regularFileIdToFullPath.contains(fileIdFullPath._1))
}
val pathGlobPattern = optParams.getOrElse(
DataSourceReadOptions.INCR_PATH_GLOB.key,
DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)
val (filteredRegularFullPaths, filteredMetaBootstrapFullPaths) = {
if (!pathGlobPattern.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) {
val globMatcher = new GlobPattern("*" + pathGlobPattern)
(regularFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values,
metaBootstrapFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values)
} else {
(regularFileIdToFullPath.values, metaBootstrapFileIdToFullPath.values)
}
}
// pass internalSchema to hadoopConf, so it can be used in executors.
val fileNameGenerator = metaClient.getInstantFileNameGenerator
val validCommits = metaClient
.getCommitsAndCompactionTimeline.filterCompletedInstants.getInstantsAsStream.toArray()
.map(e => fileNameGenerator.getFileName(e.asInstanceOf[HoodieInstant])).mkString(",")
sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema))
sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, metaClient.getBasePath.toString)
sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits)
val formatClassName = metaClient.getTableConfig.getBaseFileFormat match {
case HoodieFileFormat.PARQUET => LegacyHoodieParquetFileFormat.FILE_FORMAT_ID
case HoodieFileFormat.ORC => "orc"
}
// Fallback to full table scan if any of the following conditions matches:
// 1. the start commit is archived
// 2. the end commit is archived
// 3. there are files in metadata be deleted
val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.key, "false").toBoolean
val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
val startInstantTime = optParams(DataSourceReadOptions.START_COMMIT.key)
val startInstantArchived = commitTimeline.isBeforeTimelineStarts(startInstantTime)
val endInstantTime = optParams.getOrElse(DataSourceReadOptions.END_COMMIT.key(), lastInstant.requestedTime)
val endInstantArchived = commitTimeline.isBeforeTimelineStarts(endInstantTime)
val scanDf = if (fallbackToFullTableScan && (startInstantArchived || endInstantArchived)) {
if (hollowCommitHandling == USE_TRANSITION_TIME) {
throw new HoodieException("Cannot use stateTransitionTime while enables full table scan")
}
log.info(s"Falling back to full table scan as startInstantArchived: $startInstantArchived, endInstantArchived: $endInstantArchived")
fullTableScanDataFrame(startInstantTime, endInstantTime)
} else {
if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
} else {
log.info("Additional Filters to be applied to incremental source are :" + filters.mkString("Array(", ", ", ")"))
var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
var doFullTableScan = false
if (fallbackToFullTableScan) {
val timer = HoodieTimer.start
val allFilesToCheck = filteredMetaBootstrapFullPaths ++ filteredRegularFullPaths
val storageConf = HadoopFSUtils.getStorageConfWithCopy(sqlContext.sparkContext.hadoopConfiguration)
val localBasePathStr = basePath.toString
val firstNotFoundPath = sqlContext.sparkContext.parallelize(allFilesToCheck.toSeq, allFilesToCheck.size)
.map(path => {
val storage = HoodieStorageUtils.getStorage(localBasePathStr, storageConf)
storage.exists(new StoragePath(path))
}).collect().find(v => !v)
val timeTaken = timer.endTimer()
log.info("Checking if paths exists took " + timeTaken + "ms")
if (firstNotFoundPath.isDefined) {
doFullTableScan = true
log.info("Falling back to full table scan as some files cannot be found.")
}
}
if (doFullTableScan) {
fullTableScanDataFrame(startInstantTime, endInstantTime)
} else {
if (metaBootstrapFileIdToFullPath.nonEmpty) {
df = sqlContext.sparkSession.read
.format("hudi_v1")
.schema(usedSchema)
.option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
// Setting time to the END_INSTANT_TIME, to avoid pathFilter filter out files incorrectly.
.option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key(), endInstantTime)
.load()
}
if (regularFileIdToFullPath.nonEmpty) {
try {
df = df.union(sqlContext.read.options(sOpts)
.schema(usedSchema).format(formatClassName)
// Setting time to the END_INSTANT_TIME, to avoid pathFilter filter out files incorrectly.
.option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key(), endInstantTime)
.load(filteredRegularFullPaths.toList: _*)
.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.head.requestedTime))
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.last.requestedTime)))
} catch {
case e: AnalysisException =>
if (e.getMessage.contains("Path does not exist")) {
throw new HoodieIncrementalPathNotFoundException(e)
} else {
throw e
}
}
}
df
}
}
}
filters.foldLeft(scanDf)((e, f) => e.filter(f)).rdd
}
}