charts/airflow-dags/scripts/replace.py (47 lines of code) (raw):
#!/usr/bin/env python3
import json
import sys
import os
from typing import Dict, Any
def process_replacements(content: str, replacements: Dict[str, Any]) -> str:
"""Process the file content with the given replacements."""
result = []
for line in content.splitlines():
modified_line = line
for item in replacements:
find = item['find']
replace = item['replace']
if find in line:
if isinstance(replace, (dict, list)):
if '=' in line:
var_name, _ = line.split('=', 1)
# Convert dictionary values
processed_dict = {}
for k, v in replace.items():
# Handle template variables
if isinstance(v, str) and '{{' in v:
processed_dict[k] = v # Keep template syntax intact
# Handle boolean values
elif isinstance(v, str) and v.lower() in ['true', 'false']:
processed_dict[k] = v.lower()
else:
processed_dict[k] = str(v)
# Format as Python code
replace_str = json.dumps(processed_dict, indent=2)
modified_line = f"{var_name.rstrip()} = {replace_str}"
else:
modified_line = line.replace(find, json.dumps(replace))
else:
modified_line = line.replace(find, str(replace))
break
if modified_line.strip():
result.append(modified_line)
return '\n'.join(result)
def main():
input_file = os.environ['INPUT_FILE']
output_file = os.environ['OUTPUT_FILE']
raw_json = os.environ['SEARCH_AND_REPLACE']
print("Raw JSON:", raw_json)
replacements = json.loads(raw_json)
print("Parsed replacements:", replacements)
with open(input_file, 'r') as f:
content = f.read()
processed_content = process_replacements(content, replacements)
with open(output_file, 'w') as f:
f.write(processed_content)
if __name__ == '__main__':
main()