def train_and_evaluate(args)

in tutorials/tensorflow/mlflow_gcp/trainer/task.py


def train_and_evaluate(args):
    """Trains and evaluates the Keras model.

    Uses the Keras model defined in model.py and trains on data loaded and
    preprocessed in utils.py. Saves the trained model in TensorFlow SavedModel
    format to the path defined in part by the --job-dir argument.

    The History object returned by fit() contains, for example:
        {'loss': [0.5699903990809373, 0.3629718415849791],
         'acc': [0.78604823, 0.8331693],
         'val_loss': [0.3966572880744934, 0.3477487564086914],
         'val_acc': [0.8278044, 0.8281116],
         'lr': [0.02, 0.015]}
    Args:
      args: argparse.Namespace of arguments; see get_args() for details.
    """
    logging.info('Resume training: {}'.format(args.reuse_job_dir))
    if not args.reuse_job_dir:
        if tf.io.gfile.exists(args.job_dir):
            tf.io.gfile.rmtree(args.job_dir)
            logging.info(
                'Deleted job_dir {} to avoid re-use'.format(args.job_dir))
    else:
        logging.info('Reusing job_dir {} if it exists'.format(args.job_dir))

    train_x, train_y, eval_x, eval_y = utils.load_data(args.train_files,
                                                       args.eval_files)
    # Data dimensions.
    num_train_examples, input_dim = train_x.shape
    num_eval_examples = eval_x.shape[0]

    # Create the Keras Model
    keras_model = model.create_keras_model(
        input_dim=input_dim, learning_rate=args.learning_rate)

    # Pass a NumPy array to input_fn by using DataFrame.values.
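    # model.input_fn is assumed to wrap the arrays in a tf.data.Dataset that
    # shuffles (when shuffle=True), repeats for num_epochs, and batches them.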
    training_dataset = model.input_fn(
        features=train_x.values,
        labels=train_y,
        shuffle=True,
        num_epochs=args.num_epochs,
        batch_size=args.batch_size)

    # Pass a NumPy array by using DataFrame.values; the entire eval set is
    # evaluated as a single batch.
    validation_dataset = model.input_fn(
        features=eval_x.values,
        labels=eval_y,
        shuffle=False,
        num_epochs=args.num_epochs,
        batch_size=num_eval_examples)

    start_time = time()
    # Set MLflow tracking URI
    if args.mlflow_tracking_uri:
        mlflow.set_tracking_uri(args.mlflow_tracking_uri)
    # Train model
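    # Everything in this block is logged against a single MLflow run; the
    # run_id is reused below to namespace the TensorBoard and SavedModel paths.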
    with mlflow.start_run() as active_run:
        run_id = active_run.info.run_id

        # Callbacks
        class MlflowCallback(tf.keras.callbacks.Callback):
            # This function will be called after training completes.
            def on_train_end(self, logs=None):
                mlflow.log_param('num_layers', len(self.model.layers))
                mlflow.log_param('optimizer_name',
                                 type(self.model.optimizer).__name__)
        # MLflow callback
        mlflow_callback = MlflowCallback()
        # Setup Learning Rate decay callback.
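        # The additive term 0.02 * 0.5 ** (1 + epoch) halves each epoch
        # (+0.01, +0.005, +0.0025, ...), so the lr decays toward
        # args.learning_rate; with --learning-rate=0.01 this reproduces the
        # lr history in the docstring example: [0.02, 0.015].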
        lr_decay_callback = tf.keras.callbacks.LearningRateScheduler(
            lambda epoch: args.learning_rate + 0.02 * (0.5 ** (1 + epoch)),
            verbose=False)
        # Setup TensorBoard callback.
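        # histogram_freq=1 additionally writes weight histograms once per
        # epoch.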
        tensorboard_path = os.path.join(args.job_dir, run_id, 'tensorboard')
        tensorboard_callback = tf.keras.callbacks.TensorBoard(
            tensorboard_path,
            histogram_freq=1)

        history = keras_model.fit(
            training_dataset,
            steps_per_epoch=num_train_examples // args.batch_size,
            epochs=args.num_epochs,
            validation_data=validation_dataset,
            validation_steps=args.eval_steps,
            verbose=1,
            callbacks=[lr_decay_callback, tensorboard_callback,
                       mlflow_callback])
        metrics = history.history
        logging.info(metrics)
        keras_model.summary()
        mlflow.log_param('train_files', args.train_files)
        mlflow.log_param('eval_files', args.eval_files)
        mlflow.log_param('num_epochs', args.num_epochs)
        mlflow.log_param('batch_size', args.batch_size)
        mlflow.log_param('learning_rate', args.learning_rate)
        mlflow.log_param('train_samples', num_train_examples)
        mlflow.log_param('eval_samples', num_eval_examples)
        mlflow.log_param('eval_steps', args.eval_steps)
        mlflow.log_param('steps_per_epoch',
                         num_train_examples // args.batch_size)
        # Add metrics
        _mlflow_log_metrics(metrics, 'loss')
        _mlflow_log_metrics(metrics, 'acc')
        _mlflow_log_metrics(metrics, 'val_loss')
        _mlflow_log_metrics(metrics, 'val_acc')
        _mlflow_log_metrics(metrics, 'lr')
        # Export SavedModel
        model_local_path = os.path.join(args.job_dir, run_id, 'model')
        tf.keras.experimental.export_saved_model(keras_model, model_local_path)
        # Define artifacts.
        logging.info('Model exported to: {}'.format(model_local_path))
        # Workaround: MLflow cannot read artifacts directly from a GCS path.
        # https://github.com/mlflow/mlflow/issues/1765
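        # model_deployment.copy_artifacts appears to copy a directory between
        # GCS and local disk in either direction; it is also used for the
        # upload in the deployment step below.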
        if model_local_path.startswith('gs://'):
            logging.info('Creating temp folder')
            temp = tempfile.mkdtemp()
            model_deployment.copy_artifacts(model_local_path, temp)
            model_local_path = os.path.join(temp, 'model')

        if tensorboard_path.startswith('gs://'):
            logging.info('Creating temp folder')
            temp = tempfile.mkdtemp()
            model_deployment.copy_artifacts(tensorboard_path, temp)
            tensorboard_path = temp

        # Log the exported SavedModel to MLflow as the run's 'model' artifact.
        mlflow.tensorflow.log_model(tf_saved_model_dir=model_local_path,
                                    tf_meta_graph_tags=[tag_constants.SERVING],
                                    tf_signature_def_key='serving_default',
                                    artifact_path='model')
        # Optionally reload the logged model to verify it loads correctly.
        if args.model_reload:
            mlflow.pyfunc.load_model(mlflow.get_artifact_uri('model'))

        logging.info('Uploading TensorFlow events as a run artifact.')
        mlflow.log_artifacts(tensorboard_path)
        logging.info(
            'Launch TensorBoard with:\n\ntensorboard --logdir=%s' %
            tensorboard_path)
        duration = time() - start_time
        mlflow.log_metric('duration', duration)
        # The mlflow.start_run() context manager ends the run on exit.
        # Remove local temp copies created by the GCS workaround above. Skip
        # cleanup when deploying, since the deployment step below still needs
        # the local model copy.
        if args.job_dir.startswith('gs://') and not args.deploy_gcp:
            shutil.rmtree(model_local_path, ignore_errors=True)
            shutil.rmtree(tensorboard_path, ignore_errors=True)

    # Deploy model to AI Platform.
    if args.deploy_gcp:
        # Create AI Platform helper instance.
        if not args.project_id:
            raise ValueError('No project ID was provided')
        if not args.gcs_bucket:
            raise ValueError('No GCS bucket was provided')
        model_helper = model_deployment.AIPlatformModel(
            project_id=args.project_id)
        # Copy local model to GCS for deployment.
        if not model_local_path.startswith('gs://'):
            model_gcs_path = os.path.join('gs://', args.gcs_bucket, run_id,
                                          'model')
            model_deployment.copy_artifacts(model_local_path, model_gcs_path)
        else:
            model_gcs_path = model_local_path
        # Create model
        model_helper.create_model(args.model_name)
        # Create model version
        model_helper.deploy_model(model_gcs_path, args.model_name, run_id,
                                  args.run_time_version)
        logging.info('Model deployment in GCP completed')
    logging.info(
        'Training and evaluation took {} seconds.'.format(duration))
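
Notes:

_mlflow_log_metrics() is called above but not defined in this section. A
minimal sketch of what it plausibly does, assuming it simply replays one
series from History.history into MLflow with the epoch index as the step
(the real helper lives elsewhere in task.py and may differ):

def _mlflow_log_metrics(metrics, metric_name):
    """Replays one per-epoch series from History.history into MLflow."""
    for epoch, value in enumerate(metrics[metric_name]):
        mlflow.log_metric(metric_name, value, step=epoch)

For context, train_and_evaluate() is usually driven from the module's entry
point. A minimal sketch, assuming the common trainer layout in which
get_args() (referenced in the docstring) parses the command-line flags:

if __name__ == '__main__':
    args = get_args()  # argparse.Namespace of the flags used above
    train_and_evaluate(args)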