in data_validation/cli_tools.py [0:0]
def _get_result_handler(rc_value: str, sa_file=None) -> dict:
    """Returns dict of result handler config for a "<prefix>.<table>" argument.

    The text before the first "." is checked against the connection names
    listed by the state manager. When it matches, the handler is built from
    that connection (BigQuery and PostgreSQL connection types are handled);
    otherwise the value is treated as the legacy
    "bq-project-name.bq_results_table" format.

    Args:
        rc_value (str): Result config argument specified.
        sa_file (str): SA path argument specified. Optional.

    Returns:
        dict: Result handler config keyed by the ``consts`` key names.

    Raises:
        ValueError: If rc_value is not two non-empty parts separated by ".".
        exceptions.ResultHandlerException: If the named connection has a
            source type other than BigQuery or PostgreSQL.
    """
    # Split on the first "." only, so the table part may itself contain dots.
    prefix, separator, table_id = rc_value.partition(".")
    # Reject a missing separator as well as an empty prefix or table id
    # ("project." / ".table"), which could never name a usable target.
    if not (separator and prefix and table_id):
        raise ValueError(f"Unable to parse result handler config: `{rc_value}`")

    # Check if the first part of the result handler is a connection name.
    mgr = state_manager.StateManager()
    if prefix in mgr.list_connections():
        # We received connection_name.results_table.
        conn_from_file = get_connection(prefix)
        source_type = conn_from_file[consts.SOURCE_TYPE]
        if source_type == consts.SOURCE_TYPE_BIGQUERY:
            result_handler = {
                consts.RH_TYPE: source_type,
                consts.PROJECT_ID: conn_from_file["project_id"],
                consts.TABLE_ID: table_id,
                # None when the connection defines no custom endpoint.
                consts.API_ENDPOINT: conn_from_file.get("api_endpoint"),
            }
        elif source_type == consts.SOURCE_TYPE_POSTGRES:
            result_handler = {
                consts.RH_TYPE: source_type,
                consts.TABLE_ID: table_id,
                # The whole connection config is passed through to the handler.
                consts.RH_CONN: conn_from_file,
            }
        # TODO Add filesystem handler too.
        else:
            raise exceptions.ResultHandlerException(
                f"Unsupported result handler connection type: {source_type}"
            )
    else:
        # We received legacy format "bq-project-name.bq_results_table".
        result_handler = {
            consts.RH_TYPE: consts.SOURCE_TYPE_BIGQUERY,
            consts.PROJECT_ID: prefix,
            consts.TABLE_ID: table_id,
        }

    # NOTE(review): applied to whichever handler was built above; confirm it
    # is not meant for the legacy BigQuery branch only.
    if sa_file:
        result_handler[consts.GOOGLE_SERVICE_ACCOUNT_KEY_PATH] = sa_file
    return result_handler