in spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialTemporalFilterPushDownForStacScan.scala [111:173]
private def translateToTemporalFilter(
predicate: Expression,
pushableColumn: PushableColumnBase): Option[TemporalFilter] = {
predicate match {
case And(left, right) =>
val temporalFilterLeft = translateToTemporalFilter(left, pushableColumn)
val temporalFilterRight = translateToTemporalFilter(right, pushableColumn)
(temporalFilterLeft, temporalFilterRight) match {
case (Some(l), Some(r)) => Some(TemporalFilter.AndFilter(l, r))
case (Some(l), None) => Some(l)
case (None, Some(r)) => Some(r)
case _ => None
}
case Or(left, right) =>
for {
temporalFilterLeft <- translateToTemporalFilter(left, pushableColumn)
temporalFilterRight <- translateToTemporalFilter(right, pushableColumn)
} yield TemporalFilter.OrFilter(temporalFilterLeft, temporalFilterRight)
case LessThan(pushableColumn(name), Literal(v, TimestampType)) =>
Some(
TemporalFilter
.LessThanFilter(
unquote(name),
LocalDateTime
.ofInstant(Instant.ofEpochMilli(v.asInstanceOf[Long] / 1000), ZoneOffset.UTC)))
case LessThanOrEqual(pushableColumn(name), Literal(v, TimestampType)) =>
Some(
TemporalFilter
.LessThanFilter(
unquote(name),
LocalDateTime
.ofInstant(Instant.ofEpochMilli(v.asInstanceOf[Long] / 1000), ZoneOffset.UTC)))
case GreaterThan(pushableColumn(name), Literal(v, TimestampType)) =>
Some(
TemporalFilter
.GreaterThanFilter(
unquote(name),
LocalDateTime
.ofInstant(Instant.ofEpochMilli(v.asInstanceOf[Long] / 1000), ZoneOffset.UTC)))
case GreaterThanOrEqual(pushableColumn(name), Literal(v, TimestampType)) =>
Some(
TemporalFilter
.GreaterThanFilter(
unquote(name),
LocalDateTime
.ofInstant(Instant.ofEpochMilli(v.asInstanceOf[Long] / 1000), ZoneOffset.UTC)))
case EqualTo(pushableColumn(name), Literal(v, TimestampType)) =>
Some(
TemporalFilter
.EqualFilter(
unquote(name),
LocalDateTime
.ofInstant(Instant.ofEpochMilli(v.asInstanceOf[Long] / 1000), ZoneOffset.UTC)))
case _ => None
}
}