in src/SimpleReplay/extract.py [0:0]
def main():
    """Entry point for the workload-extraction tool.

    Reads a YAML config file given on the command line, configures console and
    (optionally) file logging, loads the DB interface driver, resolves where
    the audit logs live, fetches them for the configured time window, optionally
    enriches them with statement text and system-table exports from the source
    cluster, and saves the extracted workload to the configured location.

    Exits the process with a nonzero status on any unrecoverable problem
    (bad YAML, missing driver, no log source configured, no logs found).
    """
    global logger
    logger = init_logging(logging.INFO)

    global g_config

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "config_file",
        type=argparse.FileType("r"),
        help="Location of extraction config file.",
    )
    args = parser.parse_args()

    g_config = {}
    with args.config_file as stream:
        try:
            g_config = yaml.safe_load(stream)
        except yaml.YAMLError as exception:
            logger.error(f"Failed to parse extraction config yaml file: {exception}")
            exit(-1)

    validate_config_file(g_config)

    # Console level from config (default INFO); a separate logfile with its own
    # (default DEBUG) level is added unless explicitly disabled with "none".
    level = logging.getLevelName(g_config.get('log_level', 'INFO').upper())
    set_log_level(level)
    if g_config.get("logfile_level") != "none":
        level = logging.getLevelName(g_config.get('logfile_level', 'DEBUG').upper())
        log_file = 'extract.log'
        add_logfile(
            log_file,
            level=level,
            preamble=yaml.dump(g_config),
            backup_count=g_config.get("backup_count", 2),
        )

    # print the version
    log_version()

    interface = load_driver()
    if not interface:
        logger.error("Failed to load driver.")
        exit(-1)

    # "source_cluster_endpoint" is optional (the log_location path below does
    # not need it), so use .get() — direct indexing would raise KeyError for
    # configs that only supply log_location.
    source_cluster_endpoint = g_config.get("source_cluster_endpoint")

    # Name this extraction run; include the cluster identifier when available.
    timestamp = datetime.datetime.now().replace(tzinfo=datetime.timezone.utc).isoformat()
    if source_cluster_endpoint:
        extraction_name = f'Extraction_{source_cluster_endpoint.split(".")[0]}_{timestamp}'
    else:
        extraction_name = f"Extraction_{timestamp}"

    # Optional time window, normalized to UTC; empty string means unbounded.
    if g_config.get("start_time"):
        start_time = dateutil.parser.parse(g_config["start_time"]).astimezone(
            dateutil.tz.tzutc()
        )
    else:
        start_time = ""

    if g_config.get("end_time"):
        end_time = dateutil.parser.parse(g_config["end_time"]).astimezone(
            dateutil.tz.tzutc()
        )
    else:
        end_time = ""

    # read the logs
    if g_config.get("log_location"):
        log_location = g_config["log_location"]
    elif source_cluster_endpoint:
        log_location = get_cluster_log_location(source_cluster_endpoint)
    else:
        logger.error("Either log_location or source_cluster_endpoint must be specified.")
        exit(-1)

    (connections, audit_logs, databases, last_connections) = get_logs(
        log_location, start_time, end_time
    )

    logger.debug(f"Found {len(connections)} connection logs, {len(audit_logs)} audit logs")

    if len(audit_logs) == 0 or len(connections) == 0:
        logger.warning("No audit logs or connections logs found. Please verify that the audit log location or cluster endpoint is correct. Note, audit logs can take several hours to start appearing in S3 after logging is first enabled.")
        exit(-1)

    if source_cluster_endpoint:
        logger.info(f'Retrieving info from {source_cluster_endpoint}')
        source_cluster_urls = get_connection_string(
            source_cluster_endpoint,
            g_config["master_username"],
            g_config["odbc_driver"],
        )

        # Augment the audit logs with full statement text pulled directly from
        # the source cluster for each database we saw activity in.
        source_cluster_statement_text_logs = retrieve_source_cluster_statement_text(
            source_cluster_urls, databases, start_time, end_time, interface,
        )
        combine_logs(audit_logs, source_cluster_statement_text_logs)

        # Optional system-table export: all three settings must be present and
        # truthy. Use .get() since these keys may be omitted from the config.
        if (
            g_config.get("source_cluster_system_table_unload_location")
            and g_config.get("unload_system_table_queries")
            and g_config.get("source_cluster_system_table_unload_iam_role")
        ):
            logger.info(
                f'Exporting system tables to {g_config["source_cluster_system_table_unload_location"]}'
            )
            unload_system_table(
                source_cluster_urls,
                g_config["odbc_driver"],
                g_config["unload_system_table_queries"],
                g_config["source_cluster_system_table_unload_location"] + "/" + extraction_name,
                g_config["source_cluster_system_table_unload_iam_role"],
            )
            logger.info(
                f'Exported system tables to {g_config["source_cluster_system_table_unload_location"]}'
            )

    save_logs(
        audit_logs,
        last_connections,
        g_config["workload_location"] + "/" + extraction_name,
        connections,
        start_time,
        end_time
    )