def _runMiniPipeline()

in tfx/benchmarks/tfma_v2_benchmark_base.py [0:0]


  def _runMiniPipeline(self, multi_model):
    """Benchmark a "mini" TFMA - predict, slice and compute metrics.

    Runs a "mini" version of TFMA in a Beam pipeline. Records the wall time
    taken for the whole pipeline.

    Args:
      multi_model: True if multiple models should be used in the benchmark.
    """
    self._init_model(multi_model, validation=False)
    pipeline = self._create_beam_pipeline()
    tfx_io = test_util.InMemoryTFExampleRecord(
        schema=benchmark_utils.read_schema(
            self._dataset.tf_metadata_schema_path()),
        raw_record_column_name=constants.ARROW_INPUT_COLUMN)
    raw_data = (
        pipeline
        | "Examples" >> beam.Create(
            self._dataset.read_raw_dataset(
                deserialize=False, limit=self._max_num_examples()))
        | "BatchExamples" >> tfx_io.BeamSource()
        | "InputsToExtracts" >> tfma.BatchedInputsToExtracts())

    def rescale_labels(extracts):
      # Transform labels to [0, 1] so we can test metrics that require labels in
      # that range.
      result = copy.copy(extracts)
      result[constants.LABELS_KEY] = self._transform_labels(
          extracts[constants.LABELS_KEY])
      return result

    _ = (
        raw_data
        | "FeaturesExtractor" >> features_extractor.FeaturesExtractor(
            eval_config=self._eval_config).ptransform
        | "LabelsExtractor" >> labels_extractor.LabelsExtractor(
            eval_config=self._eval_config).ptransform
        | "RescaleLabels" >> beam.Map(rescale_labels)
        | "ExampleWeightsExtractor" >> example_weights_extractor
        .ExampleWeightsExtractor(eval_config=self._eval_config).ptransform
        | "PredictionsExtractor" >> predictions_extractor.PredictionsExtractor(
            eval_config=self._eval_config,
            eval_shared_model=self._eval_shared_models).ptransform
        | "UnbatchExtractor" >> unbatch_extractor.UnbatchExtractor().ptransform
        | "SliceKeyExtractor" >> tfma.extractors.SliceKeyExtractor().ptransform
        | "ComputeMetricsPlotsAndValidations" >>
        metrics_plots_and_validations_evaluator
        .MetricsPlotsAndValidationsEvaluator(
            eval_config=self._eval_config,
            eval_shared_model=self._eval_shared_models).ptransform)

    start = time.time()
    for _ in range(_ITERS):
      result = pipeline.run()
      result.wait_until_finish()
    end = time.time()
    delta = end - start

    self.report_benchmark(
        iters=_ITERS,
        wall_time=delta,
        extras={
            "num_examples":
                self._dataset.num_examples(limit=self._max_num_examples())
        })