in spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala [59:89]
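// TableScan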
override def buildScan(): RDD[Row] = buildScan(Array.empty)

// PrunedScan
override def buildScan(requiredColumns: Array[String]): RDD[Row] = buildScan(requiredColumns, Array.empty)

// PrunedFilteredScan
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
  val paramWithScan = mutable.LinkedHashMap[String, String]() ++ parameters

  // filter where clause can be handled by Doris BE
  val filterWhereClause: String = {
    filters.flatMap(Utils.compileFilter(_, dialect, inValueLengthLimit))
      .map(filter => s"($filter)").mkString(" and ")
  }
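
  // Illustrative example (hypothetical values): pushed-down Spark filters such as
  //   Array(EqualTo("city", "beijing"), GreaterThan("age", 18))
  // would compile to a clause along the lines of
  //   (`city` = 'beijing') and (`age` > 18)
  // Filters that compileFilter cannot translate yield None and are omitted here,
  // leaving Spark to evaluate them on the rows read back from Doris.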

  // required columns for column pruner
  if (requiredColumns != null && requiredColumns.length > 0) {
    paramWithScan += (ConfigurationOptions.DORIS_READ_FIELD ->
      requiredColumns.map(Utils.quote).mkString(","))
  } else {
    paramWithScan += (ConfigurationOptions.DORIS_READ_FIELD ->
      lazySchema.fields.map(f => f.name).mkString(","))
  }
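
  // Illustrative example (hypothetical values, assuming Utils.quote wraps names in backticks):
  //   requiredColumns = Array("id", "name")  ->  doris.read.field = "`id`,`name`"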

  // Push the compiled predicates down to Doris, combined with the user-supplied
  // doris.filter.query (default "1=1"); skip the push-down when nothing could be
  // compiled, otherwise the clause would degenerate to a dangling "1=1 and ".
  if (filters != null && filters.length > 0 && filterWhereClause.nonEmpty) {
    val dorisFilterQuery = cfg.getProperty(ConfigurationOptions.DORIS_FILTER_QUERY, "1=1")
    paramWithScan += (ConfigurationOptions.DORIS_FILTER_QUERY ->
      (dorisFilterQuery + " and " + filterWhereClause))
  }

  new ScalaDorisRowRDD(sqlContext.sparkContext, paramWithScan.toMap, lazySchema)
}
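
For context, a minimal usage sketch of how this scan path is typically reached (hypothetical table and host names; assumes the connector's "doris" data source short name and the documented doris.fenodes / doris.table.identifier / user / password options, plus an existing SparkSession `spark`):

// Hypothetical read against a reachable Doris FE.
val df = spark.read
  .format("doris")
  .option("doris.fenodes", "fe_host:8030")
  .option("doris.table.identifier", "example_db.example_table")
  .option("user", "root")
  .option("password", "")
  .load()

// Column pruning and filter push-down reach buildScan above:
// select() drives requiredColumns, filter() supplies the Spark filters
// that end up compiled into doris.filter.query.
df.filter("age > 18").select("id", "name").show()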