backend/bms_app/services/operations/restore.py (255 lines of code) (raw):

# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging from datetime import datetime from marshmallow import ValidationError from bms_app.models import ( FAILOVER_ALLOWED_STATUSES, PRE_RESTORE_ALLOWED_STATUSES, RESTORE_ALLOWED_STATUSES, ROLLBACK_RESTORE_ALLOWED_STATUSES, OperationDetailsError, OperationType, SourceDB, SourceDBStatus, db ) from bms_app.services.ansible.restore import ( AnsibleFailOverConfigService, AnsiblePreRestoreConfigService, AnsibleRestoreConfigService, AnsibleRollbackRestoreConfigService ) from bms_app.services.control_node import ( FailOverControlNodeService, PreRestoreControlNodeService, RestoreControlNodeService, RollbackRestoreControlNodeService ) from bms_app.services.disk_space_validator import ( DiskSpaceError, DiskSpaceValidator ) from bms_app.services.restore_config import RestoreConfigValidationsService from bms_app.services.status_handlers.operation import ( FailOverOperationStatusHandler, PreRestoreOperationStatusHandler, RestoreOperationStatusHandler, RollbackRestoreOperationStatusHandler ) from bms_app.services.status_handlers.operation_detail import ( PreRestoreOperationDetailStatusHandler ) from .base import BaseOperation from .db_mappings import get_restore_db_mappings_objects from .utils import is_pre_restore_allowed, is_restore_allowed logger = logging.getLogger(__name__) class BaseRestoreOperation(BaseOperation): OPERATION_TYPE = None IN_PROGRESS_STATUS = None ALLOWED_STATUSES = None CONTROL_NODE_CLS = None GCS_CONFIG_DIR_TMPL = None ANSIBLE_SERVICE = None OPERATION_STATUS_HANDLER = None def __init_subclass__(cls, **kwargs): """Check if all required variables are set in subclasses.""" super().__init_subclass__(**kwargs) required_attrs = ( 'OPERATION_TYPE', 'IN_PROGRESS_STATUS', 'ALLOWED_STATUSES', 'CONTROL_NODE_CLS', 'GCS_CONFIG_DIR_TMPL', 'ANSIBLE_SERVICE', 'OPERATION_STATUS_HANDLER' ) missing_attrs = [ attr for attr in required_attrs if getattr(cls, attr) is None ] if missing_attrs: raise NotImplementedError( f'{cls.__name__} missing required class variables: {missing_attrs}' ) def run(self, db_id): source_db = db.session.query(SourceDB).with_for_update().get(db_id) self._validate_source_db(source_db) self._set_source_db_status(source_db) db_mappings_objects = get_restore_db_mappings_objects(db_id=db_id) self._validate_db_mappings_objects(db_mappings_objects) operation = self._create_operation_model(wave_id=None) self._log(operation, db_mappings_objects) operation_details = self._create_operation_details_models( operation, db_mappings_objects ) try: self._run_operation( source_db, operation, operation_details, db_mappings_objects ) except Exception: logger.exception( 'Error starting %s %s', operation.id, operation.operation_type.value ) self._handle_pre_deployment_failure(operation) db.session.commit() def _handle_pre_deployment_failure(self, operation): self.OPERATION_STATUS_HANDLER( operation, completed_at=datetime.now() ).terminate() def _run_operation(self, source_db, operation, operation_details, db_mappings_objects): return self._start_pre_deployment( source_db, operation, db_mappings_objects ) def _start_pre_deployment(self, source_db, operation, db_mappings_objects): """Generate ansible configs and start control node.""" gcs_config_dir = self._get_gcs_config_dir(operation_id=operation.id) db_mappings_object = db_mappings_objects[0] # generate and upload ansible config files self.ANSIBLE_SERVICE( db_mappings_object, gcs_config_dir ).run() cn_context = self.get_control_node_context( source_db, operation, db_mappings_objects ) # run control node self.CONTROL_NODE_CLS.run( project=source_db.project, operation=operation, gcs_config_dir=gcs_config_dir, **cn_context ) @classmethod def _validate_source_db(cls, source_db): if not source_db: raise ValidationError( {'db_id': ['db not found']} ) if source_db.is_rac: raise ValidationError( {'db_id': ['restore operations for RAC is not supported']} ) cls._extra_validate_source_db(source_db) @classmethod def _extra_validate_source_db(cls, source_db): if source_db.status not in cls.ALLOWED_STATUSES: raise ValidationError( {'db_id': [f'wrong db status: {source_db.status}']} ) @classmethod def _set_source_db_status(cls, source_db): source_db.status = cls.IN_PROGRESS_STATUS db.session.add(source_db) db.session.commit() def get_control_node_context(self, source_db, operation, db_mappings_objects): """Data to pass to startup script template for contorl node.""" context = { 'total_targets': self._count_total_targets(db_mappings_objects) } return context class PreRestoreOperation(BaseRestoreOperation): OPERATION_TYPE = OperationType.PRE_RESTORE IN_PROGRESS_STATUS = SourceDBStatus.PRE_RESTORE ALLOWED_STATUSES = PRE_RESTORE_ALLOWED_STATUSES CONTROL_NODE_CLS = PreRestoreControlNodeService GCS_CONFIG_DIR_TMPL = 'pre_restore_{operation_id}_{dt}' ANSIBLE_SERVICE = AnsiblePreRestoreConfigService OPERATION_STATUS_HANDLER = PreRestoreOperationStatusHandler def _run_operation(self, source_db, operation, operation_details, db_mappings_objects): """Run pre-restore operation. Depending on RestoreConfig.validations possible options to run are: - rman validations (requires starting control node) - disk space validation (does not require control node) - both: rman and disk space validation """ rcv = RestoreConfigValidationsService(source_db) if rcv.needs_rman_validation() \ and not rcv.needs_disk_space_validation(): # start control node only # operation will be closed as usually by msg from pubsub self._start_pre_deployment( source_db, operation, db_mappings_objects, ) elif not rcv.needs_rman_validation() \ and rcv.needs_disk_space_validation(): # run only disk space validation # and finish/close operation depending on the validation result now = datetime.now() opd_handler = PreRestoreOperationDetailStatusHandler( operation_details[0], now, ) is_valid = self._run_disk_space_validation( source_db, operation_details[0] ) if is_valid: opd_handler.set_complete() else: opd_handler.set_fail() self.OPERATION_STATUS_HANDLER( operation, completed_at=now, ).finish() elif rcv.needs_disk_space_validation() \ and rcv.needs_disk_space_validation(): # start contorl node and run disk space validation # operation will be closed as usually by msg from pubsub self._run_disk_space_validation( source_db, operation_details[0] ) self._start_pre_deployment( source_db, operation, db_mappings_objects, ) @staticmethod def _run_disk_space_validation(source_db, operation_detail): """Return whether validation was successfull or not.""" is_valid = True try: DiskSpaceValidator(source_db).validate() except DiskSpaceError: is_valid = False err = OperationDetailsError( operation_details_id=operation_detail.id, message='disk space error' ) db.session.add(err) db.session.flush() return is_valid @classmethod def _extra_validate_source_db(cls, source_db): if not is_pre_restore_allowed(source_db): raise ValidationError( {'db_id': ['operation is not allowed']} ) def get_control_node_context(self, source_db, operation, db_mappings_objects): context = super().get_control_node_context( source_db, operation, db_mappings_objects ) context['source_db'] = source_db return context class RestoreOperation(BaseRestoreOperation): OPERATION_TYPE = OperationType.BACKUP_RESTORE IN_PROGRESS_STATUS = SourceDBStatus.DT ALLOWED_STATUSES = RESTORE_ALLOWED_STATUSES CONTROL_NODE_CLS = RestoreControlNodeService GCS_CONFIG_DIR_TMPL = 'restore_{operation_id}_{dt}' ANSIBLE_SERVICE = AnsibleRestoreConfigService OPERATION_STATUS_HANDLER = RestoreOperationStatusHandler def get_control_node_context(self, source_db, operation, db_mappings_objects): context = super().get_control_node_context( source_db, operation, db_mappings_objects ) context['source_db'] = source_db return context @classmethod def _extra_validate_source_db(cls, source_db): if not is_restore_allowed(source_db): raise ValidationError( {'db_id': ['operation is not allowed']} ) class RollbackRestoreOperation(BaseRestoreOperation): OPERATION_TYPE = OperationType.ROLLBACK_RESTORE IN_PROGRESS_STATUS = SourceDBStatus.DT_ROLLBACK ALLOWED_STATUSES = ROLLBACK_RESTORE_ALLOWED_STATUSES CONTROL_NODE_CLS = RollbackRestoreControlNodeService GCS_CONFIG_DIR_TMPL = 'dt_rollback_{operation_id}_{dt}' ANSIBLE_SERVICE = AnsibleRollbackRestoreConfigService OPERATION_STATUS_HANDLER = RollbackRestoreOperationStatusHandler class FailOverOperation(BaseRestoreOperation): OPERATION_TYPE = OperationType.DB_FAILOVER IN_PROGRESS_STATUS = SourceDBStatus.FAILOVER ALLOWED_STATUSES = FAILOVER_ALLOWED_STATUSES CONTROL_NODE_CLS = FailOverControlNodeService GCS_CONFIG_DIR_TMPL = 'dt_failover_{operation_id}_{dt}' ANSIBLE_SERVICE = AnsibleFailOverConfigService OPERATION_STATUS_HANDLER = FailOverOperationStatusHandler