in backend/bms_app/wave/services/waves.py [0:0]
def _get_running_mappings_data(self):
    """Return running mappings data for the current operation.

    Loads the ``OperationDetails`` rows whose ``operation_id`` equals
    ``self.curr_op.id`` together with the outer-joined ``SourceDB``,
    ``BMSServer`` and ``Config`` rows, and groups them by source database id.

    Returns:
        list[dict]: one dict per source database, in first-seen order.
        Each dict holds ``server``, ``db_id``, ``db_name``, ``db_type``,
        ``is_deployable``, ``operation_type``, ``operation_id``,
        ``is_configured`` and ``bms``. ``bms`` is a list with one entry
        per result row of that database, each holding ``id``, ``name``,
        ``operation_status``, ``operation_step`` and ``logs_url``.
        The dicts are passed through ``add_secret_name_status`` and
        ``add_aggregated_db_status`` before being returned.
    """
    query = (
        db.session.query(OperationDetails, SourceDB, BMSServer, Config)
        .filter(OperationDetails.operation_id == self.curr_op.id)
        .outerjoin(Mapping, OperationDetails.mapping_id == Mapping.id)
        .outerjoin(SourceDB, Mapping.db_id == SourceDB.id)
        .outerjoin(BMSServer, Mapping.bms_id == BMSServer.id)
        .outerjoin(Config)
    )

    mappings = {}
    secret_names = {}
    # NOTE(review): all joins are outer joins, so source_db / bms_server
    # could be None for a row without a matching mapping; the loop assumes
    # they are always present - confirm against the data model.
    for op_details, source_db, bms_server, config in query:
        db_id = source_db.id
        if db_id not in mappings:
            # Database-level fields come from the first row seen for this db.
            mappings[db_id] = {
                'server': source_db.server,
                'db_id': source_db.id,
                'db_name': source_db.db_name,
                'db_type': source_db.db_type.value,
                'is_deployable': source_db.is_deployable,
                'operation_type': op_details.operation_type.value,
                'operation_id': op_details.operation_id,
                'bms': [],
                'is_configured': config.is_configured if config else False,
            }

        logs_url = generate_target_gcp_logs_link(op_details, bms_server)
        mappings[db_id]['bms'].append({
            'id': bms_server.id,
            'name': bms_server.name,
            'operation_status': op_details.status.value,
            'operation_step': op_details.step,
            'logs_url': logs_url
        })

        # Accumulate one flag per BMS server. Assigning a fresh one-element
        # list here would discard the flags of every earlier server that
        # belongs to the same database, keeping only the last one.
        secret_names.setdefault(db_id, []).append(
            bool(bms_server.secret_name)
        )

    add_secret_name_status(mappings, secret_names)
    add_aggregated_db_status(mappings)
    return list(mappings.values())