in backend/bms_app/wave/services/waves.py [0:0]
def _get_mappings_data(self):
    """Return info and last operation for each db mapping.

    Builds one entry per source database of the wave ``self.wave_id``.
    Each entry carries the database attributes, the type/id of the latest
    operation and a ``bms`` list with one item per mapped BMS server
    (name, id, latest operation status and step for that mapping).
    The items returned by ``self._get_dms_auto_mappings()`` are appended
    after the per-database entries.

    Returns:
        list: per-database dicts followed by the DMS auto-mapping items.
    """
    mappings_data = {}
    # db_id -> one flag per mapped BMS server telling whether that server
    # has a secret name set
    secret_names = {}

    # get latest operation id per mapping/bms_target
    subq = (
        db.session.query(OperationDetails)
        .with_entities(func.max(OperationDetails.id))
        .filter(OperationDetails.wave_id == self.wave_id)
        .group_by(OperationDetails.mapping_id)
    )

    # get latest operation details/history for these mappings
    joinq = (
        db.session.query(OperationDetails)
        .filter(OperationDetails.id.in_(subq))
        .subquery()
    )

    # Outer joins keep mappings that have no config and/or no operation
    # history yet; their columns simply come back as None.
    query = (
        db.session.query(Mapping, SourceDB, BMSServer, joinq)
        .outerjoin(SourceDB)
        .outerjoin(BMSServer)
        .outerjoin(Config)
        .with_entities(
            SourceDB, BMSServer, Config.is_configured,
            joinq.c.operation_type, joinq.c.status,
            joinq.c.operation_id, joinq.c.step,
        )
        .outerjoin(joinq, Mapping.id == joinq.c.mapping_id)
        .filter(SourceDB.wave_id == self.wave_id)
    )

    for row in query:
        (source_db, bms_server, is_configured, last_op_type,
         last_op_status, last_op_id, last_op_step) = row

        db_id = source_db.id

        if db_id not in mappings_data:
            # First row seen for this database: the db-level fields
            # (including operation type/id) are taken from this row.
            mappings_data[db_id] = {
                'server': source_db.server,
                'db_id': source_db.id,
                'db_name': source_db.db_name,
                'db_type': source_db.db_type.value,
                'is_deployable': source_db.is_deployable,
                'operation_type': last_op_type.value if last_op_type else None,
                # placeholder; presumably filled in by
                # add_aggregated_db_status() below - verify against helper
                'operation_status': '',
                'operation_id': last_op_id,
                'bms': [],
                'is_configured': is_configured if is_configured is not None else False,
            }

        # NOTE(review): BMSServer is outer-joined, so bms_server could in
        # principle be None here - confirm every mapping has a BMS server.
        #
        # Record the flag for *every* BMS server of the database, not just
        # one of them, so multi-server mappings are fully represented.
        secret_names.setdefault(db_id, []).append(
            bool(bms_server.secret_name)
        )

        mappings_data[db_id]['bms'].append({
            'bms_id': bms_server.id,
            'bms_name': bms_server.name,
            'operation_status': last_op_status.value if last_op_status else None,
            'operation_step': last_op_step,
        })

    add_secret_name_status(mappings_data, secret_names)
    add_aggregated_db_status(mappings_data)

    dms_auto_mappings = self._get_dms_auto_mappings()

    return list(mappings_data.values()) + dms_auto_mappings