def deploy_tasks()

in courses/machine_learning/asl/open_project/cloud_composer_automated_ml_pipeline_taxifare/airflow/dags/module/deploy.py [0:0]


def deploy_tasks(model, dag, PROJECT_ID, MODEL_NAME, MODEL_VERSION, MODEL_LOCATION):
  # Constants
  OTHER_VERSION_NAME = "v_{0}".format(datetime.datetime.now().strftime("%Y%m%d%H%M%S")[0:12])

  # Create model on ML-Engine
  bash_ml_engine_models_list_op = BashOperator(
    task_id="bash_ml_engine_models_list_{}_task".format(model.replace(".","_")),
    xcom_push=True,
    bash_command="gcloud ml-engine models list --filter='name:{0}'".format(MODEL_NAME + model.replace(".","_")),
    dag=dag
  )

  def check_if_model_already_exists(templates_dict, **kwargs):
    cur_model = templates_dict["model"].replace(".","_")
    ml_engine_models_list = kwargs["ti"].xcom_pull(task_ids="bash_ml_engine_models_list_{}_task".format(cur_model))
    logging.info("check_if_model_already_exists: {}: ml_engine_models_list = \n{}".format(cur_model, ml_engine_models_list))
    create_model_task = "ml_engine_create_model_{}_task".format(cur_model)
    dont_create_model_task = "dont_create_model_dummy_branch_{}_task".format(cur_model)
    if len(ml_engine_models_list) == 0 or ml_engine_models_list == "Listed 0 items.":
      return create_model_task
    return dont_create_model_task

  check_if_model_already_exists_op = BranchPythonOperator(
    task_id="check_if_model_already_exists_{}_task".format(model.replace(".","_")),
    templates_dict={"model": model.replace(".","_")},
    python_callable=check_if_model_already_exists,
    provide_context=True,
    dag=dag
  )

  ml_engine_create_model_op = MLEngineModelOperator(
    task_id="ml_engine_create_model_{}_task".format(model.replace(".","_")),
    project_id=PROJECT_ID, 
    model={"name": MODEL_NAME + model.replace(".","_")}, 
    operation="create",
    dag=dag
  )

  create_model_dummy_op = DummyOperator(
    task_id="create_model_dummy_{}_task".format(model.replace(".","_")),
    trigger_rule="all_done",
    dag=dag
  )

  dont_create_model_dummy_branch_op = DummyOperator(
    task_id="dont_create_model_dummy_branch_{}_task".format(model.replace(".","_")),
    dag=dag
  )

  dont_create_model_dummy_op = DummyOperator(
    task_id="dont_create_model_dummy_{}_task".format(model.replace(".","_")),
    trigger_rule="all_done",
    dag=dag
  )

  # Create version of model on ML-Engine
  bash_ml_engine_versions_list_op = BashOperator(
    task_id="bash_ml_engine_versions_list_{}_task".format(model.replace(".","_")),
    xcom_push=True,
    bash_command="gcloud ml-engine versions list --model {0} --filter='name:{1}'".format(MODEL_NAME + model.replace(".","_"), MODEL_VERSION),
    dag=dag
  )

  def check_if_model_version_already_exists(templates_dict, **kwargs):
    cur_model = templates_dict["model"].replace(".","_")
    ml_engine_versions_list = kwargs["ti"].xcom_pull(task_ids="bash_ml_engine_versions_list_{}_task".format(cur_model))
    logging.info("check_if_model_version_already_exists: {}: ml_engine_versions_list = \n{}".format(cur_model, ml_engine_versions_list))
    create_version_task = "ml_engine_create_version_{}_task".format(cur_model)
    create_other_version_task = "ml_engine_create_other_version_{}_task".format(cur_model)
    if len(ml_engine_versions_list) == 0 or ml_engine_versions_list == "Listed 0 items.":
      return create_version_task
    return create_other_version_task

  check_if_model_version_already_exists_op = BranchPythonOperator(
    task_id="check_if_model_version_already_exists_{}_task".format(model.replace(".","_")), 
    templates_dict={"model": model.replace(".","_")},
    python_callable=check_if_model_version_already_exists,
    provide_context=True,
    dag=dag
  )

  ml_engine_create_version_op = MLEngineVersionOperator(
    task_id="ml_engine_create_version_{}_task".format(model.replace(".","_")),
    project_id=PROJECT_ID, 
    model_name=MODEL_NAME + model.replace(".","_"), 
    version_name=MODEL_VERSION, 
    version={
      "name": MODEL_VERSION,
      "deploymentUri": MODEL_LOCATION + model.replace(".","_"),
      "runtimeVersion": "1.13",
      "framework": "TENSORFLOW",
      "pythonVersion": "3.5",
    },
    operation="create",
    dag=dag
  )

  ml_engine_create_other_version_op = MLEngineVersionOperator(
    task_id="ml_engine_create_other_version_{}_task".format(model.replace(".","_")),
    project_id=PROJECT_ID, 
    model_name=MODEL_NAME + model.replace(".","_"), 
    version_name=OTHER_VERSION_NAME, 
    version={
      "name": OTHER_VERSION_NAME,
      "deploymentUri": MODEL_LOCATION + model.replace(".","_"),
      "runtimeVersion": "1.13",
      "framework": "TENSORFLOW",
      "pythonVersion": "3.5",
    },
    operation="create",
    dag=dag
  )

  ml_engine_set_default_version_op = MLEngineVersionOperator(
    task_id="ml_engine_set_default_version_{}_task".format(model.replace(".","_")),
    project_id=PROJECT_ID, 
    model_name=MODEL_NAME + model.replace(".","_"), 
    version_name=MODEL_VERSION, 
    version={"name": MODEL_VERSION}, 
    operation="set_default",
    dag=dag
  )

  ml_engine_set_default_other_version_op = MLEngineVersionOperator(
    task_id="ml_engine_set_default_other_version_{}_task".format(model.replace(".","_")),
    project_id=PROJECT_ID, 
    model_name=MODEL_NAME + model.replace(".","_"), 
    version_name=OTHER_VERSION_NAME, 
    version={"name": OTHER_VERSION_NAME}, 
    operation="set_default",
    dag=dag
  )
  
  # Build dependency graph, set_upstream dependencies for all tasks
  check_if_model_already_exists_op.set_upstream(bash_ml_engine_models_list_op)

  ml_engine_create_model_op.set_upstream(check_if_model_already_exists_op)
  create_model_dummy_op.set_upstream(ml_engine_create_model_op)
  dont_create_model_dummy_branch_op.set_upstream(check_if_model_already_exists_op)
  dont_create_model_dummy_op.set_upstream(dont_create_model_dummy_branch_op)

  bash_ml_engine_versions_list_op.set_upstream([dont_create_model_dummy_op, create_model_dummy_op])
  check_if_model_version_already_exists_op.set_upstream(bash_ml_engine_versions_list_op)

  ml_engine_set_default_version_op.set_upstream(ml_engine_create_version_op)
  ml_engine_set_default_other_version_op.set_upstream(ml_engine_create_other_version_op)
  
  return (bash_ml_engine_models_list_op,
          check_if_model_version_already_exists_op,
          ml_engine_create_version_op,
          ml_engine_create_other_version_op)