private def filterPartition()

in spark-datasource-v3.1/src/main/scala/org/apache/spark/sql/odps/reader/OdpsScan.scala [159:244]


  private def filterPartition(odpsPartition: Partition, f: Filter): Boolean = {
    val partitionMap: Map[String, String] = TypesConverter.odpsPartition2SparkMap(odpsPartition)

    f match {
      case EqualTo(attr, value) =>
        val columnValue: Option[String] = partitionMap.get(attr)
        columnValue.exists(TypesConverter.partitionValueEqualTo(_, value))

      case EqualNullSafe(attr, value) =>
        val columnValue = partitionMap.get(attr)
        columnValue.isEmpty && value == Nil

      case LessThan(attr, value) =>
        val columnValue: Option[String] = partitionMap.get(attr)
        columnValue.exists(TypesConverter.partitionValueLessThan(_, value))

      case GreaterThan(attr, value) =>
        val columnValue = partitionMap.get(attr)
        columnValue.exists(TypesConverter.partitionValueGreaterThan(_, value))

      case LessThanOrEqual(attr, value) =>
        val columnValue = partitionMap.get(attr)
        columnValue.exists(TypesConverter.partitionValueLessThanOrEqualTo(_, value))

      case GreaterThanOrEqual(attr, value) =>
        val columnValue = partitionMap.get(attr)
        columnValue.exists(TypesConverter.partitionValueGreaterThanOrEqualTo(_, value))

      case IsNull(attr) =>
        val columnValue = partitionMap.get(attr)
        columnValue.isEmpty || !partitionMap.contains(attr) || columnValue == null

      case IsNotNull(attr) =>
        val columnValue = partitionMap.get(attr)
        columnValue.isDefined && partitionMap.contains(attr) && columnValue != null

      case StringStartsWith(attr, value) =>
        val columnValue = partitionMap.get(attr)
        val targetValue = value.asInstanceOf[String]
        columnValue.exists(_.startsWith(targetValue))

      case StringEndsWith(attr, value) =>
        val columnValue = partitionMap.get(attr)
        val targetValue = value.asInstanceOf[String]
        columnValue.exists(_.endsWith(targetValue))

      case StringContains(attr, value) =>
        val columnValue = partitionMap.get(attr)
        val targetValue = value.asInstanceOf[String]
        columnValue.exists(_.contains(targetValue))

      case In(attr, value) =>
        val columnValue = partitionMap.get(attr)
        if (columnValue.isEmpty) {
          false
        }
        val strValue = columnValue.getOrElse("")
        value.asInstanceOf[Array[Any]]
          .exists(TypesConverter.partitionValueEqualTo(strValue, _))

      case Not(f: Filter) =>
        !filterPartition(odpsPartition, f)

      case Or(f1, f2) =>
        // We can't compile Or filter unless both sub-filters are compiled successfully.
        // It applies too for the following And filter.
        // If we can make sure compileFilter supports all filters, we can remove this check.
        val f1Ret = filterPartition(odpsPartition, f1)
        if (f1Ret) {
          true
        } else {
          filterPartition(odpsPartition, f2)
        }

      case And(f1, f2) =>
        val f1Ret = filterPartition(odpsPartition, f1)

        if (!f1Ret) {
          false
        } else {
          filterPartition(odpsPartition, f2)
        }

      case _ => false
    }
  }