def _get_result_handler()

in data_validation/cli_tools.py [0:0]


def _get_result_handler(rc_value: str, sa_file=None) -> dict:
    """Return the result handler config dict for a dotted result config value.

    The value is split on its first "." and interpreted as one of:

    * ``connection_name.results_table`` when the prefix is one of the
      connections listed by the state manager. BigQuery and Postgres
      connection types are supported.
    * ``bq-project-name.bq_results_table`` (legacy format) otherwise; the
      prefix is then used as the BigQuery project id.

    Args:
        rc_value (str): Result config argument specified.
        sa_file (str): SA path argument specified. When truthy it is added to
            the returned dict under ``consts.GOOGLE_SERVICE_ACCOUNT_KEY_PATH``.

    Returns:
        dict: Result handler configuration.

    Raises:
        ValueError: If ``rc_value`` has no "." or either side of the first
            "." is empty.
        exceptions.ResultHandlerException: If the named connection has a
            source type that is not supported as a result handler.
    """
    # partition() splits on the first "." only, so the table id may itself
    # contain dots (e.g. "dataset.table").
    prefix, dot, table_id = rc_value.partition(".")
    if not (dot and prefix and table_id):
        # Reject empty halves (".table", "project.") up front instead of
        # building a handler with an empty project/connection or table id.
        raise ValueError(f"Unable to parse result handler config: `{rc_value}`")

    # Check if the first part of the result handler is a connection name.
    mgr = state_manager.StateManager()
    connections = mgr.list_connections()
    if prefix in connections:
        # We received connection_name.results_table.
        conn_from_file = get_connection(prefix)
        source_type = conn_from_file[consts.SOURCE_TYPE]
        if source_type == consts.SOURCE_TYPE_BIGQUERY:
            result_handler = {
                consts.RH_TYPE: source_type,
                consts.PROJECT_ID: conn_from_file["project_id"],
                consts.TABLE_ID: table_id,
                # The endpoint is optional in the connection; None when absent.
                consts.API_ENDPOINT: conn_from_file.get("api_endpoint"),
            }
        elif source_type == consts.SOURCE_TYPE_POSTGRES:
            result_handler = {
                consts.RH_TYPE: source_type,
                consts.TABLE_ID: table_id,
                # The whole connection config is passed through to the handler.
                consts.RH_CONN: conn_from_file,
            }
        # TODO Add filesystem handler too.
        else:
            raise exceptions.ResultHandlerException(
                f"Unsupported result handler connection type: {source_type}"
            )
    else:
        # We received legacy format "bq-project-name.bq_results_table".
        result_handler = {
            consts.RH_TYPE: consts.SOURCE_TYPE_BIGQUERY,
            consts.PROJECT_ID: prefix,
            consts.TABLE_ID: table_id,
        }

    if sa_file:
        result_handler[consts.GOOGLE_SERVICE_ACCOUNT_KEY_PATH] = sa_file

    return result_handler