backend/bms_app/schema.py (115 lines of code) (raw):
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from marshmallow import Schema, ValidationError, fields, validates
from bms_app import ma
from bms_app.models import Config, Label, Operation, Project, Wave
from bms_app.validators import check_if_text, validate_project_id
# ORA_VERSIONS = ['19.3.0.0.0', '18.0.0.0.0', '12.2.0.1.0', '12.1.0.2.0', '11.2.0.4.0']
# ORA_EDITIONS = ['EE', 'SE', 'SE2']
# CLUSTER_TYPES = ['RAC', 'DG']
# DISK_GROUPS = ['DATA1', 'RECO1']
class WaveSchema(ma.SQLAlchemyAutoSchema):
    """Auto-generated schema for the ``Wave`` model, foreign keys included."""

    class Meta:
        # Foreign-key columns are part of the generated field set.
        include_fk = True
        model = Wave

    @validates('project_id')
    def validated_project_id_exists(self, value):
        """Hand the incoming ``project_id`` to ``validate_project_id``."""
        validate_project_id(value)
class AddWaveSchema(Schema):
    """Input schema carrying a wave name, a project id and optional db ids."""

    # ``name`` and ``project_id`` are mandatory; ``db_ids`` may be omitted.
    name = fields.String(required=True)
    project_id = fields.Integer(required=True)
    db_ids = fields.List(fields.Integer(), required=False)

    @validates('project_id')
    def validated_project_id_exists(self, value):
        """Hand the incoming ``project_id`` to ``validate_project_id``."""
        validate_project_id(value)
class ProjectSchema(ma.SQLAlchemyAutoSchema):
    """Auto-generated schema for the ``Project`` model."""

    class Meta:
        # Fields are derived from this model.
        model = Project
class OperationSchema(ma.SQLAlchemyAutoSchema):
    """Auto-generated schema for the ``Operation`` model, foreign keys included.

    ``operation_type`` and ``status`` override the generated fields and are
    serialised as the ``.value`` of the corresponding attribute (presumably
    enum members -- confirm against the model definition).
    """

    class Meta:
        include_fk = True
        model = Operation

    operation_type = fields.Function(
        serialize=lambda operation: operation.operation_type.value
    )
    status = fields.Function(
        serialize=lambda operation: operation.status.value
    )
class FileSchema(Schema):
    """Schema with a single mandatory ``file`` entry, accepted as-is."""

    # ``Raw`` applies no formatting to the value; only presence is enforced.
    file = fields.Raw(required=True)
class TextFileSchema(Schema):
    """Schema for an uploaded ``file`` that must be non-empty text.

    Each validator rewinds the stream before inspecting it and rewinds it
    again afterwards, so the outcome does not depend on the order in which
    the validators run, and later consumers find the stream at its start
    rather than wherever a check happened to stop reading.
    """

    file = fields.Raw(required=True)

    @validates('file')
    def validate_txt_file_format(self, file_object):
        """Reject the upload unless ``check_if_text`` accepts it.

        Args:
            file_object: seekable file-like object holding the upload.

        Raises:
            ValidationError: if ``check_if_text`` reports a falsy result.
        """
        file_object.seek(0)
        try:
            is_text = check_if_text(file_object)
        finally:
            # Leave the stream at the start for whoever reads it next.
            file_object.seek(0)
        if not is_text:
            raise ValidationError(['file format has to be txt'])

    @validates('file')
    def validate_not_empty_file(self, file_object):
        """Reject an upload that contains no data.

        Args:
            file_object: seekable, readable file-like object.

        Raises:
            ValidationError: if nothing can be read from the stream.
        """
        file_object.seek(0)
        try:
            # A single unit is enough to prove the file has content; reading
            # everything would load the whole upload into memory.
            first_chunk = file_object.read(1)
        finally:
            # Leave the stream at the start for whoever reads it next.
            file_object.seek(0)
        if not first_chunk:
            raise ValidationError(['file can not be empty'])
class BinaryFileSchema(Schema):
    """Schema for an uploaded ``file`` that must not be text."""

    file = fields.Raw(required=True)

    @validates('file')
    def validate_binary_file_format(self, file_object):
        """Reject the upload when ``check_if_text`` identifies it as text.

        The stream is rewound before the check, matching ``TextFileSchema``,
        so the result does not depend on where an earlier reader left the
        file position. It is rewound again afterwards for later consumers.

        Args:
            file_object: seekable file-like object holding the upload.

        Raises:
            ValidationError: if ``check_if_text`` reports a truthy result.
        """
        file_object.seek(0)
        try:
            is_text = check_if_text(file_object)
        finally:
            # Leave the stream at the start for whoever reads it next.
            file_object.seek(0)
        if is_text:
            raise ValidationError(['file format has to be binary'])
class DbParamsSchema(Schema):
    """Database parameters; ``db_name`` is the only active field.

    The other field declarations and the cross-field validator below are
    commented out and therefore have no effect at runtime.
    """

    # ora_version = fields.Str(validate=validate.OneOf(ORA_VERSIONS))
    # ora_edition = fields.Str(validate=validate.OneOf(ORA_EDITIONS), load_default='EE')
    # Falls back to 'ORCL' when the key is absent from the loaded data.
    db_name = fields.String(load_default='ORCL')
    # ora_role_separation = fields.Boolean()
    # cluster_type = fields.Str(validate=validate.OneOf(CLUSTER_TYPES), missing=None)
    # scan_port = fields.Integer(load_default=1521)
    # oracle_user = fields.Str(load_default='oracle')
    # oracle_group = fields.Str(load_default='oinstall')
    # grid_user = fields.Str(load_default='grid')
    # grid_group = fields.Str(load_default='asmadmin')
    # oracle_root = fields.Str(load_default='/u01/app')
    # home_name = fields.Str(load_default='db_home1')
    #
    # @validates_schema
    # def validate_ora_edit_to_ora_ver(self, data, **kwargs):
    #     if data['ora_edition'] == 'SE' and data['ora_version'] != '11.2.0.4.0':
    #         raise ValidationError(
    #             'Not appropriate ora_version. '
    #             'For ora_edition SE only 11.2.0.4.0 ora_version is appropriate'
    #         )
    #     if data['ora_edition'] == 'SE2' and data['ora_version'] == '11.2.0.4.0':
    #         raise ValidationError(
    #             'Not appropriate ora_version. '
    #             'For ora_edition SE2 only 12.1.0.2.0 and above ora_versions are appropriate'
    #         )
    #     if data['ora_role_separation'] is False:
    #         data['grid_user'] = data['oracle_user']
class DataMountSchema(Schema):
    """Description of a data mount; all fields are optional strings.

    Every field except ``blk_device`` has a load-time default.
    """

    purpose = fields.String(load_default='software')
    blk_device = fields.String()
    name = fields.String(load_default='u01')
    fstype = fields.String(load_default='xfs')
    mount_point = fields.String(load_default='/u01')
    mount_opts = fields.String(load_default='nofail')
class DisksSchema(Schema):
    """One disk entry: block device, name and size, all optional strings."""

    blk_device = fields.String()
    name = fields.String()
    # ``size`` is carried as a string, not a number.
    size = fields.String()
class ASMConfigSchema(Schema):
    """ASM configuration entry: a disk group and the disks it contains."""

    diskgroup = fields.String()
    # Each list element is validated against ``DisksSchema``.
    disks = fields.List(fields.Nested(DisksSchema))
    au_size = fields.String()
    redundancy = fields.String()
class MiscConfigSchema(Schema):
    """Miscellaneous configuration values; every field is optional.

    Only ``oracle_root`` and ``ntp_preferred`` have load-time defaults.
    """

    swap_blk_device = fields.String()
    oracle_root = fields.String(load_default='/u01/app')
    ntp_preferred = fields.String(load_default='/etc/ntp.conf')
    role_separation = fields.Boolean()
    compatible_asm = fields.String()
    compatible_rdbms = fields.String()
    asm_disk_management = fields.String()
    is_swap_configured = fields.Boolean()
class InstallConfigSchema(Schema):
    """Installation user, group and home settings, each with a default."""

    oracle_user = fields.String(load_default='oracle')
    oracle_group = fields.String(load_default='oinstall')
    home_name = fields.String(load_default='db_home19c')
    grid_user = fields.String(load_default='grid')
    grid_group = fields.String(load_default='asmadmin')
class RACNodeSchema(Schema):
    """One RAC node entry: node name, id and IP plus its VIP name and IP."""

    node_name = fields.String()
    node_id = fields.Integer()
    node_ip = fields.String()
    vip_name = fields.String()
    vip_ip = fields.String()
class RACConfigSchema(Schema):
    """RAC configuration: VIP, SCAN, cluster and network settings plus nodes.

    ``scan_name``, ``scan_port``, ``public_net`` and ``private_net`` have
    load-time defaults; every field is optional.
    """

    vip_name = fields.String()
    vip_ip = fields.String()
    scan_name = fields.String(load_default='SCAN')
    scan_port = fields.Integer(load_default=1521)
    cluster_name = fields.String()
    cluster_domain = fields.String()
    public_net = fields.String(load_default='eth0')
    private_net = fields.String(load_default='eth1')
    scan_ip1 = fields.String()
    # ``required=False`` is the library default; it is spelled out here only.
    scan_ip2 = fields.String(required=False)
    scan_ip3 = fields.String(required=False)
    dg_name = fields.String()
    # Each list element is validated against ``RACNodeSchema``.
    rac_nodes = fields.List(fields.Nested(RACNodeSchema))
class DMSConfigSchema(Schema):
    """DMS connection settings: port, username and password information.

    All fields are optional; both a ``password`` and a
    ``password_secret_id`` may be supplied as far as this schema is concerned.
    """

    port = fields.Integer()
    username = fields.String()
    password = fields.String()
    password_secret_id = fields.String()
class ConfigSchema(ma.SQLAlchemyAutoSchema):
    """Auto-generated schema for the ``Config`` model, foreign keys included."""

    class Meta:
        # Foreign-key columns are part of the generated field set.
        include_fk = True
        model = Config
class ProjectIdSchema(Schema):
    """Schema holding only a mandatory integer ``project_id``."""

    # No existence check is attached here; only type and presence are enforced.
    project_id = fields.Integer(required=True)
class LabelSchema(ma.SQLAlchemyAutoSchema):
    """Auto-generated schema for the ``Label`` model, foreign keys included."""

    class Meta:
        # Foreign-key columns are part of the generated field set.
        include_fk = True
        model = Label