in src/SimpleReplay/replay.py [0:0]
def _parse_utc_timestamp(timestamp):
    """Parse an ISO-8601 string into a datetime stamped as UTC, or return None.

    Falsy input (``None`` or an empty string) yields ``None``.
    """
    if not timestamp:
        return None
    # NOTE: replace() overwrites tzinfo without shifting the clock time, so any
    # UTC offset present in the string is discarded rather than converted.
    return dateutil.parser.isoparse(timestamp).replace(tzinfo=datetime.timezone.utc)


def parse_connections(workload_directory, time_interval_between_transactions, time_interval_between_queries):
    """Load ``connections.json`` for a workload and build the connection list.

    Args:
        workload_directory: Local directory path, or an ``s3://bucket/prefix``
            location, that contains ``connections.json``.
        time_interval_between_transactions: ``""`` to use each connection's own
            ``time_interval_between_transactions`` value, ``"all on"`` to force
            ``True``, or ``"all off"`` to force ``False``.
        time_interval_between_queries: ``""`` to use each connection's own
            ``time_interval_between_queries`` value, or ``"all on"`` /
            ``"all off"`` to pass that string through for every connection.

    Returns:
        A ``(connections, total_connections)`` tuple: the ``ConnectionLog``
        objects that passed ``g_config['filters']``, sorted by session
        initiation time, and the number of connections that were parsed
        successfully before the filters were applied.

    Raises:
        KeyError: While iterating connections, if an override argument is not
            one of the accepted values, or if a connection lacks its own
            setting when no override is given.
    """
    connections = []
    # total number of connections before filters are applied
    total_connections = 0

    if workload_directory.startswith("s3://"):
        bucket_name, _, prefix = workload_directory[5:].partition("/")
        s3_object = client("s3").get_object(
            Bucket=bucket_name, Key=prefix + "/connections.json"
        )
        connections_json = json.loads(s3_object["Body"].read())
    else:
        # Context manager guarantees the handle is closed even when the file
        # contains invalid JSON and json.load raises.
        with open(workload_directory + "/connections.json", "r") as connections_file:
            connections_json = json.load(connections_file)

    transaction_overrides = {"all on": True, "all off": False}
    query_overrides = {"all on": "all on", "all off": "all off"}

    for connection_json in connections_json:
        # Only read the per-connection setting when no global override is
        # given, so records lacking the key still work with "all on"/"all off".
        is_time_interval_between_transactions = (
            connection_json["time_interval_between_transactions"]
            if time_interval_between_transactions == ""
            else transaction_overrides[time_interval_between_transactions]
        )
        is_time_interval_between_queries = (
            connection_json["time_interval_between_queries"]
            if time_interval_between_queries == ""
            else query_overrides[time_interval_between_queries]
        )

        try:
            session_initiation_time = _parse_utc_timestamp(
                connection_json["session_initiation_time"]
            )
            disconnection_time = _parse_utc_timestamp(
                connection_json["disconnection_time"]
            )
            connection_key = f'{connection_json["database_name"]}_{connection_json["username"]}_{connection_json["pid"]}'
            connection = ConnectionLog(
                session_initiation_time,
                disconnection_time,
                connection_json["application_name"],
                connection_json["database_name"],
                connection_json["username"],
                connection_json["pid"],
                is_time_interval_between_transactions,
                is_time_interval_between_queries,
                connection_key,
            )
            if matches_filters(connection, g_config['filters']):
                connections.append(connection)
            # Counted whether or not the connection passed the filters.
            total_connections += 1
        except Exception as err:
            # Best effort: a malformed record is logged and skipped rather
            # than aborting the whole workload.
            logger.error(f"Could not parse connection: \n{str(connection_json)}\n{err}")

    # Connections without a session initiation time sort as the Unix epoch,
    # i.e. ahead of every connection that has a real timestamp.
    epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
    connections.sort(
        key=lambda connection: connection.session_initiation_time or epoch
    )
    return (connections, total_connections)