def process_replacements()

in charts/airflow-dags/scripts/replace.py [0:0]


def process_replacements(content: str, replacements: Dict[str, Any]) -> str:
    """Process the file content with the given replacements."""
    result = []
    for line in content.splitlines():
        modified_line = line
        for item in replacements:
            find = item['find']
            replace = item['replace']
            
            if find in line:
                if isinstance(replace, (dict, list)):
                    if '=' in line:
                        var_name, _ = line.split('=', 1)
                        
                        # Convert dictionary values
                        processed_dict = {}
                        for k, v in replace.items():
                            # Handle template variables
                            if isinstance(v, str) and '{{' in v:
                                processed_dict[k] = v  # Keep template syntax intact
                            # Handle boolean values
                            elif isinstance(v, str) and v.lower() in ['true', 'false']:
                                processed_dict[k] = v.lower()
                            else:
                                processed_dict[k] = str(v)
                        
                        # Format as Python code
                        replace_str = json.dumps(processed_dict, indent=2)
                        modified_line = f"{var_name.rstrip()} = {replace_str}"
                    else:
                        modified_line = line.replace(find, json.dumps(replace))
                else:
                    modified_line = line.replace(find, str(replace))
                break
        if modified_line.strip():
            result.append(modified_line)
    return '\n'.join(result)