def compile_automl_tabular_pipeline()

in python/pipelines/pipeline_ops.py


from datetime import datetime
import logging
import pathlib
from typing import Any, Dict, List, Optional

import yaml


def compile_automl_tabular_pipeline(
        template_path: str,
        parameters_path: str,
        pipeline_name: str,
        pipeline_parameters: Optional[Dict[str, Any]] = None,
        pipeline_parameters_substitutions: Optional[Dict[str, Any]] = None,
        exclude_features: Optional[List[Any]] = None,
        enable_caching: bool = True) -> tuple:
    """
    Compiles an AutoML Tabular Workflows pipeline. The pipeline does not need to be
    defined elsewhere: the pre-compiled pipeline spec in the `automl_tabular_pl_v4.yaml`
    file is loaded and hydrated with the given parameters.

    Args:
        template_path: The path to the pipeline template file.
        parameters_path: The path to the pipeline parameters file.
        pipeline_name: The name of the pipeline.
        pipeline_parameters: The parameters to pass to the pipeline. All of the
            parameters below can be set in the config.yaml.tftpl file instead of
            in this file.
            additional_experiments: dict
            cv_trainer_worker_pool_specs_override: list
            data_source_bigquery_table_path: str [Default: '']
            data_source_csv_filenames: str [Default: '']
            dataflow_service_account: str [Default: '']
            dataflow_subnetwork: str [Default: '']
            dataflow_use_public_ips: bool [Default: True]
            disable_early_stopping: bool [Default: False]
            distill_batch_predict_machine_type: str [Default: 'n1-standard-16']
            distill_batch_predict_max_replica_count: int [Default: 25]
            distill_batch_predict_starting_replica_count: int [Default: 25]
            enable_probabilistic_inference: bool [Default: False]
            encryption_spec_key_name: str [Default: '']
            evaluation_batch_explain_machine_type: str [Default: 'n1-highmem-8']
            evaluation_batch_explain_max_replica_count: int [Default: 10]
            evaluation_batch_explain_starting_replica_count: int [Default: 10]
            evaluation_batch_predict_machine_type: str [Default: 'n1-highmem-8']
            evaluation_batch_predict_max_replica_count: int [Default: 20]
            evaluation_batch_predict_starting_replica_count: int [Default: 20]
            evaluation_dataflow_disk_size_gb: int [Default: 50]
            evaluation_dataflow_machine_type: str [Default: 'n1-standard-4']
            evaluation_dataflow_max_num_workers: int [Default: 100]
            evaluation_dataflow_starting_num_workers: int [Default: 10]
            export_additional_model_without_custom_ops: bool [Default: False]
            fast_testing: bool [Default: False]
            location: str
            model_description: str [Default: '']
            model_display_name: str [Default: '']
            optimization_objective: str
            optimization_objective_precision_value: float [Default: -1.0]
            optimization_objective_recall_value: float [Default: -1.0]
            predefined_split_key: str [Default: '']
            prediction_type: str
            project: str
            quantiles: list
            root_dir: str
            run_distillation: bool [Default: False]
            run_evaluation: bool [Default: False]
            stage_1_num_parallel_trials: int [Default: 35]
            stage_1_tuner_worker_pool_specs_override: list
            stage_1_tuning_result_artifact_uri: str [Default: '']
            stage_2_num_parallel_trials: int [Default: 35]
            stage_2_num_selected_trials: int [Default: 5]
            stats_and_example_gen_dataflow_disk_size_gb: int [Default: 40]
            stats_and_example_gen_dataflow_machine_type: str [Default: 'n1-standard-16']
            stats_and_example_gen_dataflow_max_num_workers: int [Default: 25]
            stratified_split_key: str [Default: '']
            study_spec_parameters_override: list
            target_column: str
            test_fraction: float [Default: -1.0]
            timestamp_split_key: str [Default: '']
            train_budget_milli_node_hours: float
            training_fraction: float [Default: -1.0]
            transform_dataflow_disk_size_gb: int [Default: 40]
            transform_dataflow_machine_type: str [Default: 'n1-standard-16']
            transform_dataflow_max_num_workers: int [Default: 25]
            transformations: str
            validation_fraction: float [Default: -1.0]
            vertex_dataset: system.Artifact
            weight_column: str [Default: '']
        pipeline_parameters_substitutions: A dictionary of substitutions to apply to the pipeline parameters.
        exclude_features: A list of features to exclude from the pipeline.
        enable_caching: Whether to enable caching for the pipeline.

    Returns:
        A tuple containing the path to the compiled pipeline template file and the pipeline parameters.
    """

    from google_cloud_pipeline_components.preview.automl.tabular import utils as automl_tabular_utils

    # If any substitutions are defined in pipeline_parameters_substitutions,
    # apply them to pipeline_parameters. This allows the pipeline parameters to
    # contain placeholders, making the pipeline more flexible and reusable.
    if pipeline_parameters_substitutions is not None:
        pipeline_parameters = substitute_pipeline_params(
            pipeline_parameters, pipeline_parameters_substitutions)
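    # Illustrative example (hypothetical values): a parameter value such as
    # 'gs://{bucket}/data' combined with the substitution {'bucket': 'my-bucket'}
    # is expected to resolve to 'gs://my-bucket/data'.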

    # This section handles the feature transformations for the pipeline. It checks if there is a 
    # custom_transformations file specified. If so, it reads the transformations from that file. 
    # Otherwise, it extracts the schema from the BigQuery table and generates automatic transformations based on the schema.
    pipeline_parameters['transformations'] = pipeline_parameters['transformations'].format(
        timestamp=datetime.now().strftime("%Y%m%d%H%M%S"))
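    # For example, a configured value of
    # 'gs://my-bucket/transformations-{timestamp}.json' (illustrative path)
    # becomes 'gs://my-bucket/transformations-20240101120000.json'.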
    
    # List of feature column names used to generate the transformations file.
    schema = []

    if 'custom_transformations' in pipeline_parameters:
        logging.info("Reading from custom features transformations file: {}".format(
            pipeline_parameters['custom_transformations']))
        schema = write_custom_transformations(pipeline_parameters['transformations'],
                                              pipeline_parameters['custom_transformations'])
    else:
        schema = _extract_schema_from_bigquery(
            project=pipeline_parameters['project'],
            location=pipeline_parameters['location'],
            table_name=pipeline_parameters['data_source_bigquery_table_path'].split('/')[-1],
            table_schema=pipeline_parameters['data_source_bigquery_table_schema']
            )
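        # The helper is assumed to return a flat list of column names, e.g.
        # ['user_id', 'feature_a', 'label'] (illustrative values).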

        # If there are no features to exclude, skip removing columns from the schema
        if exclude_features:
            for column_to_remove in exclude_features + [
                    pipeline_parameters['target_column'],
                    pipeline_parameters['stratified_split_key'],
                    pipeline_parameters['predefined_split_key'],
                    pipeline_parameters['timestamp_split_key']
            ]:
                if column_to_remove in schema:
                    schema.remove(column_to_remove)
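        # Illustrative example: with exclude_features=['user_id'] and
        # target_column='label', both 'user_id' and 'label' are removed so that
        # neither is emitted as a transformed input feature.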

        logging.info("Writing automatically generated features transformations file: {}".format(pipeline_parameters['transformations']))
        write_auto_transformations(pipeline_parameters['transformations'], schema)
    
    logging.info(f'features: {schema}')

    # This section compiles the AutoML Tabular Workflows pipeline. It uses the automl_tabular_utils module to 
    # generate the pipeline components and parameters. It then loads a pre-compiled pipeline template file 
    # (automl_tabular_pl_v4.yaml) and hydrates it with the generated parameters. Finally, it writes the 
    # compiled pipeline template and parameters to the specified files.
    # A predefined split key takes precedence over fraction-based splits, so
    # unset the fractions when one is provided.
    if pipeline_parameters['predefined_split_key']:
        pipeline_parameters['training_fraction'] = None
        pipeline_parameters['validation_fraction'] = None
        pipeline_parameters['test_fraction'] = None

    # These two keys are consumed above and are not accepted by the downstream
    # AutoML pipeline, so drop them before building the parameter set.
    pipeline_parameters.pop('data_source_bigquery_table_schema', None)
    pipeline_parameters.pop('custom_transformations', None)
    
    # Use the feature-selection variant; the plain variant is
    # automl_tabular_utils.get_automl_tabular_pipeline_and_parameters(**pipeline_parameters).
    (
        tp,
        parameter_values,
    ) = automl_tabular_utils.get_automl_tabular_feature_selection_pipeline_and_parameters(
        **pipeline_parameters)
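    # Only parameter_values is used below: the pipeline spec is loaded from the
    # local automl_tabular_pl_v4.yaml rather than from tp, the template returned
    # by the helper.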

    with open(pathlib.Path(__file__).parent.resolve().joinpath('automl_tabular_pl_v4.yaml'), 'r') as file:
        configuration = yaml.safe_load(file)

    # Set the pipeline name in the loaded pipeline spec.
    configuration['pipelineInfo']['name'] = pipeline_name

    _set_enable_caching_value(pipeline_spec=configuration,
                              enable_caching=enable_caching)

    # Hydrate the pipeline template with the generated parameters as default values.
    for k, v in parameter_values.items():
        if k in configuration['root']['inputDefinitions']['parameters']:
            configuration['root']['inputDefinitions']['parameters'][k]['defaultValue'] = v
        else:
            raise ValueError(f"Parameter not found in pipeline definition: {k}")
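    # Illustrative example: with parameter_values containing
    # {'root_dir': 'gs://my-bucket/root'}, the 'root_dir' entry in the template's
    # input definitions gets the defaultValue 'gs://my-bucket/root'.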

    with open(template_path, 'w') as yaml_file:
        yaml.dump(configuration, yaml_file)
    with open(parameters_path, 'w') as param_file:
        yaml.dump(parameter_values, param_file)

    return template_path, parameter_values
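

# Example invocation (a minimal sketch: every value below is a hypothetical
# placeholder, and in practice the parameters come from config.yaml.tftpl
# rather than being built by hand):
example_parameters = {
    'project': 'my-gcp-project',
    'location': 'us-central1',
    'root_dir': 'gs://my-bucket/pipeline-root',
    'data_source_bigquery_table_path': 'bq://my-gcp-project.my_dataset.training',
    'data_source_bigquery_table_schema': 'table_schema.json',
    'transformations': 'gs://my-bucket/transformations/auto-{timestamp}.json',
    'target_column': 'label',
    'stratified_split_key': '',
    'predefined_split_key': '',
    'timestamp_split_key': '',
    'prediction_type': 'classification',
    'optimization_objective': 'maximize-au-prc',
    'train_budget_milli_node_hours': 1000.0,
}

template_file, resolved_parameters = compile_automl_tabular_pipeline(
    template_path='compiled_pipeline.yaml',
    parameters_path='pipeline_parameters.yaml',
    pipeline_name='automl-tabular-training',
    pipeline_parameters=example_parameters,
    exclude_features=['user_id'],
    enable_caching=False)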