def createRelation()

in hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala [276:418]


  def createRelation(sqlContext: SQLContext,
                     metaClient: HoodieTableMetaClient,
                     schema: StructType,
                     globPaths: Seq[StoragePath],
                     parameters: Map[String, String]): BaseRelation = {
    val tableType = metaClient.getTableType
    val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
    val queryType = SparkConfigUtils.getStringWithAltKeys(parameters, QUERY_TYPE)
    val isCdcQuery = queryType == QUERY_TYPE_INCREMENTAL_OPT_VAL &&
      parameters.get(INCREMENTAL_FORMAT.key).contains(INCREMENTAL_FORMAT_CDC_VAL)

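    // Presence of either internal option below short-circuits normal query planning and returns a
    // metadata relation over the table's timeline or file-system view instead of a data scan.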
    val createTimeLineRln = parameters.get(DataSourceReadOptions.CREATE_TIMELINE_RELATION.key())
    val createFSRln = parameters.get(DataSourceReadOptions.CREATE_FILESYSTEM_RELATION.key())

    if (createTimeLineRln.isDefined) {
      new TimelineRelation(sqlContext, parameters, metaClient)
    } else if (createFSRln.isDefined) {
      new FileSystemRelation(sqlContext, parameters, metaClient)
    } else {
      log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType")

      // NOTE: When Hive Metastore is used as the catalog and the table is partitioned, the schema in the HMS may
      //       contain Hive-specific partitioning columns created for HMS to handle partitioning appropriately. In
      //       that case we opt not to provide the catalog's schema, and instead force Hudi relations to fetch the
      //       schema from the table itself
      val userSchema = if (isUsingHiveCatalog(sqlContext.sparkSession)) {
        None
      } else {
        Option(schema)
      }

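      // The new file-group reader is used only when it is enabled (via parameter or its default),
      // the table being read is not the metadata table, and no glob paths were supplied.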
      val useNewParquetFileFormat = parameters.getOrElse(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
        HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue().toString).toBoolean &&
        !metaClient.isMetadataTable && (globPaths == null || globPaths.isEmpty)
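      // A table with no completed commits has nothing to scan; resolve to an empty relation.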
      if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) {
        new EmptyRelation(sqlContext, resolveSchema(metaClient, parameters, Some(schema)))
      } else if (isCdcQuery) {
        if (useNewParquetFileFormat) {
          if (tableType == COPY_ON_WRITE) {
            new HoodieCopyOnWriteCDCHadoopFsRelationFactory(
              sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build()
          } else {
            new HoodieMergeOnReadCDCHadoopFsRelationFactory(
              sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build()
          }
        } else {
          CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
        }
      } else {

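        // Pick the concrete relation from the (tableType, queryType, isBootstrappedTable) combination.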
        (tableType, queryType, isBootstrappedTable) match {
          case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
               (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
               (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
            if (useNewParquetFileFormat) {
              new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
                sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build()
            } else {
              resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
            }
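          // For incremental reads, the table version decides between the legacy V1 relation and the
          // V2 relation (version 8+); an explicit INCREMENTAL_READ_TABLE_VERSION parameter, when
          // present, takes precedence over the version stored in the table config.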
          case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
            if (SparkConfigUtils.containsConfigProperty(parameters, INCREMENTAL_READ_TABLE_VERSION)) {
              val writeTableVersion = Integer.parseInt(parameters(INCREMENTAL_READ_TABLE_VERSION.key))
              if (writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
                if (useNewParquetFileFormat) {
                  new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
                    sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
                } else {
                  new IncrementalRelationV2(sqlContext, parameters, userSchema, metaClient, RangeType.CLOSED_CLOSED)
                }
              } else {
                new IncrementalRelationV1(sqlContext, parameters, userSchema, metaClient)
              }
            } else {
              if (metaClient.getTableConfig.getTableVersion.versionCode() >= HoodieTableVersion.EIGHT.versionCode()) {
                if (useNewParquetFileFormat) {
                  new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
                    sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
                } else {
                  new IncrementalRelationV2(sqlContext, parameters, userSchema, metaClient, RangeType.CLOSED_CLOSED)
                }
              } else {
                new IncrementalRelationV1(sqlContext, parameters, userSchema, metaClient)
              }
            }

          case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
            if (useNewParquetFileFormat) {
              new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
                sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build()
            } else {
              new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, globPaths, userSchema)
            }

          case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
            if (useNewParquetFileFormat) {
              new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
                sqlContext, metaClient, parameters, userSchema, isBootstrap = true).build()
            } else {
              HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, metaClient, parameters)
            }

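          // Same table-version-driven V1/V2 selection as the COPY_ON_WRITE incremental case above.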
          case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
            if (SparkConfigUtils.containsConfigProperty(parameters, INCREMENTAL_READ_TABLE_VERSION)) {
              val writeTableVersion = Integer.parseInt(parameters(INCREMENTAL_READ_TABLE_VERSION.key))
              if (writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
                if (useNewParquetFileFormat) {
                  new HoodieMergeOnReadIncrementalHadoopFsRelationFactory(
                    sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
                } else {
                  MergeOnReadIncrementalRelationV2(sqlContext, parameters, metaClient, userSchema)
                }
              } else {
                MergeOnReadIncrementalRelationV1(sqlContext, parameters, metaClient, userSchema)
              }
            } else {
              if (metaClient.getTableConfig.getTableVersion.versionCode() >= HoodieTableVersion.EIGHT.versionCode()) {
                if (useNewParquetFileFormat) {
                  new HoodieMergeOnReadIncrementalHadoopFsRelationFactory(
                    sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
                } else {
                  MergeOnReadIncrementalRelationV2(sqlContext, parameters, metaClient, userSchema)
                }
              } else {
                MergeOnReadIncrementalRelationV1(sqlContext, parameters, metaClient, userSchema)
              }
            }

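          // Any remaining query shape on a bootstrapped table resolves through the bootstrap-aware
          // snapshot path.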
          case (_, _, true) =>
            if (useNewParquetFileFormat) {
              new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
                sqlContext, metaClient, parameters, userSchema, isBootstrap = true).build()
            } else {
              resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
            }

          case (_, _, _) =>
            throw new HoodieException(s"Invalid query type: $queryType for tableType: $tableType, " +
              s"isBootstrappedTable: $isBootstrappedTable")
        }
      }
    }
  }
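
Usage sketch (not part of the source): the branches above are driven entirely by read options, so a
caller never invokes createRelation() directly. A minimal sketch, assuming a SparkSession `spark`
and a Hudi table at a hypothetical `basePath`, of reads that exercise the main paths:

  import org.apache.hudi.DataSourceReadOptions._

  // Snapshot read (default query type): resolves to a COW/MOR snapshot relation.
  val snapshotDf = spark.read.format("hudi").load(basePath)

  // Incremental read: dispatches to the V1/V2 incremental relations (or the incremental
  // HadoopFsRelation factories) depending on table version and reader config.
  val incDf = spark.read.format("hudi")
    .option(QUERY_TYPE.key, QUERY_TYPE_INCREMENTAL_OPT_VAL)
    .option(BEGIN_INSTANTTIME.key, "20240101000000")
    .load(basePath)

  // CDC read: the incremental query type combined with the CDC incremental format
  // makes isCdcQuery true above.
  val cdcDf = spark.read.format("hudi")
    .option(QUERY_TYPE.key, QUERY_TYPE_INCREMENTAL_OPT_VAL)
    .option(INCREMENTAL_FORMAT.key, INCREMENTAL_FORMAT_CDC_VAL)
    .option(BEGIN_INSTANTTIME.key, "20240101000000")
    .load(basePath)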