backend/bms_app/mapping/views.py (99 lines of code) (raw):

# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from flask import request from marshmallow import Schema, ValidationError, fields from bms_app.mapping import bp from bms_app.mapping.services import ( AddMappingService, EditMappingService, GetMappingsService ) from bms_app.models import Mapping, OperationDetails, SourceDB, db from bms_app.services.source_db import ( clear_bms_target_params, does_db_have_operation ) MAPPING_IS_USED = 'Mapping is already involved in some operation' \ ' and can not be changed' class EditMappingSchema(Schema): db_id = fields.Int(required=True) wave_id = fields.Int(allow_none=True) bms_id = fields.List(fields.Int()) fe_rac_nodes = fields.Int() class CreateMappingSchema(Schema): db_id = fields.Int(required=True) wave_id = fields.Int(allow_none=True) bms_id = fields.List(fields.Int()) fe_rac_nodes = fields.Int() @bp.route('', methods=['GET']) def list_mappings(): """Return all available mappings.""" # get all within current project project_id = request.args.get('project_id', type=int) # get for specific db db_id = request.args.get('db_id', type=int) response = GetMappingsService.run( project_id=project_id, db_id=db_id ) return {'data': response} @bp.route('', methods=['POST']) def add_mapping(): """Add mapping(s).""" validated_data = CreateMappingSchema().load(request.json) AddMappingService.run( db_id=validated_data['db_id'], bms_ids=validated_data['bms_id'], # to keep old schema on FE wave_id=validated_data.get('wave_id'), fe_rac_nodes=validated_data.get('fe_rac_nodes'), ) mappings_data = GetMappingsService.run(db_id=validated_data['db_id']) data = mappings_data[0] if mappings_data else {} return {'data': data}, 201 # @bp.route('/<int:db_id>', methods=['GET']) # def get_mapping(mapping_id): # """Return mapping.""" # # db_id = request.args['db_id', type=int] # # if not db_id: # # raise ValidationError({'db_id': 'required'}) # validated_data = DBIdSchema().load(request.args) # mappings_data = GetMappingsService.run(db_id=validated_data['db_id']) # return {'data': mappings_data[0]}, 200 @bp.route('/<int:mapping_id>/operations', methods=['GET']) def get_mapping_operation(mapping_id): """Return operation's histories for specific mapping.""" mapping_data = [] query = ( db.session.query(Mapping, OperationDetails, SourceDB) .join(OperationDetails) .join(SourceDB) .filter(Mapping.id == mapping_id) ) for mapping, operation_details, source_db in query: mapping_data.append({ 'id': mapping.id, 'db_id': mapping.db_id, 'bms_id': mapping.bms_id, 'wave_id': source_db.wave_id, 'operation': operation_details.operation_type.value, 'started_at': operation_details.started_at, 'completed_at': operation_details.completed_at, 'status': operation_details.status.value }) return {'data': mapping_data}, 200 @bp.route('', methods=['DELETE']) def delete_mapping(): """Delete mapping.""" db_id = request.args.get('db_id', type=int) if db_id: source_db = SourceDB.query.get_or_404(db_id) # check of any operation exists operation_exists = db.session.query(OperationDetails) \ .join(Mapping) \ .filter(Mapping.db_id == db_id) \ .count() if operation_exists: return {'errors': MAPPING_IS_USED}, 400 # clear config config = source_db.config if source_db.config: clear_bms_target_params(config) db.session.query(Mapping).filter(Mapping.db_id == db_id).delete() db.session.commit() return {}, 204 @bp.route('', methods=['PUT']) def edit_mapping(): """Update mapping.""" validated_data = EditMappingSchema().load(request.json) db_id = validated_data['db_id'] new_bms_ids = validated_data['bms_id'] wave_id = validated_data.get('wave_id') fe_rac_nodes = validated_data.get('fe_rac_nodes') if does_db_have_operation(db_id): raise ValidationError({'db_id': [MAPPING_IS_USED]}) EditMappingService.run( db_id=db_id, new_bms_ids=new_bms_ids, wave_id=wave_id, fe_rac_nodes=fe_rac_nodes ) mappings_data = GetMappingsService.run(db_id=db_id) data = mappings_data[0] if mappings_data else {} return {'data': data}, 201