in backend/bms_app/wave/views.py [0:0]
def get_operation_details(wave_id, operation_id):
"""Return details/histories of the particular operation."""
query = db.session.query(OperationDetails, SourceDB, BMSServer) \
.join(Mapping, OperationDetails.mapping_id == Mapping.id) \
.join(SourceDB, Mapping.db_id == SourceDB.id) \
.join(BMSServer, Mapping.bms_id == BMSServer.id) \
.filter(OperationDetails.operation_id == operation_id) \
.all()
response = {}
for op_details, source_db, bms_server in query:
db_id = source_db.id
if db_id not in response:
response[db_id] = {
'status': op_details.status.value,
'operation_id': op_details.operation_id,
'operation_type': op_details.operation_type.value,
'wave_id': op_details.wave_id,
'source_db': {
'source_hostname': source_db.server,
'db_name': source_db.db_name,
'db_type': source_db.db_type.value,
'is_rac': source_db.db_type == SourceDBType.RAC,
'oracle_version': source_db.oracle_version,
},
'bms': []
}
logs_url = generate_target_gcp_logs_link(op_details, bms_server)
response[db_id]['bms'].append({
'id': bms_server.id,
'name': bms_server.name,
'logs_url': logs_url,
'started_at': op_details.started_at.strftime("%a, %d %b %Y %H:%M:%S GMT"),
'completed_at': op_details.completed_at.strftime("%a, %d %b %Y %H:%M:%S GMT"),
'step': op_details.step,
'mapping_id': op_details.mapping_id
})
return {'data': list(response.values())}