retail/recommendation-system/bqml-scann/tfx_pipeline/runner.py
# Copyright 2020 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""KFP runner"""
import kfp
from tfx.orchestration import data_types
from tfx.orchestration.kubeflow import kubeflow_dag_runner

import config
import pipeline

if __name__ == '__main__':
    # Set the values for the compile-time parameters.
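    # Settings for the AI Platform Training jobs launched by the pipeline:
    # project, region, and the custom container image (masterConfig.imageUri)
    # used to run them.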
    ai_platform_training_args = {
        'project': config.PROJECT_ID,
        'region': config.REGION,
        'masterConfig': {
            'imageUri': config.ML_IMAGE_URI
        }
    }
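    # Flags passed to the Apache Beam pipelines run by the TFX components;
    # BEAM_RUNNER selects the runner (e.g. DirectRunner or DataflowRunner).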
    beam_pipeline_args = [
        f'--runner={config.BEAM_RUNNER}',
        '--experiments=shuffle_mode=auto',
        f'--project={config.PROJECT_ID}',
        f'--temp_location={config.ARTIFACT_STORE_URI}/beam/tmp',
        f'--region={config.REGION}',
    ]
    # Set the default values for the pipeline runtime parameters.
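    # Each RuntimeParameter below can be overridden per run when the compiled
    # pipeline is launched in Kubeflow Pipelines; see pipeline.py for how each
    # value is consumed.
    # Minimum number of times an item must occur to be included when item
    # co-occurrences are computed.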
    min_item_frequency = data_types.RuntimeParameter(
        name='min-item-frequency',
        default=15,
        ptype=int
    )
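    # Maximum size of an item group considered when computing co-occurrences.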
    max_group_size = data_types.RuntimeParameter(
        name='max_group_size',
        default=100,
        ptype=int
    )
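    # Dimensionality of the item embeddings learned by the BigQuery ML matrix
    # factorization model.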
    dimensions = data_types.RuntimeParameter(
        name='dimensions',
        default=50,
        ptype=int
    )
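    # Number of leaves (partitions) to use when building the ScaNN index.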
    num_leaves = data_types.RuntimeParameter(
        name='num-leaves',
        default=0,
        ptype=int
    )
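    # Minimum recall the built ScaNN index must achieve to pass evaluation.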
    eval_min_recall = data_types.RuntimeParameter(
        name='eval-min-recall',
        default=0.8,
        ptype=float
    )
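    # Maximum allowed query latency for the ScaNN index during evaluation.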
    eval_max_latency = data_types.RuntimeParameter(
        name='eval-max-latency',
        default=0.01,
        ptype=float
    )
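    # Root directory for pipeline artifacts; RUN_ID_PLACEHOLDER is resolved at
    # run time to the KFP run ID, so each run writes under its own directory.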
    pipeline_root = f'{config.ARTIFACT_STORE_URI}/{config.PIPELINE_NAME}/{kfp.dsl.RUN_ID_PLACEHOLDER}'
    # Set KubeflowDagRunner settings.
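    # Connection settings for the ML Metadata (MLMD) store that runs inside
    # the Kubeflow Pipelines cluster.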
    metadata_config = kubeflow_dag_runner.get_default_kubeflow_metadata_config()
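    # USE_KFP_SA (compared against the string 'True') tells TFX's
    # get_default_pipeline_operator_funcs whether to rely on the cluster's
    # service account or to mount a user GCP secret for credentials.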
    runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
        kubeflow_metadata_config=metadata_config,
        pipeline_operator_funcs=kubeflow_dag_runner.get_default_pipeline_operator_funcs(
            config.USE_KFP_SA == 'True'),
        tfx_image=config.ML_IMAGE_URI
    )
    # Compile the pipeline.
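    # KubeflowDagRunner.run() compiles the TFX pipeline into a KFP pipeline
    # package (by default a .tar.gz archive named after the pipeline) in the
    # current working directory.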
    kubeflow_dag_runner.KubeflowDagRunner(config=runner_config).run(
        pipeline.create_pipeline(
            pipeline_name=config.PIPELINE_NAME,
            pipeline_root=pipeline_root,
            project_id=config.PROJECT_ID,
            bq_dataset_name=config.BQ_DATASET_NAME,
            min_item_frequency=min_item_frequency,
            max_group_size=max_group_size,
            dimensions=dimensions,
            num_leaves=num_leaves,
            eval_min_recall=eval_min_recall,
            eval_max_latency=eval_max_latency,
            ai_platform_training_args=ai_platform_training_args,
            beam_pipeline_args=beam_pipeline_args,
            model_regisrty_uri=config.MODEL_REGISTRY_URI)
    )
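
    # For example, assuming a reachable KFP endpoint (KFP_HOST below is a
    # placeholder), the compiled package could then be uploaded and run with
    # the kfp SDK:
    #   client = kfp.Client(host=KFP_HOST)
    #   client.create_run_from_pipeline_package(
    #       f'{config.PIPELINE_NAME}.tar.gz', arguments={})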