# backend/bms_app/source_db/parsers.py
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import math
import re

import numpy as np
import pandas as pd


class MigvisorFileError(Exception):
    pass


class MigvisorFileFormatError(MigvisorFileError):
    pass


class MigvisorFileDataError(MigvisorFileError):
    pass


def convert_to_mb(size_bytes):
    """Convert a size in bytes to a whole-unit string such as '1M'.

    Reduces the value to its largest 1024-based unit; for megabyte-range
    inputs such as ASM allocation-unit sizes this yields the size in MB.
    """
    size = int(size_bytes)
    # unit index: 0 = bytes, 1 = KB, 2 = MB, ...
    i = int(math.floor(math.log(size, 1024)))
    p = math.pow(1024, i)
    s = round(size / p)
    return f'{s}M'
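
# Illustrative check (example values, not taken from a real workbook):
#   >>> convert_to_mb(1048576)
#   '1M'
#   >>> convert_to_mb(4194304)
#   '4M'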


class MigvisorParser:
    """Migvisor output file parser."""
COLUMNS_TO_PARSE = [
'server', 'oracle_version', 'arch', 'cores', 'ram',
'allocated_memory', 'db_name', 'db_size', 'db_engine'
]
COLUMNS_MAP = {
'architecture': 'arch',
'allocated memory': 'allocated_memory',
'database name': 'db_name',
'database size (gb)': 'db_size',
'database type': 'db_engine',
'version': 'oracle_version'
}
DATABASE_SERVERS_SHEET = 'Database Servers'
RAC_FEATURE = 'Real Application Clusters'
DB_FULL_VERSION = 'DB Full Version'
PSU_INSTALLED = 'PSU Installed'
ASM_DISKGROUPS = 'ASM Diskgroups'
DB_VERSIONS_MAP = {
'19': '19.3.0.0.0',
'18': '18.0.0.0.0',
'12.2': '12.2.0.1.0',
'12.1': '12.1.0.2.0',
'11.2': '11.2.0.4.0'
    }

    def __init__(self, file_path):
        self.file_path = file_path

    def parse(self):
"""Return list of parsed databases."""
wb = self._read_workbook()
db_data = self._parse_databases_sheet(wb)
self._validate_data(db_data)
self._extend_db_data(wb, db_data)
return db_data
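
    # Each record returned by parse() holds the renamed 'Database Servers'
    # columns (see COLUMNS_TO_PARSE) plus the per-database values added by
    # _extend_db_data: rac_nodes, oracle_release and the ASM diskgroup data.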

    def _read_workbook(self):
"""Read excel file."""
try:
return pd.ExcelFile(self.file_path)
except Exception as exc:
            raise MigvisorFileFormatError('Incorrect file format') from exc

    def _parse_databases_sheet(self, wb):
"""Parse data from "Database Servers" tab."""
df = self._read_database_sheet_to_df(wb)
        return self._convert_db_df_to_data(df)

    def _extend_db_data(self, wb, db_data):
        """Add db-specific parameters.

        Db parameters are defined in sheets whose names start with the '#'
        symbol. The #-sheets appear in the same order as the databases in
        the 'Database Servers' sheet. Read each #-sheet and extract the
        necessary data.
        """
wb_sheet_names = [n for n in wb.sheet_names if n.startswith('#')]
for ind, sheet_name in enumerate(wb_sheet_names):
df = wb.parse(sheet_name)
rac_nodes = self._get_rac_nodes(df)
db_full_version = self._full_db_version(df, db_data[ind])
oracle_release = self._add_oracle_release(df)
asm_dg_data = self._parse_asm_diskgroup_data(df)
db_data[ind].update({
'rac_nodes': rac_nodes,
'oracle_version': db_full_version,
'oracle_release': oracle_release,
})
db_data[ind].update(asm_dg_data)
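
    # Beyond the leading '#', the sheet names (e.g. '#1 host1/DB1' as a
    # hypothetical example) are not parsed; #-sheets are matched to
    # databases purely by position.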

    def _parse_asm_diskgroup_data(self, df):
        """Parse Oracle ASM diskgroup data."""
asm_columns = {
'ALLOC_SIZE': 'au_size',
'DG_NAME': 'diskgroup',
'REDUNDANCY': 'redundancy',
}
misc_columns = {
'COMPAT': 'compatible_asm',
'DB_COMPAT': 'compatible_rdbms',
}
redundancy_names = {
'EXTEND': 'EXTENDED',
'EXTERN': 'EXTERNAL'
}
data = {
'asm': []
}
asm_dg_df = df[df['Feature'] == self.ASM_DISKGROUPS]
if not asm_dg_df.empty:
for row in list(asm_dg_df.Value):
asm_item = {}
for key_value in row.split():
key, value = key_value.split(':')
key = key.strip()
value = value.strip()
if key in asm_columns:
if key == 'ALLOC_SIZE':
value = convert_to_mb(value)
elif key == 'REDUNDANCY':
value = redundancy_names.get(value, value)
asm_item[asm_columns[key]] = value
if key in misc_columns:
# later this data will go to asm too
data[misc_columns[key]] = value
if asm_item:
data['asm'].append(asm_item)
return data
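
    # Illustrative parse of a hypothetical ASM 'Value' cell:
    #   'DG_NAME:DATA ALLOC_SIZE:1048576 REDUNDANCY:EXTERN'
    #   ' COMPAT:12.2.0.1.0 DB_COMPAT:11.2.0.4.0'
    # yields
    #   {'asm': [{'diskgroup': 'DATA', 'au_size': '1M',
    #             'redundancy': 'EXTERNAL'}],
    #    'compatible_asm': '12.2.0.1.0',
    #    'compatible_rdbms': '11.2.0.4.0'}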

    def _get_rac_nodes(self, df):
        """Return the number of nodes in the RAC cluster."""
        return len(df[df['Feature'] == self.RAC_FEATURE])

    def _full_db_version(self, df, db_data_row):
        """Return the full Oracle version if present, else derive it."""
db_full_version = df[df['Feature'] == self.DB_FULL_VERSION]
if not db_full_version.empty:
db_full_version = list(db_full_version.Value)[0]
else:
db_full_version = db_data_row['oracle_version']
# convert to "base" version if it is not in full format.
if db_full_version.count('.') < 4:
db_full_version = self._db_version_format_conversion(
db_full_version
)
        return db_full_version

    def _db_version_format_conversion(self, db_version):
"""Convert db version to full format."""
for short_ver, long_ver in self.DB_VERSIONS_MAP.items():
if db_version.startswith(short_ver):
return long_ver
return db_version
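
    # For example, '12.2.0.1' starts with '12.2' and maps to '12.2.0.1.0';
    # versions with no match in DB_VERSIONS_MAP pass through unchanged.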

    def _add_oracle_release(self, df):
        """Return the Oracle release if present, else 'base'.

        Examples:
            1. RDBMS_12.2.0.1.0_LINUX.X64_170125 becomes 12.2.0.1.170125
            2. RDBMS_19.11.0.0.0DBRU_LINUX.X64_210412 becomes 19.11.0.0.210412

        Explanation:
            Trim the first 6 characters ('RDBMS_'), keep the first four
            components of the version number, and append the trailing
            6-digit date stamp.
        """
oracle_release = df[df['Feature'] == self.PSU_INSTALLED]
if not oracle_release.empty:
oracle_release = list(oracle_release.Value)[0]
oracle_release = '{}.{}'.format(
'.'.join(oracle_release[6:].split('.')[0:4]),
oracle_release[-6:]
)
else:
oracle_release = 'base'
        return oracle_release

    def _convert_db_df_to_data(self, df):
        """Convert database df to dict data: rename/remove columns."""
lower_columns = [x.lower() for x in df.columns]
new_columns = {x: self.COLUMNS_MAP.get(y, y) for x, y in zip(df.columns, lower_columns)}
df.rename(columns=new_columns, inplace=True)
df = df.replace({np.nan: None})
df = df[self.COLUMNS_TO_PARSE]
# make sure 'oracle_version' is string
df['oracle_version'] = df['oracle_version'].apply(
lambda x: None if x is None else str(x)
)
        # 'db_engine' may be None after the NaN replacement above
        df['db_engine'] = df['db_engine'].apply(
            lambda engine: None if engine is None else str(engine).upper()
        )
return df.to_dict(orient='records')
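
    # For example, a 'Database Name' column is renamed to 'db_name' and
    # 'Version' to 'oracle_version' before the rows are emitted as dicts.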

    @classmethod
    def _read_database_sheet_to_df(cls, wb):
"""Read and validate 'Database Servers' sheet."""
if cls.DATABASE_SERVERS_SHEET not in wb.sheet_names:
raise MigvisorFileDataError(
f'No {cls.DATABASE_SERVERS_SHEET} sheet found'
)
df = wb.parse(cls.DATABASE_SERVERS_SHEET, header=None)
header_row_index = cls._find_header_row_index(df)
if header_row_index is None: # might be 0 which is OK
raise MigvisorFileDataError('Headers row not found')
# set correct headers
df.columns = df.iloc[header_row_index]
df.columns.name = None
# drop empty rows before headers
df = df[header_row_index + 1:]
# drop completely empty rows if present
df.dropna(how='all', inplace=True)
if df.empty:
raise MigvisorFileDataError('No database found')
        return df

    @staticmethod
    def _find_header_row_index(df):
        """Find the index of the headers row.

        It may sit in a different row depending on the Migvisor version,
        so look for the row that contains both 'server' and 'database
        name' values.
        """
header_row_index = None
for index, row in df.iterrows():
values = [x.lower() for x in row.values if isinstance(x, str)]
if all((x in values for x in ('server', 'database name'))):
header_row_index = index
break
        return header_row_index

    def _validate_data(self, db_data):
        """Validate required fields and db name."""
self._validate_required_fields(db_data)
        self._validate_db_name(db_data)

    def _validate_required_fields(self, db_data):
        """Check if ['server', 'db_name', 'oracle_version'] are parsed."""
required_fields = ('server', 'db_name', 'oracle_version')
for ind, item in enumerate(db_data):
missed = [f for f in required_fields if item.get(f) is None]
if missed:
original_col_names = self._get_original_column_names(missed)
raise MigvisorFileDataError(
                    f'Missing {original_col_names} field(s) in row #{ind}'
                )

    @staticmethod
    def _validate_db_name(db_data):
        """Validate db name."""
        for item in db_data:
            db_name = item['db_name']
            # guard against an empty name before indexing its first char
            if not db_name or not db_name[0].isalpha():
                raise MigvisorFileDataError(
                    'Db name should start with a letter'
                )
            if len(db_name) > 8:
                raise MigvisorFileDataError(
                    'Db name should be at most 8 characters'
                )
            if not re.search(r'^[a-zA-Z0-9_$#]*$', db_name):
                raise MigvisorFileDataError(
                    'Db name should match the pattern: a-zA-Z0-9_$#'
                )

    def _get_original_column_names(self, cols):
        """Return original column names from the file."""
reversed_col_map = {v: k for k, v in self.COLUMNS_MAP.items()}
return [reversed_col_map.get(f, f).capitalize() for f in cols]
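

# Usage sketch (hypothetical path; pandas needs an Excel engine such as
# openpyxl installed to read .xlsx files):
#
#     parser = MigvisorParser('migvisor_export.xlsx')
#     try:
#         databases = parser.parse()  # list of dicts, one per database
#     except MigvisorFileFormatError:
#         ...  # not a readable Excel workbook
#     except MigvisorFileDataError:
#         ...  # required sheet, headers or fields are missing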