in python/pipelines/pipeline_ops.py [0:0]
def compile_automl_tabular_pipeline(
template_path: str,
parameters_path: str,
pipeline_name: str,
        pipeline_parameters: Optional[Dict[str, Any]] = None,
        pipeline_parameters_substitutions: Optional[Dict[str, Any]] = None,
        exclude_features: Optional[List[Any]] = None,
enable_caching: bool = True) -> tuple:
"""
    Compiles an AutoML Tabular Workflows pipeline. The pipeline does not need to be defined
    elsewhere: the pre-compiled pipeline specification is loaded from the `automl_tabular_pl_v4.yaml` file.
Args:
template_path: The path to the pipeline template file.
parameters_path: The path to the pipeline parameters file.
pipeline_name: The name of the pipeline.
        pipeline_parameters: The parameters to pass to the pipeline. All of the following parameters can be set in the config.yaml.tftpl file instead of in this file.
additional_experiments: dict
cv_trainer_worker_pool_specs_override: list
data_source_bigquery_table_path: str [Default: '']
data_source_csv_filenames: str [Default: '']
dataflow_service_account: str [Default: '']
dataflow_subnetwork: str [Default: '']
dataflow_use_public_ips: bool [Default: True]
disable_early_stopping: bool [Default: False]
distill_batch_predict_machine_type: str [Default: 'n1-standard-16']
distill_batch_predict_max_replica_count: int [Default: 25.0]
distill_batch_predict_starting_replica_count: int [Default: 25.0]
enable_probabilistic_inference: bool [Default: False]
encryption_spec_key_name: str [Default: '']
evaluation_batch_explain_machine_type: str [Default: 'n1-highmem-8']
evaluation_batch_explain_max_replica_count: int [Default: 10.0]
evaluation_batch_explain_starting_replica_count: int [Default: 10.0]
evaluation_batch_predict_machine_type: str [Default: 'n1-highmem-8']
evaluation_batch_predict_max_replica_count: int [Default: 20.0]
evaluation_batch_predict_starting_replica_count: int [Default: 20.0]
evaluation_dataflow_disk_size_gb: int [Default: 50.0]
evaluation_dataflow_machine_type: str [Default: 'n1-standard-4']
evaluation_dataflow_max_num_workers: int [Default: 100.0]
evaluation_dataflow_starting_num_workers: int [Default: 10.0]
export_additional_model_without_custom_ops: bool [Default: False]
fast_testing: bool [Default: False]
location: str
model_description: str [Default: '']
model_display_name: str [Default: '']
optimization_objective: str
optimization_objective_precision_value: float [Default: -1.0]
optimization_objective_recall_value: float [Default: -1.0]
predefined_split_key: str [Default: '']
prediction_type: str
project: str
quantiles: list
root_dir: str
run_distillation: bool [Default: False]
run_evaluation: bool [Default: False]
stage_1_num_parallel_trials: int [Default: 35.0]
stage_1_tuner_worker_pool_specs_override: list
stage_1_tuning_result_artifact_uri: str [Default: '']
stage_2_num_parallel_trials: int [Default: 35.0]
stage_2_num_selected_trials: int [Default: 5.0]
stats_and_example_gen_dataflow_disk_size_gb: int [Default: 40.0]
stats_and_example_gen_dataflow_machine_type: str [Default: 'n1-standard-16']
stats_and_example_gen_dataflow_max_num_workers: int [Default: 25.0]
stratified_split_key: str [Default: '']
study_spec_parameters_override: list
target_column: str
test_fraction: float [Default: -1.0]
timestamp_split_key: str [Default: '']
train_budget_milli_node_hours: float
training_fraction: float [Default: -1.0]
transform_dataflow_disk_size_gb: int [Default: 40.0]
transform_dataflow_machine_type: str [Default: 'n1-standard-16']
transform_dataflow_max_num_workers: int [Default: 25.0]
transformations: str
validation_fraction: float [Default: -1.0]
vertex_dataset: system.Artifact
weight_column: str [Default: '']
pipeline_parameters_substitutions: A dictionary of substitutions to apply to the pipeline parameters.
exclude_features: A list of features to exclude from the pipeline.
enable_caching: Whether to enable caching for the pipeline.
Returns:
A tuple containing the path to the compiled pipeline template file and the pipeline parameters.
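    Example:
        An illustrative invocation; the paths, names and values below are placeholders, and
        `my_pipeline_parameters` stands for the dictionary typically loaded from config.yaml.tftpl:

            template_path, parameter_values = compile_automl_tabular_pipeline(
                template_path="compiled_automl_tabular_pipeline.yaml",
                parameters_path="compiled_automl_tabular_params.yaml",
                pipeline_name="my-automl-tabular-training-pipeline",
                pipeline_parameters=my_pipeline_parameters,
                exclude_features=["user_pseudo_id"],
                enable_caching=False,
            )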
"""
from google_cloud_pipeline_components.preview.automl.tabular import utils as automl_tabular_utils
    # If substitutions are defined in pipeline_parameters_substitutions, apply them to the
    # pipeline_parameters dictionary. This allows placeholders in the pipeline parameters,
    # making the pipeline more flexible and reusable.
    if pipeline_parameters_substitutions is not None:
pipeline_parameters = substitute_pipeline_params(
pipeline_parameters, pipeline_parameters_substitutions)
# This section handles the feature transformations for the pipeline. It checks if there is a
# custom_transformations file specified. If so, it reads the transformations from that file.
# Otherwise, it extracts the schema from the BigQuery table and generates automatic transformations based on the schema.
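    # The 'transformations' path may contain a '{timestamp}' placeholder; formatting it with the
    # current time gives each compilation run its own transformations file (placeholder-free paths
    # pass through unchanged).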
pipeline_parameters['transformations'] = pipeline_parameters['transformations'].format(
timestamp=datetime.now().strftime("%Y%m%d%H%M%S"))
    schema = []
    if 'custom_transformations' in pipeline_parameters:
logging.info("Reading from custom features transformations file: {}".format(pipeline_parameters['custom_transformations']))
schema = write_custom_transformations(pipeline_parameters['transformations'],
pipeline_parameters['custom_transformations'])
else:
schema = _extract_schema_from_bigquery(
project=pipeline_parameters['project'],
location=pipeline_parameters['location'],
table_name=pipeline_parameters['data_source_bigquery_table_path'].split('/')[-1],
table_schema=pipeline_parameters['data_source_bigquery_table_schema']
)
    # If there are no features to exclude, skip removing columns from the schema.
if exclude_features:
for column_to_remove in exclude_features + [
pipeline_parameters['target_column'],
pipeline_parameters['stratified_split_key'],
pipeline_parameters['predefined_split_key'],
pipeline_parameters['timestamp_split_key']
]:
if column_to_remove in schema:
schema.remove(column_to_remove)
logging.info("Writing automatically generated features transformations file: {}".format(pipeline_parameters['transformations']))
write_auto_transformations(pipeline_parameters['transformations'], schema)
logging.info(f'features:{schema}')
# This section compiles the AutoML Tabular Workflows pipeline. It uses the automl_tabular_utils module to
# generate the pipeline components and parameters. It then loads a pre-compiled pipeline template file
# (automl_tabular_pl_v4.yaml) and hydrates it with the generated parameters. Finally, it writes the
# compiled pipeline template and parameters to the specified files.
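    # When a predefined split column is provided, clear the fraction-based split parameters so the
    # assignment in that column takes precedence over random fractions.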
if pipeline_parameters['predefined_split_key']:
pipeline_parameters['training_fraction'] = None
pipeline_parameters['validation_fraction'] = None
pipeline_parameters['test_fraction'] = None
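    # 'data_source_bigquery_table_schema' and 'custom_transformations' are helper keys used only to
    # build the transformations file above; they are not pipeline parameters, so drop them before
    # unpacking pipeline_parameters below.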
pipeline_parameters.pop('data_source_bigquery_table_schema', None)
pipeline_parameters.pop('custom_transformations', None)
    # Note: the feature-selection variant is used here; the plain variant would be
    # automl_tabular_utils.get_automl_tabular_pipeline_and_parameters(**pipeline_parameters).
    (
        tp,
        parameter_values,
    ) = automl_tabular_utils.get_automl_tabular_feature_selection_pipeline_and_parameters(**pipeline_parameters)
with open(pathlib.Path(__file__).parent.resolve().joinpath('automl_tabular_pl_v4.yaml'), 'r') as file:
configuration = yaml.safe_load(file)
    # Process the loaded YAML to override the pipeline name.
configuration['pipelineInfo']['name'] = pipeline_name
_set_enable_caching_value(pipeline_spec=configuration,
enable_caching=enable_caching)
    # Hydrate the pipeline spec with the generated parameters as input default values.
for k, v in parameter_values.items():
if k in configuration['root']['inputDefinitions']['parameters']:
configuration['root']['inputDefinitions']['parameters'][k]['defaultValue'] = v
else:
raise Exception("parameter not found in pipeline definition: {}".format(k))
with open(template_path, 'w') as yaml_file:
yaml.dump(configuration, yaml_file)
with open(parameters_path, 'w') as param_file:
yaml.dump(parameter_values, param_file)
return template_path, parameter_values