in src/translation/dags/translation_utils/csv_utils.py [0:0]
def rules_filter(rules):
"""
creates an error row filter based on
rules specs
"""
if not isinstance(rules, list) or not rules:
logging.info("returning None error row filter. no rules supplied")
return None
logging.info(f"creating error row filter for rules: {rules}")
def filter(row):
for rule in rules:
field = rule["field"]
matchType = rule["matchType"] if "matchType" in rule else "equals"
caseSensitive = rule["caseSensitive"] if "caseSensitive" in rule else False
value = rule["value"]
fieldValue = row[field]
if not caseSensitive:
value = value.casefold()
fieldValue = fieldValue.casefold()
if field in row and matchers[matchType](fieldValue, value):
return True
return False
return filter