backend/bms_app/wave/services/waves.py (275 lines of code) (raw):
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from marshmallow import ValidationError
from sqlalchemy import func
from bms_app.models import (
DEPLOYED_STATUSES, OPERATION_STATUSES_ORDER, BMSServer, Config, Mapping,
Operation, OperationDetails, OperationType, SourceDB, SourceDBStatus, Wave,
SourceDBEngine, db
)
from bms_app.schema import WaveSchema
from bms_app.services.utils import generate_target_gcp_logs_link
from bms_app.wave_steps import DEPLOYMENT_STEPS, ROLLBACK_STEPS
def validate_wave_name_is_unique(name, project_id, exclude_wave_id=None):
    """Ensure no other wave in the project already uses ``name``.

    ``exclude_wave_id`` (when truthy) removes that wave from the check, so a
    wave can keep its own name on update.

    Raises:
        ValidationError: keyed by ``'name'`` when a matching wave exists.
    """
    criteria = [
        Wave.name == name,
        Wave.project_id == project_id,
    ]
    if exclude_wave_id:
        criteria.append(Wave.id != exclude_wave_id)

    duplicates = Wave.query.filter(*criteria).count()
    if duplicates:
        raise ValidationError({'name': ['This name already exists']})
def get_wave_steps(operation):
    """Return the list of steps that applies to the given operation.

    Deployment operations map to DEPLOYMENT_STEPS, rollback operations to
    ROLLBACK_STEPS; any other operation gets an empty list.
    """
    if operation.is_deployment:
        steps = DEPLOYMENT_STEPS
    elif operation.is_rollback:
        steps = ROLLBACK_STEPS
    else:
        steps = []
    return steps
def wave_rate_info(wave_id):
    """Count the undeployed, deployed and failed source dbs of a wave.

    Returns a dict with the keys ``'undeployed'``, ``'deployed'`` and
    ``'failed'``. Each bucket is computed independently from the per-status
    counts, so a status listed in more than one status set is counted in
    every bucket it belongs to.
    """
    # one (status, count) row per distinct status present in the wave
    per_status = db.session.query(SourceDB.status, func.count()) \
        .filter(SourceDB.wave_id == wave_id) \
        .group_by(SourceDB.status) \
        .all()

    def _total_for(statuses):
        return sum(
            (count for status, count in per_status if status in statuses),
            0,
        )

    failed_total = next(
        (count for status, count in per_status
         if status == SourceDBStatus.FAILED),
        0,
    )

    return {
        'undeployed': _total_for(SourceDB.DEPLOYABLE_STATUSES),
        'deployed': _total_for(DEPLOYED_STATUSES),
        'failed': failed_total
    }
def list_waves_service(project_id):
    """Return serialized waves, optionally restricted to one project.

    Every wave dict gets a ``'status_rate'`` entry; waves that are running
    additionally get a ``'step'`` entry with the current/total step numbers.
    """
    waves_query = db.session.query(Wave)
    if project_id:
        waves_query = waves_query.filter(Wave.project_id == project_id)

    serialized = WaveSchema(many=True).dump(waves_query)

    for item in serialized:
        wave_id = item['id']
        item['status_rate'] = wave_rate_info(wave_id)

        if not item['is_running']:
            continue
        # step data is only meaningful while an operation is in progress
        item['step'] = RunningWaveDetails(wave_id).get_step_data()

    return serialized
def add_secret_name_status(mappings_data, secret_names):
    """Set ``'has_secret_name'`` on every entry of ``mappings_data`` in place.

    The flag is True only when all values in ``secret_names[db_id]`` are
    truthy (an empty list therefore yields True).
    """
    for db_id, db_entry in mappings_data.items():
        flags = secret_names[db_id]
        db_entry['has_secret_name'] = all(flags)
def add_aggregated_db_status(wave_mappings_data):
    """Add a calculated db status based on the statuses of its targets.

    The statuses of all targets of a db are compared against the predefined
    ordered list OPERATION_STATUSES_ORDER. The first status of that list
    that occurs among the targets is stored as the db's
    ``'operation_status'``; when none occurs, an empty string is stored.
    """
    for db_entry in wave_mappings_data.values():
        target_statuses = [
            target['operation_status'] for target in db_entry['bms']
        ]
        db_entry['operation_status'] = next(
            (
                candidate.value
                for candidate in OPERATION_STATUSES_ORDER
                if candidate.value in target_statuses
            ),
            '',
        )
class LastOpWaveDetails:
    """Provide info about the last operation of a wave."""

    def __init__(self, wave_id):
        self.wave_id = wave_id

    def get_extra_data(self):
        """Return details about the last deployment and all db mappings."""
        return {
            'last_deployment': self._get_last_deployment_data(),
            'mappings': self._get_mappings_data(),
        }

    def _get_last_deployment_data(self):
        """Return data of the latest deployment operation of the wave.

        Returns None when the wave has no deployment operation.
        """
        last_deploy = db.session.query(Operation) \
            .filter(Operation.wave_id == self.wave_id,
                    Operation.operation_type == OperationType.DEPLOYMENT) \
            .order_by(Operation.id.desc()) \
            .first()

        if not last_deploy:
            return None

        return {
            'id': last_deploy.id,
            'started_at': last_deploy.started_at,
            'completed_at': last_deploy.completed_at,
            'operation_type': last_deploy.operation_type.value,
        }

    def _get_dms_auto_mappings(self):
        """Return one mapping-like entry per POSTGRES source db of the wave.

        These entries are flagged with ``'is_dms_auto_mapping'`` and carry no
        ``'bms'`` list.
        """
        query = db.session.query(SourceDB)\
            .outerjoin(Config) \
            .with_entities(SourceDB, Config.is_configured) \
            .filter(SourceDB.wave_id == self.wave_id) \
            .filter(SourceDB.db_engine == SourceDBEngine.POSTGRES)

        return [
            {
                'server': source_db.server,
                'db_id': source_db.id,
                'db_name': source_db.db_name,
                'is_deployable': source_db.is_deployable,
                'is_dms_auto_mapping': True,
                'db_engine': source_db.db_engine.value,
                'operation_type': None,  # TODO: get last operation type
                'operation_status': '',
                'operation_id': None,  # TODO: get last operation id
                # outer join: no Config row yields None -> not configured
                'is_configured': is_configured if is_configured is not None else False,
            }
            for source_db, is_configured in query
        ]

    @staticmethod
    def _group_rows_by_db(rows):
        """Fold query rows into per-db mapping dicts.

        Each row is a 7-tuple ``(source_db, bms_server, is_configured,
        last_op_type, last_op_status, last_op_id, last_op_step)``.

        Returns a ``(mappings_data, secret_names)`` pair, both keyed by db id.
        ``secret_names[db_id]`` holds one boolean per target of that db, so
        that a later ``all()`` over it covers every target and not just one.
        """
        mappings_data = {}
        secret_names = {}

        for row in rows:
            (source_db, bms_server, is_configured, last_op_type,
             last_op_status, last_op_id, last_op_step) = row

            db_id = source_db.id

            if db_id not in mappings_data:
                # db-level fields are taken from the first row seen for the db
                mappings_data[db_id] = {
                    'server': source_db.server,
                    'db_id': source_db.id,
                    'db_name': source_db.db_name,
                    'db_type': source_db.db_type.value,
                    'is_deployable': source_db.is_deployable,
                    'operation_type': last_op_type.value if last_op_type else None,
                    'operation_status': '',
                    'operation_id': last_op_id,
                    'bms': [],
                    'is_configured': is_configured if is_configured is not None else False,
                }
                secret_names[db_id] = []

            # record the flag of every target, not only of a single one
            secret_names[db_id].append(bool(bms_server.secret_name))

            mappings_data[db_id]['bms'].append({
                'bms_id': bms_server.id,
                'bms_name': bms_server.name,
                'operation_status': last_op_status.value if last_op_status else None,
                'operation_step': last_op_step,
            })

        return mappings_data, secret_names

    def _get_mappings_data(self):
        """Return info and last operation for each db mapping of the wave."""
        # get latest operation details id per mapping/bms_target
        subq = db.session.query(OperationDetails) \
            .with_entities(func.max(OperationDetails.id)) \
            .filter(OperationDetails.wave_id == self.wave_id) \
            .group_by(OperationDetails.mapping_id)

        # get latest operation details/history for these mappings
        joinq = db.session.query(OperationDetails) \
            .filter(OperationDetails.id.in_(subq)) \
            .subquery()

        query = db.session.query(Mapping, SourceDB, BMSServer, joinq) \
            .outerjoin(SourceDB) \
            .outerjoin(BMSServer) \
            .outerjoin(Config) \
            .with_entities(SourceDB, BMSServer, Config.is_configured,
                           joinq.c.operation_type, joinq.c.status,
                           joinq.c.operation_id, joinq.c.step) \
            .outerjoin(joinq, Mapping.id == joinq.c.mapping_id) \
            .filter(SourceDB.wave_id == self.wave_id)

        mappings_data, secret_names = self._group_rows_by_db(query)

        add_secret_name_status(mappings_data, secret_names)
        add_aggregated_db_status(mappings_data)

        dms_auto_mappings = self._get_dms_auto_mappings()

        return list(mappings_data.values()) + dms_auto_mappings
class RunningWaveDetails:
    """Provide info about the running operation of a wave."""

    def __init__(self, wave_id):
        self.wave_id = wave_id
        # cache for the `curr_op` property
        self._curr_op = None

    @property
    def curr_op(self):
        """Return the latest Operation of the wave (highest id).

        The operation is queried on first access and cached afterwards.
        """
        if self._curr_op is None:
            self._curr_op = db.session.query(Operation) \
                .filter(Operation.wave_id == self.wave_id) \
                .order_by(Operation.id.desc()) \
                .first()
        return self._curr_op

    def get_extra_data(self):
        """Return details about the running operation and its db mappings."""
        return {
            'curr_operation': self._get_running_op_data(),
            'mappings': self._get_running_mappings_data(),
        }

    def get_step_data(self):
        """Return the current and the total step number of the operation.

        ``'curr_step'`` is the 1-based position of the last wave step that
        has been recorded in the operation details (0 when none has).
        """
        step_rows = db.session.query(OperationDetails.step). \
            filter(OperationDetails.operation_id == self.curr_op.id). \
            all()
        recorded_steps = {row[0] for row in step_rows}

        wave_steps = get_wave_steps(self.curr_op)

        last_step_index = 0
        # find the latest step within the recorded steps
        for ind, item in enumerate(wave_steps, 1):
            if item['id'] in recorded_steps:
                last_step_index = ind

        return {
            'curr_step': last_step_index,
            'total_steps': len(wave_steps)
        }

    def _get_running_op_data(self):
        """Return type, timestamps and id of the current operation."""
        return {
            'operation_type': self.curr_op.operation_type.value,
            'started_at': self.curr_op.started_at,
            'completed_at': self.curr_op.completed_at,
            'id': self.curr_op.id,
        }

    @staticmethod
    def _group_rows_by_db(rows, build_logs_url):
        """Fold query rows into per-db mapping dicts.

        Each row is a 4-tuple ``(op_details, source_db, bms_server, config)``.
        ``build_logs_url`` is called as ``build_logs_url(op_details,
        bms_server)`` and its result is stored as the target's ``'logs_url'``.

        Returns a ``(mappings, secret_names)`` pair, both keyed by db id.
        ``secret_names[db_id]`` holds one boolean per target of that db, so
        that a later ``all()`` over it covers every target and not only the
        last one processed.
        """
        mappings = {}
        secret_names = {}

        for op_details, source_db, bms_server, config in rows:
            db_id = source_db.id

            if db_id not in mappings:
                # db-level fields are taken from the first row seen for the db
                mappings[db_id] = {
                    'server': source_db.server,
                    'db_id': source_db.id,
                    'db_name': source_db.db_name,
                    'db_type': source_db.db_type.value,
                    'is_deployable': source_db.is_deployable,
                    'operation_type': op_details.operation_type.value,
                    'operation_id': op_details.operation_id,
                    'bms': [],
                    'is_configured': config.is_configured if config else False,
                }

            mappings[db_id]['bms'].append({
                'id': bms_server.id,
                'name': bms_server.name,
                'operation_status': op_details.status.value,
                'operation_step': op_details.step,
                'logs_url': build_logs_url(op_details, bms_server)
            })

            # accumulate instead of overwrite: every target must be counted
            secret_names.setdefault(db_id, []).append(
                bool(bms_server.secret_name)
            )

        return mappings, secret_names

    def _get_running_mappings_data(self):
        """Return the mappings that take part in the current operation."""
        query = db.session.query(OperationDetails, SourceDB, BMSServer, Config) \
            .filter(OperationDetails.operation_id == self.curr_op.id) \
            .outerjoin(Mapping, OperationDetails.mapping_id == Mapping.id) \
            .outerjoin(SourceDB, Mapping.db_id == SourceDB.id) \
            .outerjoin(BMSServer, Mapping.bms_id == BMSServer.id) \
            .outerjoin(Config)

        mappings, secret_names = self._group_rows_by_db(
            query, generate_target_gcp_logs_link
        )

        add_secret_name_status(mappings, secret_names)
        add_aggregated_db_status(mappings)

        return list(mappings.values())
class GetWaveService:
    """Assemble the serialized representation of a single wave."""

    @classmethod
    def run(cls, wave_id, return_details=False):
        """Return wave data, or trigger a 404 if the wave does not exist.

        The result always contains ``'status_rate'``; a running wave also
        gets ``'step'``. With ``return_details`` the mappings count and the
        extra data of the matching details provider are merged in as well.
        """
        wave = Wave.query.get_or_404(wave_id)

        data = WaveSchema().dump(wave)
        data['status_rate'] = wave_rate_info(wave_id)

        # pick the details provider that matches the state of the wave
        if wave.is_running:
            details = RunningWaveDetails(wave_id=wave_id)
            data['step'] = details.get_step_data()
        else:
            details = LastOpWaveDetails(wave_id=wave_id)

        if not return_details:
            return data

        data['mappings_count'] = cls._get_mappings_count(wave_id)
        data.update(details.get_extra_data())
        return data

    @staticmethod
    def _get_mappings_count(wave_id):
        """Return the number of mappings whose source db is in the wave."""
        wave_mappings = db.session.query(Mapping) \
            .join(SourceDB) \
            .filter(SourceDB.wave_id == wave_id)
        return wave_mappings.count()
def assign_source_db_wave(wave, db_ids):
    """Assign the wave to the given databases and commit the session.

    Only databases of the wave's project are considered. Returns a dict with
    the number of ``'assigned'``, ``'skipped'`` and ``'unmapped'`` databases.
    """
    counters = {
        'assigned': 0,
        'skipped': 0,
        'unmapped': 0,
    }

    # Mapping is joined in order to know whether a db is mapped to any target
    db_rows = db.session.query(SourceDB, func.count(Mapping.id))\
        .outerjoin(Mapping, SourceDB.id == Mapping.db_id)\
        .filter(
            SourceDB.id.in_(db_ids),
            SourceDB.project_id == wave.project_id) \
        .group_by(SourceDB)\
        .all()

    for source_db, mappings_total in db_rows:
        if not mappings_total and source_db.db_engine == SourceDBEngine.ORACLE:
            counters['unmapped'] += 1
            continue

        # only a db without any operation, and not part of a running wave,
        # may be (re)assigned
        if source_db.status == SourceDBStatus.EMPTY and \
                not (source_db.wave and source_db.wave.is_running):
            source_db.wave_id = wave.id
            counters['assigned'] += 1
            continue

        counters['skipped'] += 1

    db.session.commit()

    return counters