# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper utils for executor verifier."""
import filecmp
import os
from typing import Dict, List, Optional
from absl import logging
import tensorflow_model_analysis as tfma
from tfx import types
from tfx.dsl.io import fileio
from tfx.experimental.pipeline_testing import pipeline_recorder_utils
from tfx.orchestration import metadata
from tfx.types import artifact_utils
from tfx.utils import io_utils
from ml_metadata.proto import metadata_store_pb2
from tensorflow_metadata.proto.v0 import anomalies_pb2


try:
  # Use EvalResult from the top-level tfma namespace when available.
  _EvalResult = tfma.EvalResult
except AttributeError:
  # Fall back to view_types for TFMA versions that do not export it directly.
  from tensorflow_model_analysis.view.view_types import EvalResult as _EvalResult


def compare_dirs(dir1: str, dir2: str) -> bool:
  """Recursively compares the contents of two directories.

  Args:
    dir1: path to a directory.
    dir2: path to another directory.

  Returns:
    True if the two directories have identical file contents.
  """
dir_cmp = filecmp.dircmp(dir1, dir2)
  if dir_cmp.left_only or dir_cmp.right_only or dir_cmp.funny_files:
    return False
_, mismatch, errors = filecmp.cmpfiles(
dir1, dir2, dir_cmp.common_files, shallow=False)
if mismatch or errors:
return False
for common_dir in dir_cmp.common_dirs:
new_dir1 = os.path.join(dir1, common_dir)
new_dir2 = os.path.join(dir2, common_dir)
if not compare_dirs(new_dir1, new_dir2):
return False
return True
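

# Example (illustrative sketch for compare_dirs; the paths below are
# hypothetical):
#
#   recorded_uri = '/tmp/recorded_pipeline/Trainer/model/5'
#   output_uri = '/tmp/test_pipeline/Trainer/model/5'
#   if not compare_dirs(recorded_uri, output_uri):
#     raise AssertionError('Trainer output differs from the recording.')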


def _compare_relative_difference(value: float, expected_value: float,
                                 threshold: float) -> bool:
  """Compares the relative difference between two values against a threshold.

  Args:
    value: a float value to be compared to the expected value.
    expected_value: the expected float value.
    threshold: a float larger than 0.

  Returns:
    True if the values are equal, the expected value is zero, or the relative
    difference is within the threshold.
  """
  if value != expected_value:
    # A zero expected value would make the relative difference undefined, so
    # the comparison is skipped (treated as passing).
    if expected_value:
      relative_diff = abs(value - expected_value) / abs(expected_value)
      if relative_diff > threshold:
        logging.warning('Relative difference %f exceeded threshold %f',
                        relative_diff, threshold)
        return False
  return True
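

# Worked example for _compare_relative_difference: with value=0.95,
# expected_value=1.0 and threshold=0.1, the relative difference is
# |0.95 - 1.0| / |1.0| = 0.05 <= 0.1, so the check passes; a value of 0.5
# would yield 0.5 > 0.1 and fail:
#
#   assert _compare_relative_difference(0.95, 1.0, threshold=0.1)
#   assert not _compare_relative_difference(0.5, 1.0, threshold=0.1)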


def get_pipeline_outputs(
    metadata_connection_config: Optional[metadata_store_pb2.ConnectionConfig],
    pipeline_name: str) -> Dict[str, Dict[str, Dict[int, types.Artifact]]]:
  """Returns a dictionary of pipeline output artifacts for every component.

  Args:
    metadata_connection_config: connection configuration to MLMD.
    pipeline_name: name of the pipeline.

  Returns:
    a dictionary mapping each component id to its output artifacts, keyed by
    output key and artifact index.
  """
output_map = {}
with metadata.Metadata(metadata_connection_config) as m:
executions = pipeline_recorder_utils.get_latest_executions(m, pipeline_name)
for execution in executions:
component_id = pipeline_recorder_utils.get_component_id_from_execution(
m, execution)
output_dict = {}
for event in m.store.get_events_by_execution_ids([execution.id]):
if event.type == metadata_store_pb2.Event.OUTPUT:
steps = event.path.steps
if not steps or not steps[0].HasField('key'):
raise ValueError('Artifact key is not recorded in the MLMD.')
key = steps[0].key
artifacts = m.store.get_artifacts_by_id([event.artifact_id])
if key not in output_dict:
output_dict[key] = {}
for pb_artifact in artifacts:
if len(steps) < 2 or not steps[1].HasField('index'):
raise ValueError('Artifact index is not recorded in the MLMD.')
artifact_index = steps[1].index
if artifact_index in output_dict[key]:
raise ValueError('Artifact already in output_dict')
            [artifact_type] = m.store.get_artifact_types_by_id(
                [pb_artifact.type_id])
artifact = artifact_utils.deserialize_artifact(
artifact_type, pb_artifact)
output_dict[key][artifact_index] = artifact
output_map[component_id] = output_dict
return output_map
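

# Example (illustrative sketch for get_pipeline_outputs; the database path,
# pipeline name, component id and output key are hypothetical):
#
#   config = metadata.sqlite_metadata_connection_config(
#       '/tmp/pipeline_root/metadata.db')
#   outputs = get_pipeline_outputs(config, 'chicago_taxi_beam')
#   model_artifact = outputs['Trainer']['model'][0]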


def verify_file_dir(output_uri: str,
                    expected_uri: str,
                    check_file: bool = False) -> bool:
  """Verifies a pipeline output artifact uri by comparing directory structure.

  Args:
    output_uri: pipeline output artifact uri.
    expected_uri: recorded pipeline output artifact uri.
    check_file: whether to also verify that each leaf file exists.

  Returns:
    True if the directory structures (and, optionally, leaf file paths) match.
  """
for dir_name, sub_dirs, leaf_files in fileio.walk(expected_uri):
for sub_dir in sub_dirs:
new_file_path = os.path.join(
dir_name.replace(expected_uri, output_uri, 1), sub_dir)
if not fileio.exists(new_file_path):
        logging.error('%s does not exist.', new_file_path)
return False
if check_file:
for leaf_file in leaf_files:
new_file_path = os.path.join(
dir_name.replace(expected_uri, output_uri, 1), leaf_file)
if not fileio.exists(new_file_path):
          logging.error('%s does not exist.', new_file_path)
return False
return True
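

# Example (illustrative sketch for verify_file_dir; the uris are
# hypothetical):
#
#   structure_matches = verify_file_dir(
#       output_uri='/tmp/test_pipeline/Transform/transform_graph/4',
#       expected_uri='/tmp/recorded_pipeline/Transform/transform_graph/4',
#       check_file=True)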


def _group_metric_by_slice(
    eval_result: _EvalResult) -> Dict[str, Dict[str, float]]:
  """Returns a dictionary holding metric values for every slice.

  Args:
    eval_result: evaluation result.

  Returns:
    a map from slice key to a dictionary of metric names and values.
  """
  slice_map = {}
  for slice_key, metrics in eval_result.slicing_metrics:
    # Metrics are keyed by output name and sub key (both empty for the default
    # output), then by metric name, with float values stored under
    # 'doubleValue'.
    slice_map[slice_key] = {
        name: value['doubleValue'] for name, value in metrics[''][''].items()
    }
  return slice_map  # pytype: disable=bad-return-type  # strict_namedtuple_checks
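

# The slicing_metrics structure assumed above looks like this (illustrative
# values; the overall dataset slice has the empty key ()):
#
#   [((), {'': {'': {'accuracy': {'doubleValue': 0.94}}}}),
#    ((('country', 'US'),), {'': {'': {'accuracy': {'doubleValue': 0.91}}}})]
#
# which _group_metric_by_slice flattens to:
#
#   {(): {'accuracy': 0.94}, (('country', 'US'),): {'accuracy': 0.91}}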


def compare_eval_results(output_uri: str, expected_uri: str, threshold: float,
                         metrics: List[str]) -> bool:
  """Compares the given metrics on the overall slice of two EvalResults.

  Args:
    output_uri: pipeline output artifact uri.
    expected_uri: recorded pipeline output artifact uri.
    threshold: a float larger than 0.
    metrics: metric names to compare.

  Returns:
    True if each compared metric differs by no more than the threshold.
  """
eval_result = tfma.load_eval_result(output_uri)
expected_eval_result = tfma.load_eval_result(expected_uri)
slice_map = _group_metric_by_slice(eval_result)
expected_slice_map = _group_metric_by_slice(expected_eval_result)
for metric_name, value in slice_map[()].items():
if metric_name not in metrics:
continue
expected_value = expected_slice_map[()][metric_name]
if not _compare_relative_difference(value, expected_value, threshold):
logging.warning('Check following metric: %s', metric_name)
return False
return True
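

# Example (illustrative sketch for compare_eval_results; the uris and metric
# names are hypothetical):
#
#   metrics_match = compare_eval_results(
#       '/tmp/test_pipeline/Evaluator/evaluation/7',
#       '/tmp/recorded_pipeline/Evaluator/evaluation/7',
#       threshold=0.05,
#       metrics=['accuracy', 'example_count'])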


def compare_file_sizes(output_uri: str, expected_uri: str,
                       threshold: float) -> bool:
  """Compares pipeline output file sizes in the output and recorded uris.

  Args:
    output_uri: pipeline output artifact uri.
    expected_uri: recorded pipeline output artifact uri.
    threshold: a float between 0 and 1.

  Returns:
    True if all file sizes differ by no more than the threshold.
  """
for dir_name, sub_dirs, leaf_files in fileio.walk(expected_uri):
for sub_dir in sub_dirs:
new_file_path = os.path.join(
dir_name.replace(expected_uri, output_uri, 1), sub_dir)
if not fileio.exists(new_file_path):
return False
for leaf_file in leaf_files:
expected_file_name = os.path.join(dir_name, leaf_file)
file_name = os.path.join(
dir_name.replace(expected_uri, output_uri, 1), leaf_file)
if not _compare_relative_difference(
fileio.open(file_name).size(),
fileio.open(expected_file_name).size(), threshold):
return False
return True


def compare_model_file_sizes(output_uri: str, expected_uri: str,
                             threshold: float) -> bool:
  """Compares pipeline output model file sizes in the output and recorded uris.

  Directories whose path contains 'Format-TFMA', 'eval_model_dir', or
  'export', as well as TensorBoard event files, are skipped, as their contents
  are not expected to be byte-stable across runs.

  Args:
    output_uri: pipeline output artifact uri.
    expected_uri: recorded pipeline output artifact uri.
    threshold: a float between 0 and 1.

  Returns:
    True if all compared file sizes differ by no more than the threshold.
  """
for dir_name, sub_dirs, leaf_files in fileio.walk(expected_uri):
if ('Format-TFMA' in dir_name or 'eval_model_dir' in dir_name or
'export' in dir_name):
continue
for sub_dir in sub_dirs:
new_file_path = os.path.join(
dir_name.replace(expected_uri, output_uri, 1), sub_dir)
if not fileio.exists(new_file_path):
return False
for leaf_file in leaf_files:
if leaf_file.startswith('events.out.tfevents'):
continue
expected_file_name = os.path.join(dir_name, leaf_file)
file_name = os.path.join(
dir_name.replace(expected_uri, output_uri, 1), leaf_file)
if not _compare_relative_difference(
fileio.open(file_name).size(),
fileio.open(expected_file_name).size(), threshold):
return False
return True
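

# Example (illustrative sketch for the size comparators; the uris are
# hypothetical). A threshold of 0.1 tolerates up to a 10% size difference per
# file:
#
#   sizes_match = compare_file_sizes(
#       '/tmp/test_pipeline/SchemaGen/schema/2',
#       '/tmp/recorded_pipeline/SchemaGen/schema/2', threshold=0.1)
#   model_sizes_match = compare_model_file_sizes(
#       '/tmp/test_pipeline/Trainer/model/5',
#       '/tmp/recorded_pipeline/Trainer/model/5', threshold=0.1)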


def compare_anomalies(output_uri: str, expected_uri: str) -> bool:
  """Compares anomalies files in the output uri and recorded uri.

  Only binary proto ('.pb') files are compared.

  Args:
    output_uri: pipeline output artifact uri.
    expected_uri: recorded pipeline output artifact uri.

  Returns:
    True if the anomalies are the same.
  """
for dir_name, _, leaf_files in fileio.walk(expected_uri):
for leaf_file in leaf_files:
if not leaf_file.endswith('.pb'):
# Do not analyze non-binary-proto files.
continue
expected_file_name = os.path.join(dir_name, leaf_file)
file_name = os.path.join(
dir_name.replace(expected_uri, output_uri, 1), leaf_file)
      # `file_name` and `expected_file_name` are already full paths under
      # output_uri and expected_uri respectively, so they are read directly.
      anomalies = anomalies_pb2.Anomalies()
      anomalies.ParseFromString(io_utils.read_bytes_file(file_name))
      expected_anomalies = anomalies_pb2.Anomalies()
      expected_anomalies.ParseFromString(
          io_utils.read_bytes_file(expected_file_name))
if expected_anomalies.anomaly_info != anomalies.anomaly_info:
return False
return True
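

# Example (illustrative sketch for compare_anomalies; the uris are
# hypothetical):
#
#   anomalies_match = compare_anomalies(
#       '/tmp/test_pipeline/ExampleValidator/anomalies/3',
#       '/tmp/recorded_pipeline/ExampleValidator/anomalies/3')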