in airflow-core/src/airflow/api_fastapi/common/parameters.py [0:0]
def to_orm(self, select: Select) -> Select:
if isinstance(self.value, (list, str)) and not self.value and self.skip_none:
return select
if self.value is None and self.skip_none:
return select
if isinstance(self.value, list):
if self.filter_option == FilterOptionEnum.IN:
return select.where(self.attribute.in_(self.value))
if self.filter_option == FilterOptionEnum.NOT_IN:
return select.where(self.attribute.notin_(self.value))
if self.filter_option == FilterOptionEnum.ANY_EQUAL:
conditions = [self.attribute == val for val in self.value]
return select.where(or_(*conditions))
if self.filter_option == FilterOptionEnum.ALL_EQUAL:
conditions = [self.attribute == val for val in self.value]
return select.where(and_(*conditions))
raise HTTPException(
400, f"Invalid filter option {self.filter_option} for list value {self.value}"
)
if self.filter_option == FilterOptionEnum.EQUAL:
return select.where(self.attribute == self.value)
if self.filter_option == FilterOptionEnum.NOT_EQUAL:
return select.where(self.attribute != self.value)
if self.filter_option == FilterOptionEnum.LESS_THAN:
return select.where(self.attribute < self.value)
if self.filter_option == FilterOptionEnum.LESS_THAN_EQUAL:
return select.where(self.attribute <= self.value)
if self.filter_option == FilterOptionEnum.GREATER_THAN:
return select.where(self.attribute > self.value)
if self.filter_option == FilterOptionEnum.GREATER_THAN_EQUAL:
return select.where(self.attribute >= self.value)
if self.filter_option == FilterOptionEnum.IS_NONE:
if self.value is None:
return select
if self.value is False:
return select.where(self.attribute.is_not(None))
if self.value is True:
return select.where(self.attribute.is_(None))
raise ValueError(f"Invalid filter option {self.filter_option} for value {self.value}")