in tfx/benchmarks/tfma_v2_benchmark_base.py [0:0]
def _runMiniPipeline(self, multi_model):
  """Benchmark a "mini" TFMA - predict, slice and compute metrics.

  Runs a "mini" version of TFMA in a Beam pipeline. Records the wall time
  taken for the whole pipeline.

  Args:
    multi_model: True if multiple models should be used in the benchmark.
  """
  self._init_model(multi_model, validation=False)
  pipeline = self._create_beam_pipeline()
  tfx_io = test_util.InMemoryTFExampleRecord(
      schema=benchmark_utils.read_schema(
          self._dataset.tf_metadata_schema_path()),
      raw_record_column_name=constants.ARROW_INPUT_COLUMN)
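
  # Read the raw serialized examples, batch them into RecordBatches, and
  # convert the batches into TFMA extracts for the downstream extractors.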
  raw_data = (
      pipeline
      | "Examples" >> beam.Create(
          self._dataset.read_raw_dataset(
              deserialize=False, limit=self._max_num_examples()))
      | "BatchExamples" >> tfx_io.BeamSource()
      | "InputsToExtracts" >> tfma.BatchedInputsToExtracts())

  def rescale_labels(extracts):
    # Transform labels to [0, 1] so we can test metrics that require labels
    # in that range.
    result = copy.copy(extracts)
    result[constants.LABELS_KEY] = self._transform_labels(
        extracts[constants.LABELS_KEY])
    return result
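
  # Wire the extractors and the evaluator into a single "mini" TFMA pipeline:
  # features -> labels (rescaled) -> example weights -> predictions ->
  # unbatch -> slice keys -> metrics, plots and validations.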
  _ = (
      raw_data
      | "FeaturesExtractor" >> features_extractor.FeaturesExtractor(
          eval_config=self._eval_config).ptransform
      | "LabelsExtractor" >> labels_extractor.LabelsExtractor(
          eval_config=self._eval_config).ptransform
      | "RescaleLabels" >> beam.Map(rescale_labels)
      | "ExampleWeightsExtractor" >> example_weights_extractor
      .ExampleWeightsExtractor(eval_config=self._eval_config).ptransform
      | "PredictionsExtractor" >> predictions_extractor.PredictionsExtractor(
          eval_config=self._eval_config,
          eval_shared_model=self._eval_shared_models).ptransform
      | "UnbatchExtractor" >> unbatch_extractor.UnbatchExtractor().ptransform
      | "SliceKeyExtractor" >> tfma.extractors.SliceKeyExtractor().ptransform
      | "ComputeMetricsPlotsAndValidations" >>
      metrics_plots_and_validations_evaluator
      .MetricsPlotsAndValidationsEvaluator(
          eval_config=self._eval_config,
          eval_shared_model=self._eval_shared_models).ptransform)
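
  # Run the full pipeline _ITERS times and measure the total wall time.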
  start = time.time()
  for _ in range(_ITERS):
    result = pipeline.run()
    result.wait_until_finish()
  end = time.time()
  delta = end - start
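
  # Report the total wall time across all iterations, along with the number
  # of examples processed in each run.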
  self.report_benchmark(
      iters=_ITERS,
      wall_time=delta,
      extras={
          "num_examples":
              self._dataset.num_examples(limit=self._max_num_examples())
      })