in charts/airflow-dags/scripts/replace.py [0:0]
def process_replacements(content: str, replacements: Dict[str, Any]) -> str:
"""Process the file content with the given replacements."""
result = []
for line in content.splitlines():
modified_line = line
for item in replacements:
find = item['find']
replace = item['replace']
if find in line:
if isinstance(replace, (dict, list)):
if '=' in line:
var_name, _ = line.split('=', 1)
# Convert dictionary values
processed_dict = {}
for k, v in replace.items():
# Handle template variables
if isinstance(v, str) and '{{' in v:
processed_dict[k] = v # Keep template syntax intact
# Handle boolean values
elif isinstance(v, str) and v.lower() in ['true', 'false']:
processed_dict[k] = v.lower()
else:
processed_dict[k] = str(v)
# Format as Python code
replace_str = json.dumps(processed_dict, indent=2)
modified_line = f"{var_name.rstrip()} = {replace_str}"
else:
modified_line = line.replace(find, json.dumps(replace))
else:
modified_line = line.replace(find, str(replace))
break
if modified_line.strip():
result.append(modified_line)
return '\n'.join(result)