dags/xgboost-ml-pipeline/1.10/mwaa-customer-churn-dag.py [99:196]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Session obtained from the Airflow AWS hook. It supplies the region for the
# image lookup and is wrapped in a SageMaker session for the estimator below.
# TODO(review): confirm whether this hook session is needed in addition to the
# SageMaker session.
sess = hook.get_session(region_name=config.REGION_NAME)
sagemaker_role = get_sagemaker_role_arn(
    config.SAGEMAKER_ROLE_NAME,
    config.REGION_NAME,
)
container = get_image_uri(sess.region_name, "xgboost")

# training hyperparameters handed to the estimator (every value is a string)
hyperparameters = {
    "max_depth": "5",
    "eta": "0.2",
    "gamma": "4",
    "min_child_weight": "6",
    "subsample": "0.8",
    "objective": "binary:logistic",
    "num_round": "100",
}

# estimator: a single ml.m5.4xlarge training instance, model output written to
# the S3 destination taken from config
_estimator_session = sagemaker.session.Session(sess)
xgb_estimator = Estimator(
    image_name=container,
    hyperparameters=hyperparameters,
    role=sagemaker_role,
    sagemaker_session=_estimator_session,
    train_instance_count=1,
    train_instance_type='ml.m5.4xlarge',
    train_volume_size=5,
    output_path=config.SAGEMAKER_MODEL_S3_DEST,
)

# job, model and endpoint names are all "<configured prefix>-<guid>"
_guid_suffix = '-{}'.format(guid)

# training inputs: one channel for the training data, one for validation data
sagemaker_taining_job_name = config.SAGEMAKER_TRAINING_JOB_NAME_PREFIX + _guid_suffix
sagemaker_training_data = s3_input(
    config.SAGEMAKER_TRAINING_DATA_S3_SOURCE,
    content_type=config.SAGEMAKER_CONTENT_TYPE,
)
sagemaker_validation_data = s3_input(
    config.SAGEMAKER_VALIDATION_DATA_S3_SOURCE,
    content_type=config.SAGEMAKER_CONTENT_TYPE,
)
sagemaker_training_inputs = {
    'train': sagemaker_training_data,
    'validation': sagemaker_validation_data,
}

# SageMaker training configuration consumed by the "train" task.
# NOTE: this rebinds the name training_config from the config-building callable
# to the configuration it returns, so the callable cannot be invoked again
# further down in this module.
training_config = training_config(
    estimator=xgb_estimator,
    inputs=sagemaker_training_inputs,
    job_name=sagemaker_taining_job_name,
)

sagemaker_model_name = config.SAGEMAKER_MODEL_NAME_PREFIX + _guid_suffix
sagemaker_endpoint_name = config.SAGEMAKER_ENDPOINT_NAME_PREFIX + _guid_suffix

# SageMaker endpoint configuration built from the same estimator; task_id
# matches the id of the training task ("train") defined in the DAG.
endpoint_config = deploy_config_from_estimator(
    estimator=xgb_estimator,
    task_id="train",
    task_type="training",
    initial_instance_count=1,
    instance_type="ml.m4.xlarge",
    model_name=sagemaker_model_name,
    endpoint_name=sagemaker_endpoint_name,
)

# =============================================================================
# define airflow DAG and tasks
# =============================================================================

# define airflow DAG

# One connection id for every SageMaker task, so that training and endpoint
# deployment resolve the same AWS connection.
# NOTE(review): confirm this id matches the connection configured in Airflow.
sagemaker_conn_id = "airflow-sagemaker"

# Evaluate the start date once and share it between the task defaults and the DAG.
dag_start_date = days_ago(2)

args = {"owner": "airflow", "start_date": dag_start_date, 'depends_on_past': False}

# No schedule interval is set (schedule_interval=None); concurrency and
# max_active_runs are both limited to 1.
with DAG(
    dag_id=config.AIRFLOW_DAG_ID,
    default_args=args,
    start_date=dag_start_date,
    schedule_interval=None,
    concurrency=1,
    max_active_runs=1,
) as dag:
    # Step 1: run the preprocessing callable. Operators created inside the
    # "with DAG" block are attached to the DAG, so no explicit dag= is needed.
    process_task = PythonOperator(
        task_id="process",
        python_callable=preprocess_glue,
    )

    # Step 2: start the SageMaker training job described by training_config
    # and wait for it to finish.
    train_task = SageMakerTrainingOperator(
        task_id="train",
        config=training_config,
        aws_conn_id=sagemaker_conn_id,
        wait_for_completion=True,
        check_interval=60,  # check status of the job every minute
        max_ingestion_time=None,  # allow training job to run as long as it needs, change for early stop
    )

    # Step 3: create the SageMaker endpoint described by endpoint_config and
    # wait for the deployment to finish.
    endpoint_deploy_task = SageMakerEndpointOperator(
        task_id="endpoint-deploy",
        config=endpoint_config,
        aws_conn_id=sagemaker_conn_id,
        wait_for_completion=True,
        check_interval=60,  # check status of endpoint deployment every minute
        max_ingestion_time=None,
        operation='create',  # change to update if you are updating rather than creating an endpoint
    )

    # set the dependencies between tasks
    process_task >> train_task >> endpoint_deploy_task
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



dags/xgboost-ml-pipeline/2.0/mwaa-customer-churn-dag.py [99:196]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Session obtained from the Airflow AWS hook. It supplies the region for the
# image lookup and is wrapped in a SageMaker session for the estimator below.
# TODO(review): confirm whether this hook session is needed in addition to the
# SageMaker session.
sess = hook.get_session(region_name=config.REGION_NAME)
sagemaker_role = get_sagemaker_role_arn(
    config.SAGEMAKER_ROLE_NAME,
    config.REGION_NAME,
)
container = get_image_uri(sess.region_name, "xgboost")

# training hyperparameters handed to the estimator (every value is a string)
hyperparameters = {
    "max_depth": "5",
    "eta": "0.2",
    "gamma": "4",
    "min_child_weight": "6",
    "subsample": "0.8",
    "objective": "binary:logistic",
    "num_round": "100",
}

# estimator: a single ml.m5.4xlarge training instance, model output written to
# the S3 destination taken from config
_estimator_session = sagemaker.session.Session(sess)
xgb_estimator = Estimator(
    image_name=container,
    hyperparameters=hyperparameters,
    role=sagemaker_role,
    sagemaker_session=_estimator_session,
    train_instance_count=1,
    train_instance_type='ml.m5.4xlarge',
    train_volume_size=5,
    output_path=config.SAGEMAKER_MODEL_S3_DEST,
)

# job, model and endpoint names are all "<configured prefix>-<guid>"
_guid_suffix = '-{}'.format(guid)

# training inputs: one channel for the training data, one for validation data
sagemaker_taining_job_name = config.SAGEMAKER_TRAINING_JOB_NAME_PREFIX + _guid_suffix
sagemaker_training_data = s3_input(
    config.SAGEMAKER_TRAINING_DATA_S3_SOURCE,
    content_type=config.SAGEMAKER_CONTENT_TYPE,
)
sagemaker_validation_data = s3_input(
    config.SAGEMAKER_VALIDATION_DATA_S3_SOURCE,
    content_type=config.SAGEMAKER_CONTENT_TYPE,
)
sagemaker_training_inputs = {
    'train': sagemaker_training_data,
    'validation': sagemaker_validation_data,
}

# SageMaker training configuration consumed by the "train" task.
# NOTE: this rebinds the name training_config from the config-building callable
# to the configuration it returns, so the callable cannot be invoked again
# further down in this module.
training_config = training_config(
    estimator=xgb_estimator,
    inputs=sagemaker_training_inputs,
    job_name=sagemaker_taining_job_name,
)

sagemaker_model_name = config.SAGEMAKER_MODEL_NAME_PREFIX + _guid_suffix
sagemaker_endpoint_name = config.SAGEMAKER_ENDPOINT_NAME_PREFIX + _guid_suffix

# SageMaker endpoint configuration built from the same estimator; task_id
# matches the id of the training task ("train") defined in the DAG.
endpoint_config = deploy_config_from_estimator(
    estimator=xgb_estimator,
    task_id="train",
    task_type="training",
    initial_instance_count=1,
    instance_type="ml.m4.xlarge",
    model_name=sagemaker_model_name,
    endpoint_name=sagemaker_endpoint_name,
)

# =============================================================================
# define airflow DAG and tasks
# =============================================================================

# define airflow DAG

# One connection id for every SageMaker task, so that training and endpoint
# deployment resolve the same AWS connection.
# NOTE(review): confirm this id matches the connection configured in Airflow.
sagemaker_conn_id = "airflow-sagemaker"

# Evaluate the start date once and share it between the task defaults and the DAG.
dag_start_date = days_ago(2)

args = {"owner": "airflow", "start_date": dag_start_date, 'depends_on_past': False}

# No schedule interval is set (schedule_interval=None); concurrency and
# max_active_runs are both limited to 1.
with DAG(
    dag_id=config.AIRFLOW_DAG_ID,
    default_args=args,
    start_date=dag_start_date,
    schedule_interval=None,
    concurrency=1,
    max_active_runs=1,
) as dag:
    # Step 1: run the preprocessing callable. Operators created inside the
    # "with DAG" block are attached to the DAG, so no explicit dag= is needed.
    process_task = PythonOperator(
        task_id="process",
        python_callable=preprocess_glue,
    )

    # Step 2: start the SageMaker training job described by training_config
    # and wait for it to finish.
    train_task = SageMakerTrainingOperator(
        task_id="train",
        config=training_config,
        aws_conn_id=sagemaker_conn_id,
        wait_for_completion=True,
        check_interval=60,  # check status of the job every minute
        max_ingestion_time=None,  # allow training job to run as long as it needs, change for early stop
    )

    # Step 3: create the SageMaker endpoint described by endpoint_config and
    # wait for the deployment to finish.
    endpoint_deploy_task = SageMakerEndpointOperator(
        task_id="endpoint-deploy",
        config=endpoint_config,
        aws_conn_id=sagemaker_conn_id,
        wait_for_completion=True,
        check_interval=60,  # check status of endpoint deployment every minute
        max_ingestion_time=None,
        operation='create',  # change to update if you are updating rather than creating an endpoint
    )

    # set the dependencies between tasks
    process_task >> train_task >> endpoint_deploy_task
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



