in data_validation/schema_validation.py [0:0]
def execute(self):
    """Compare the source table's schema with the target table's schema.

    Both schemas are looked up through ``clients.get_ibis_table_schema`` and
    turned into field-name -> data-type dicts. Those dicts, together with the
    configured exclusion columns and allow list, are handed to
    ``schema_validation_matching``; its results are laid out in a DataFrame
    that is then padded with this run's metadata columns.

    Side effect: ``self.run_metadata.end_time`` is set to the current UTC
    time.

    Returns:
        pandas.DataFrame: the matching results plus the run-metadata columns.
    """
    config = self.config_manager
    meta = self.run_metadata

    source_schema = clients.get_ibis_table_schema(
        config.source_client, config.source_schema, config.source_table
    )
    target_schema = clients.get_ibis_table_schema(
        config.target_client, config.target_schema, config.target_table
    )

    # Plain dict copies of each schema: field name -> data type.
    source_fields = {name: dtype for name, dtype in source_schema.items()}
    target_fields = {name: dtype for name, dtype in target_schema.items()}

    matches = schema_validation_matching(
        source_fields, target_fields, config.exclusion_columns, config.allow_list
    )
    result_columns = [
        consts.SOURCE_COLUMN_NAME,
        consts.TARGET_COLUMN_NAME,
        consts.SOURCE_AGG_VALUE,
        consts.TARGET_AGG_VALUE,
        consts.VALIDATION_STATUS,
    ]
    df = pandas.DataFrame(matches, columns=result_columns)

    # Update and Assign Metadata Values
    meta.end_time = datetime.datetime.now(datetime.timezone.utc)

    # Columns placed, in this order, at positions 0..7 in front of the
    # result columns. Labels are repeated once per row so every row
    # carries the same labels object.
    row_count = len(df.index)
    leading_columns = (
        (consts.CONFIG_RUN_ID, meta.run_id),
        (consts.VALIDATION_NAME, "Schema"),
        (consts.VALIDATION_TYPE, "Schema"),
        (consts.CONFIG_LABELS, [meta.labels] * row_count),
        (consts.CONFIG_START_TIME, meta.start_time),
        (consts.CONFIG_END_TIME, meta.end_time),
        (consts.SOURCE_TABLE_NAME, config.full_source_table),
        (consts.TARGET_TABLE_NAME, config.full_target_table),
    )
    for position, (column_name, column_value) in enumerate(leading_columns):
        df.insert(loc=position, column=column_name, value=column_value)

    # Position 10 falls between the first two and the last three result
    # columns once the eight leading columns are in place.
    df.insert(loc=10, column=consts.AGGREGATION_TYPE, value="Schema")

    # empty columns added due to changes on the results schema
    trailing_empty_columns = (
        consts.CONFIG_PRIMARY_KEYS,
        consts.NUM_RANDOM_ROWS,
        consts.GROUP_BY_COLUMNS,
        consts.VALIDATION_DIFFERENCE,
        consts.VALIDATION_PCT_THRESHOLD,
    )
    for position, column_name in enumerate(trailing_empty_columns, start=14):
        df.insert(loc=position, column=column_name, value=None)

    return df