in processor.py [0:0]
def create_filter(condition: Callable[[ProcessorPart], bool]) -> PartProcessor:
"""Creates a processor that filters parts based on `condition`.
Args:
condition: a part is returned by this processor iff `condition(part)=True`
Returns:
a processor filtering the input stream
"""
async def filter_with_condition(
part: ProcessorPart,
) -> AsyncIterable[ProcessorPart]:
if condition(part):
yield part
return _PartProcessorWrapper(filter_with_condition)