in spark-datasource-v3.1/src/main/scala/org/apache/spark/sql/odps/reader/OdpsScan.scala [159:244]
/**
 * Evaluates filter `f` against the partition-column values of `odpsPartition`.
 *
 * The partition is converted to a column-name -> string-value map via
 * `TypesConverter.odpsPartition2SparkMap`; comparisons against filter literals are
 * delegated to the `TypesConverter.partitionValue*` helpers.
 *
 * A column that is missing from the map (or mapped to `null`) is treated as a null
 * value: every comparison against it is false, `IsNull` is true, and
 * `EqualNullSafe` is true only when the filter literal is also `null`.
 *
 * NOTE(review): unsupported filter types evaluate to `false`, so `Not(<unsupported>)`
 * evaluates to `true` -- confirm callers only pass filters handled here.
 *
 * @param odpsPartition the partition whose column values are tested
 * @param f             the filter to evaluate
 * @return `true` if the partition satisfies `f`; `false` otherwise, including when
 *         `f` is of a filter type not handled here
 */
private def filterPartition(odpsPartition: Partition, f: Filter): Boolean = {
  val partitionMap: Map[String, String] = TypesConverter.odpsPartition2SparkMap(odpsPartition)

  // Absent columns and null-valued entries are both normalized to None so that
  // every case below can treat "no value" uniformly.
  def column(attr: String): Option[String] = partitionMap.get(attr).flatMap(Option(_))

  f match {
    case EqualTo(attr, value) =>
      column(attr).exists(TypesConverter.partitionValueEqualTo(_, value))
    case EqualNullSafe(attr, value) =>
      // Null-safe equality: null <=> null holds, null <=> non-null does not,
      // and two non-null values are compared like EqualTo.
      if (value == null) column(attr).isEmpty
      else column(attr).exists(TypesConverter.partitionValueEqualTo(_, value))
    case LessThan(attr, value) =>
      column(attr).exists(TypesConverter.partitionValueLessThan(_, value))
    case GreaterThan(attr, value) =>
      column(attr).exists(TypesConverter.partitionValueGreaterThan(_, value))
    case LessThanOrEqual(attr, value) =>
      column(attr).exists(TypesConverter.partitionValueLessThanOrEqualTo(_, value))
    case GreaterThanOrEqual(attr, value) =>
      column(attr).exists(TypesConverter.partitionValueGreaterThanOrEqualTo(_, value))
    case IsNull(attr) =>
      column(attr).isEmpty
    case IsNotNull(attr) =>
      column(attr).isDefined
    case StringStartsWith(attr, value) =>
      column(attr).exists(_.startsWith(value))
    case StringEndsWith(attr, value) =>
      column(attr).exists(_.endsWith(value))
    case StringContains(attr, value) =>
      column(attr).exists(_.contains(value))
    case In(attr, values) =>
      // A missing column matches nothing; it must not be compared as "".
      column(attr).exists { partitionValue =>
        values.exists(TypesConverter.partitionValueEqualTo(partitionValue, _))
      }
    case Not(child) =>
      !filterPartition(odpsPartition, child)
    case Or(left, right) =>
      // `||` short-circuits: `right` is evaluated only when `left` is false.
      filterPartition(odpsPartition, left) || filterPartition(odpsPartition, right)
    case And(left, right) =>
      // `&&` short-circuits: `right` is evaluated only when `left` is true.
      filterPartition(odpsPartition, left) && filterPartition(odpsPartition, right)
    case _ => false
  }
}