backend/bms_app/services/operations/base.py (62 lines of code) (raw):
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from datetime import datetime
from marshmallow import ValidationError
from bms_app import settings
from bms_app.models import Operation, OperationDetails, OperationStatus, db
logger = logging.getLogger(__name__)
class BaseOperation:
OPERATION_TYPE = None
GCS_CONFIG_DIR_TMPL = None
def _log(self, operation, db_mappings_objects):
logger.debug(
'%s operation: found %s dbs and %s targets ',
operation.operation_type.value,
len(db_mappings_objects),
self._count_total_targets(db_mappings_objects)
)
def _create_operation_model(self, wave_id):
operation = Operation(
wave_id=wave_id,
status=OperationStatus.STARTING,
operation_type=self.OPERATION_TYPE,
)
db.session.add(operation)
db.session.commit()
return operation
def _create_operation_details_models(self, operation, db_mappings_objects):
operation_details_models = []
all_mappings = [m for obj in db_mappings_objects for m in obj.mappings]
for mapping in all_mappings:
if mapping:
operation_details_models.append(
OperationDetails(
mapping_id=mapping.id,
wave_id=operation.wave_id,
operation_id=operation.id,
status=OperationStatus.STARTING,
operation_type=self.OPERATION_TYPE,
step='PRE_DEPLOYMENT',
)
)
db.session.add_all(operation_details_models)
db.session.commit()
return operation_details_models
def _validate_db_mappings_objects(self, db_mappings_objects):
if not db_mappings_objects \
or not self._count_total_targets(db_mappings_objects):
raise ValidationError(
{'_schema': ['no db/mappings to run operation']}
)
@staticmethod
def _count_total_targets(db_mappings_objects):
return sum([len(obj.mappings)for obj in db_mappings_objects])
@classmethod
def _get_gcs_config_dir(cls, **kwargs):
"""Return full gcs prefix to specific operation configuration dir."""
params = {'dt': datetime.now().strftime('%Y%m%d%H%M%S')}
params.update(kwargs)
config_dir = cls.GCS_CONFIG_DIR_TMPL.format(**params)
return os.path.join(
settings.GCS_DEPLOYMENT_CONFIG_PREFIX,
config_dir
)