def read_file()

in functions/data-processing-engines/bq-saved-query-executor/main.py [0:0]


def read_file(project_id, location, repository_name, file_path, query_variables):
    """
    Reads a file from a Google Dataform repository and optionally replaces variables.

    The file is fetched through the Dataform ``readFile`` REST endpoint using a
    bearer token taken from the ``credentials`` object defined outside this
    function. The base64 payload in the response's ``contents`` field is
    decoded as UTF-8 and then post-processed in this order:

    1. a literal leading ``-n`` is removed;
    2. ``replace_variables`` is applied when ``query_variables`` is truthy;
    3. if the text starts with ``config``, its first three lines are dropped.

    Args:
        project_id (str): The Google Cloud project ID.
        location (str): The Dataform repository's location.
        repository_name (str): The name of the Dataform repository.
        file_path (str): The path to the file within the repository.
        query_variables (dict): A dictionary for variable replacement (optional).

    Returns:
        str: The processed file contents. Empty if the text starts with
        ``config`` and has three lines or fewer.

    Raises:
        Exception: If the Dataform API responds with a non-200 status code.
    """
    credentials.refresh(Request())
    headers = {"Authorization": f"Bearer {credentials.token}"}

    url = (f"https://dataform.googleapis.com/v1beta1/projects/{project_id}/"
           f"locations/{location}/repositories/{repository_name}:readFile")
    # Send the path through ``params`` so it is percent-encoded; splicing it
    # into the URL by hand breaks on paths containing '&', '#', '+', '%' or
    # spaces. The timeout keeps a stalled connection from blocking forever.
    response = requests.get(
        url,
        headers=headers,
        params={"path": file_path},
        timeout=60,
    )

    if response.status_code != 200:
        error_message = f"Dataform API request failed. Status code:{response.status_code}"
        print(error_message)
        print(response.text)
        raise Exception(error_message)

    file_contents = base64.b64decode(response.json()["contents"]).decode("utf-8")

    # Drop only a literal leading "-n". ``str.lstrip("-n")`` treats its
    # argument as a character set and would also eat the dashes of a leading
    # SQL ``--`` comment (or any leading run of '-' / 'n').
    # NOTE(review): the origin of the "-n" marker is not visible here - confirm
    # that stripping this exact prefix is the intended clean-up.
    if file_contents.startswith("-n"):
        file_contents = file_contents[2:]

    if query_variables:
        file_contents = replace_variables(file_contents, query_variables)

    if file_contents.startswith("config"):
        # Presumably the file opens with a three-line config header; skip it.
        # A file with no fourth line yields an empty remainder rather than
        # raising IndexError.
        pieces = file_contents.split("\n", 3)
        file_contents = pieces[3] if len(pieces) > 3 else ""

    return file_contents