in hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala [404:459]
/**
 * Rewrites a pushed-down data source filter so that the attribute names it references
 * are the ones produced by `InternalSchemaUtils.reBuildFilterName` for the given
 * file/query schema pair.
 *
 * Behavior:
 *  - If either schema is `null`, the filter is returned untouched.
 *  - For predicates that carry an attribute name, the name is re-resolved; when the
 *    resolved name is empty the predicate is replaced by `AlwaysTrue`, otherwise a copy
 *    of the predicate with the resolved name is returned.
 *  - `And`, `Or` and `Not` are rebuilt from their recursively rewritten children.
 *  - `AlwaysTrue` and `AlwaysFalse` are returned as-is.
 *  - Any other filter type is replaced by `AlwaysTrue`.
 *
 * NOTE(review): an empty name presumably means the column has no counterpart in the
 * file being read — confirm against `InternalSchemaUtils.reBuildFilterName`.
 *
 * @param oldFilter   filter expressed against the query schema
 * @param fileSchema  internal schema of the file being read; may be `null`
 * @param querySchema internal schema of the query; may be `null`
 * @return the rewritten filter, or `oldFilter` itself when a schema is missing
 */
private def rebuildFilterFromParquet(oldFilter: Filter, fileSchema: InternalSchema, querySchema: InternalSchema): Filter = {
  // Re-resolves `attribute` and, unless the resolved name is empty, hands it to
  // `withAttribute` to build the replacement predicate.
  def renamed(attribute: String)(withAttribute: String => Filter): Filter = {
    val fileAttribute = InternalSchemaUtils.reBuildFilterName(attribute, fileSchema, querySchema)
    if (fileAttribute.isEmpty) AlwaysTrue else withAttribute(fileAttribute)
  }

  // Shorthand for rewriting a child filter with the same schema pair.
  def rewriteChild(child: Filter): Filter =
    rebuildFilterFromParquet(child, fileSchema, querySchema)

  if (fileSchema == null || querySchema == null) {
    oldFilter
  } else {
    oldFilter match {
      // Comparison predicates.
      case f: EqualTo => renamed(f.attribute)(name => f.copy(attribute = name))
      case f: EqualNullSafe => renamed(f.attribute)(name => f.copy(attribute = name))
      case f: GreaterThan => renamed(f.attribute)(name => f.copy(attribute = name))
      case f: GreaterThanOrEqual => renamed(f.attribute)(name => f.copy(attribute = name))
      case f: LessThan => renamed(f.attribute)(name => f.copy(attribute = name))
      case f: LessThanOrEqual => renamed(f.attribute)(name => f.copy(attribute = name))

      // Membership and nullability predicates.
      case f: In => renamed(f.attribute)(name => f.copy(attribute = name))
      case f: IsNull => renamed(f.attribute)(name => f.copy(attribute = name))
      case f: IsNotNull => renamed(f.attribute)(name => f.copy(attribute = name))

      // String predicates.
      case f: StringStartsWith => renamed(f.attribute)(name => f.copy(attribute = name))
      case f: StringEndsWith => renamed(f.attribute)(name => f.copy(attribute = name))
      case f: StringContains => renamed(f.attribute)(name => f.copy(attribute = name))

      // Logical combinators: rewrite the operands (left before right) and recombine.
      case And(lhs, rhs) =>
        val newLhs = rewriteChild(lhs)
        val newRhs = rewriteChild(rhs)
        And(newLhs, newRhs)
      case Or(lhs, rhs) =>
        val newLhs = rewriteChild(lhs)
        val newRhs = rewriteChild(rhs)
        Or(newLhs, newRhs)
      case Not(inner) =>
        Not(rewriteChild(inner))

      // Constant filters carry no attribute and pass through unchanged.
      case AlwaysTrue => AlwaysTrue
      case AlwaysFalse => AlwaysFalse

      // Any filter type not handled above is replaced by AlwaysTrue.
      case _ => AlwaysTrue
    }
  }
}