# notebooks/utils.py
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import List
from typing import Optional

import os
import subprocess
import sys
import tempfile
import time

import fsspec
import gcsfs
import pandas as pd
import tensorflow as tf
from t5.evaluation import eval_utils

import google.auth
import google.auth.transport.requests
from google.cloud import aiplatform as vertex_ai
from google.cloud.aiplatform import CustomJob

# Refresh the default credentials and tag outgoing requests with a custom
# user agent so usage of this solution can be attributed.
source_credentials, _ = google.auth.default()
request = google.auth.transport.requests.Request()
source_credentials.refresh(request)
source_credentials.apply(headers={'user-agent': 'cloud-solutions/t5x-on-vertex-ai-v1.0'})
# set python path from the current environment
PYTHON_BIN = os.path.join(sys.exec_prefix, 'bin', 'python3')
pd.set_option('display.max_colwidth', None)
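
# The refreshed credentials above can be passed to the Vertex AI SDK when it
# is initialized. A minimal sketch (the project and region values below are
# hypothetical placeholders):
#
#   vertex_ai.init(
#       project='my-project',
#       location='us-central1',
#       credentials=source_credentials,
#   )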
def create_t5x_custom_job(
display_name: str,
machine_type: str,
accelerator_type: str,
accelerator_count: int,
image_uri: str,
run_mode: str,
gin_files: List[str],
model_dir: str,
gin_search_paths: Optional[List[str]] = None,
tfds_data_dir: Optional[str] = None,
replica_count: int = 1,
gin_overwrites: Optional[List[str]] = None,
base_output_dir: Optional[str] = None,
) -> CustomJob:
"""Creates a Vertex AI custom T5X training job.
It copies the configuration files (.gin) to GCS, creates a worker_pool_spec
and returns an aiplatform.CustomJob.
Args:
display_name (str):
Required. User defined display name for the Vertex AI custom T5X job.
machine_type (str):
Required. The type of machine for running the custom training job on
dedicated resources. For TPUs, use `cloud-tpu`.
accelerator_type (str):
Required. The type of accelerator(s) that may be attached
to the machine as per `accelerator_count`. Only used if
`machine_type` is set. Options: `TPU_V2` or `TPU_V3`.
accelerator_count (int):
Required. The number of accelerators to attach to the `machine_type`.
Only used if `machine_type` is set. For TPUs, this is the number of
cores to be provisioned.
Example: 8, 128, 512, etc.
image_uri (str):
Required. Full image path to be used as the execution environment of the
custom T5X training job.
Example:
'gcr.io/{PROJECT_ID}/{IMAGE_NAME}'
run_mode (str):
Required. The mode to run T5X under. Options: `train`, `eval`, `infer`.
        gin_files (List[str]):
            Required. Full paths to gin configuration files on the local
            filesystem. Multiple paths may be passed and will be imported in
            the given order, with later configurations overriding earlier ones.
        model_dir (str):
            Required. Path on Google Cloud Storage to store all the artifacts
            generated by the custom T5X training job. The path must be in this
            format: `gs://{bucket name}/{your folder}/...`.
            Example:
                gs://my_bucket/experiments/model1/
        gin_search_paths (Optional[List[str]] = None):
            Optional. List of gin config path prefixes to be prepended to gin
            suffixes in gin includes and `gin_files`.
        tfds_data_dir (Optional[str] = None):
            Optional. If set, this directory will be used to store datasets
            prepared by TensorFlow Datasets that are not available in the
            public TFDS GCS bucket. Note that this flag overrides the
            `tfds_data_dir` attribute of all `Task`s. This path must be a
            valid GCS path.
            Example:
                gs://my_bucket/datasets/my_dataset/
replica_count (int = 1):
Optional. The number of worker replicas. If replica count = 1 then one chief
replica will be provisioned. For TPUs this must be set to 1.
        gin_overwrites (Optional[List[str]] = None):
            Optional. List of gin configuration overrides. Each entry is
            passed to T5X as a `--gin.` flag, so do not include the prefix.
            String values must be enclosed in escaped quotes.
            Example:
                TRAIN_PATH=\"gs://my_bucket/folder\"
        base_output_dir (Optional[str] = None):
            Optional. Google Cloud Storage location to store the output of
            the Vertex AI training job.
Returns:
(aiplatform.CustomJob):
            Returns an instance of a Vertex AI training CustomJob.
"""
local_fs = fsspec.filesystem('file')
gcs_fs = gcsfs.GCSFileSystem()
    # Check that all gin files exist
    if not gin_files or not all(local_fs.isfile(f) for f in gin_files):
        raise FileNotFoundError(
            'Provide a list of valid gin files.'
        )
    # Try to copy the files to the GCS bucket
    try:
        gcs_gin_files = []
        for gin_file in gin_files:
            gcs_path = os.path.join(model_dir, os.path.basename(gin_file))
            gcs_fs.put(gin_file, gcs_path)
            gcs_gin_files.append(gcs_path.replace('gs://', '/gcs/'))
    except Exception as e:
        raise RuntimeError('Could not copy gin files to GCS.') from e
# Temporary mitigation to address issues with t5x/main.py
# and inference on tfrecord files
    if run_mode == 'infer':
        args = [
            f'--gin.MODEL_DIR="{model_dir}"',
        ]
    else:
        args = [
            f'--run_mode={run_mode}',
            f'--gin.MODEL_DIR="{model_dir}"',
        ]
    # Only forward --tfds_data_dir when a directory was provided; otherwise
    # the literal string "None" would be passed to T5X.
    if tfds_data_dir:
        args.append(f'--tfds_data_dir={tfds_data_dir}')
if gin_search_paths:
args.append(f'--gin_search_paths={",".join(gin_search_paths)}')
args += [f'--gin_file={gcs_path}' for gcs_path in gcs_gin_files]
if gin_overwrites:
args += [f'--gin.{overwrite}' for overwrite in gin_overwrites]
container_spec = {
"image_uri": image_uri,
"args": args
}
# Temporary mitigation to address issues with t5x/main.py
# and inference on tfrecord files
if run_mode == 'infer':
container_spec['command'] = ["python", "./t5x/t5x/infer.py"]
worker_pool_specs = [
{
"machine_spec": {
"machine_type": machine_type,
"accelerator_type": accelerator_type,
"accelerator_count": accelerator_count,
},
"replica_count": replica_count,
"container_spec": container_spec,
}
]
job = vertex_ai.CustomJob(
display_name=display_name,
worker_pool_specs=worker_pool_specs,
base_output_dir=base_output_dir
)
return job
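
# Example usage of `create_t5x_custom_job`: a minimal sketch, assuming the
# gin file exists locally; the bucket, image, and file names below are
# hypothetical placeholders:
#
#   job = create_t5x_custom_job(
#       display_name='t5x-finetune',
#       machine_type='cloud-tpu',
#       accelerator_type='TPU_V2',
#       accelerator_count=8,
#       image_uri='gcr.io/my-project/t5x-base',
#       run_mode='train',
#       gin_files=['./configs/finetune.gin'],
#       model_dir='gs://my_bucket/experiments/model1',
#   )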
def _create_artifacts(
tfds_data_dir: str,
gin_config_path: str,
model_architecture_path: str,
model_dir: str,
execution_name: str,
job_display_name: str,
custom_job: CustomJob,
run_mode: str
):
"""Creates and logs artifacts generated by the Vertex AI custom
T5X job to an experiment run."""
# FSSpec to interact with GCS
gcs_fs = gcsfs.GCSFileSystem()
    # Initialize to None so a failed artifact creation below does not raise
    # a NameError when the artifacts are later assigned to the execution.
    dataset_seqio_artifact = None
    gin_config_artifact = None
    model_architecture_artifact = None
    trained_model_artifact = None
    # Training dataset artifact
    try:
dataset_seqio_artifact = vertex_ai.Artifact.create(
schema_title='system.Dataset',
display_name=f'{run_mode}_dataset',
uri=tfds_data_dir
)
except Exception as e:
print(e)
print('Dataset URI not logged to metadata.')
    try:
        # Gin configuration file; opening it first verifies that the training
        # job has already written it to GCS.
        with gcs_fs.open(gin_config_path):
gin_config_artifact = vertex_ai.Artifact.create(
schema_title='system.Artifact',
display_name=f'{run_mode}_gin_config',
uri=gin_config_path
)
except Exception as e:
print(e)
print('Gin config file not logged to metadata.')
    try:
        # Model information; opening the file verifies it exists on GCS.
        with gcs_fs.open(model_architecture_path):
model_architecture_artifact = vertex_ai.Artifact.create(
schema_title='system.Artifact',
display_name='model_architecture',
uri=model_architecture_path
)
except Exception as e:
print(e)
print('Model information/architecture not logged to metadata.')
try:
# Trained model
trained_model_artifact = vertex_ai.Artifact.create(
schema_title='system.Model',
display_name='trained_model',
uri=model_dir
)
except Exception as e:
print(e)
print('Trained model URI not logged to metadata.')
try:
# Vertex AI training job and model lineage
with vertex_ai.start_execution(
schema_title='system.CustomJobExecution',
display_name=execution_name,
metadata={
'job_display_name': job_display_name,
'job_spec': str(custom_job.job_spec),
'resource_name': custom_job.resource_name
}
) as execution:
            execution.assign_input_artifacts([
                artifact for artifact in (
                    dataset_seqio_artifact,
                    gin_config_artifact,
                    model_architecture_artifact,
                ) if artifact is not None
            ])
            if trained_model_artifact is not None:
                execution.assign_output_artifacts([
                    trained_model_artifact
                ])
vertex_ai.log_params({
"lineage": execution.get_output_artifacts()[0].lineage_console_uri
})
except Exception as e:
print(e)
print('Model lineage not logged to metadata.')
def get_all_experiment_run_directories(experiment_name):
""" Fetch run ids and run directories for a given experiment
"""
try:
# Get list of all experiment runs for a given experiment
runs = vertex_ai.ExperimentRun.list(experiment=experiment_name)
# Get run id and run directory for each run
run_details = []
for run in runs:
            # Create run instance
            run = vertex_ai.ExperimentRun(experiment=experiment_name, run_name=run.name)
            # Fetch artifacts for the run
            run_artifacts = run.get_artifacts()
            # Fetch the run directory from the trained model artifact, if any
            run_dir = next(
                (artifact.uri for artifact in run_artifacts
                 if artifact.display_name == 'trained_model'),
                None)
            run_details.append({'RUN_ID': run.name, 'RUN_DIR': run_dir})
        # Return results sorted by run id
        df = pd.DataFrame(sorted(run_details, key=lambda r: r['RUN_ID']))
        return df
except Exception as e:
print(e)
print(f"Cannot fetch runs and run directory for experiment={experiment_name}")
print("Please check the experiment name.")
def submit_and_track_t5x_vertex_job(
custom_job: vertex_ai.CustomJob,
job_display_name: str,
run_name: str,
experiment_name: str,
execution_name: str,
model_dir: str,
vertex_ai: vertex_ai,
run_mode: str,
tfds_data_dir: str = ''
):
"""Submits a custom T5X Vertex AI training job and
tracks the execution and metadata of it.
    Args:
custom_job (aiplatform.CustomJob):
Required. Instance of aiplatform.CustomJob with all the configurations
to run a custom T5X Vertex AI training job. This custom_job
can be generated with the function `create_t5x_custom_job`
job_display_name (str):
Required. The user-defined name of the aiplatform.CustomJob.
run_name (str):
Required. The user-defined name of the Vertex AI experiment run.
experiment_name (str):
Required. The user-defined name of the Vertex AI experiment.
execution_name (str):
Required. The user-defined name of the execution to be used as
a reference for metadata tracking.
model_dir (str):
Required. Path on Google Cloud Storage to store all the artifacts
generated by the custom T5X training job.
The path must be in this format: gs://{bucket name}/{your folder}/.
Example:
gs://my_bucket/experiments/model1/
vertex_ai (aiplatform):
Required. Instance of google.cloud.aiplatform.
run_mode (str):
Required. The mode to run T5X under. Options: `train`, `eval`, `infer`.
tfds_data_dir (str = ''):
Optional. If set, this directory will be used to store datasets
prepared by TensorFlow Datasets that are not available in the
public TFDS GCS bucket. Note that this flag overrides the
            `tfds_data_dir` attribute of all `Task`s.
"""
# Define paths where configuration files will be generated
gin_config_path = os.path.join(model_dir, 'config.gin')
model_architecture_path = os.path.join(model_dir, 'model-info.txt')
# FSSpec to interact with GCS
gcs_fs = gcsfs.GCSFileSystem()
try:
# List all the runs in the experiment
exp_runs = vertex_ai.ExperimentRun.list(
experiment=experiment_name)
        # If the run name already exists, stop the execution
if run_name in [run.name for run in exp_runs]:
print('Execution stopped.')
print('Run name already exists in this Experiment.')
print('Please provide a different and unique run name.')
return
except Exception as e:
print(e)
print(f'Experiment with name {experiment_name} not found.')
        print('Please provide a valid experiment name.')
return
# Start Vertex AI custom training job
custom_job.run(sync=False)
custom_job.wait_for_resource_creation()
    # Wait for the Vertex AI training job to start
    while custom_job.state.value == 2:  # JOB_STATE_PENDING
        print('Job still pending. Waiting additional 15 seconds.')
        time.sleep(15)
    # Job failed
    if custom_job.state.value == 5:  # JOB_STATE_FAILED
        print('Execution stopped. Vertex AI training Job has failed. '
              'Check the Vertex AI logs for additional information.')
        return
    # Job running
    if custom_job.state.value == 3:  # JOB_STATE_RUNNING
        print('Vertex AI training job has started.')
if run_mode == 'train':
print('Waiting for config files to be generated.')
while not (
gcs_fs.exists(gin_config_path) and
gcs_fs.exists(model_architecture_path)
):
            if custom_job.state.value == 5:  # JOB_STATE_FAILED
                print('Execution stopped. Vertex AI training Job has failed. '
                      'Check the Vertex AI logs for additional information.')
return
time.sleep(10)
print('Waiting for config files to be generated.')
print('Waiting additional 10 seconds.')
vertex_ai.start_run(run_name)
print('Creating artifacts.')
# Generate metadata artifacts
_create_artifacts(
tfds_data_dir=tfds_data_dir,
gin_config_path=gin_config_path,
model_architecture_path=model_architecture_path,
model_dir=model_dir,
execution_name=execution_name,
job_display_name=job_display_name,
custom_job=custom_job,
run_mode=run_mode)
print('Artifacts were created. Training job is still running.')
vertex_ai.end_run()
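
# Example usage of `submit_and_track_t5x_vertex_job`, chained with
# `create_t5x_custom_job` (names below are hypothetical placeholders, and
# `vertex_ai` is assumed to have been initialized with `vertex_ai.init`):
#
#   submit_and_track_t5x_vertex_job(
#       custom_job=job,
#       job_display_name='t5x-finetune',
#       run_name='run-1',
#       experiment_name='my-t5x-experiment',
#       execution_name='t5x-finetune-execution',
#       model_dir='gs://my_bucket/experiments/model1',
#       vertex_ai=vertex_ai,
#       run_mode='train',
#   )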
def _parse_metrics_from_tb_events(summary_dir, out_file):
""" Parse TensorBoard events files and log results to csv
Adapted from
https://github.com/google-research/text-to-text-transfer-transformer/blob/main/t5/scripts/parse_tb.py
"""
try:
# Reading event directories
subdirs = tf.io.gfile.listdir(summary_dir)
summary_dirs = [os.path.join(summary_dir, d.rstrip('/')) for d in subdirs]
# Parsing event files
scores = None
for d in summary_dirs:
events = eval_utils.parse_events_files(d, True)
task_metrics = eval_utils.get_eval_metric_values(
events,
task_name=os.path.basename(d))
if scores:
scores.update(task_metrics)
else:
scores = task_metrics
        if not scores:
            print(f"No evaluation events found in {summary_dir}")
            return
        # Computing and formatting metrics
        df = eval_utils.scores_to_df(scores)
df = eval_utils.compute_avg_glue(df)
df = eval_utils.sort_columns(df)
# Writing metrics to output file
eval_utils.log_csv(df, output_file=out_file)
except Exception as e:
print(e)
print('Failed to parse TensorBoard event files')
def parse_and_log_eval_metrics(
summary_dir: str,
run_name: str,
vertex_ai: vertex_ai
) -> pd.DataFrame:
"""Parses the evaluation metrics of a trained model.
Args:
summary_dir (str):
Required. Full path to a local filesystem folder with the files with
the evaluation metrics generated during training.
Example:
`/inference_eval`
run_name (str):
Required. The user-defined name of the Vertex AI experiment run.
vertex_ai (aiplatform):
Required. Instance of google.cloud.aiplatform.
Returns:
(pandas.DataFrame):
Returns the parsed results (evaluation metrics) from the
trained model.
"""
try:
# create temporary file to stage results
out_file = tempfile.NamedTemporaryFile()
# Parse evaluation metrics
_parse_metrics_from_tb_events(summary_dir, out_file.name)
# Log metrics to Vertex AI Experiments
results = pd.read_csv(out_file.name, sep=',')
        with vertex_ai.start_run(run_name, resume=True) as my_run:
            # The last two rows of the parsed results hold the max value of
            # each metric and the step at which it occurred.
            metrics = {}
            for name, values in results[-2:].drop('step', axis=1).to_dict().items():
                metrics[name + ': max, step'] = ', '.join(
                    str(value) for value in values.values())
            my_run.log_metrics(metrics)
# Remove output file after parsing
out_file.close()
except Exception as e:
print(e)
print('Metrics were not logged to metadata.')
return
else:
return results
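
# Example usage of `parse_and_log_eval_metrics` (the paths and names below
# are hypothetical placeholders):
#
#   results = parse_and_log_eval_metrics(
#       summary_dir='gs://my_bucket/experiments/model1/inference_eval',
#       run_name='run-1',
#       vertex_ai=vertex_ai,
#   )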
def create_tensorboard(
instance_name: str,
region: str,
project_id: str
) -> str:
"""Creates a managed Vertex AI tensorboard instance.
Args:
instance_name (str):
            Required. The user-defined name of the managed Vertex AI
Tensorboard instance.
region (str):
Required. Region where the managed Vertex AI Tensorboard instance
will be created.
project_id (str):
Required. Project ID where the managed Vertex AI Tensorboard instance
will be created.
Returns:
(str):
            Returns the full ID of the managed Vertex AI Tensorboard.
"""
process = subprocess.run(
[
'gcloud',
'ai',
'tensorboards',
'create',
f'--display-name={instance_name}',
f'--region={region}',
f'--project={project_id}'
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True
)
    # gcloud reports the created resource on stderr; take the text after the
    # final colon and strip the trailing period to get the full resource ID.
    return process.stderr.split(sep=':')[-1].strip().replace('.', '')
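
# Example usage of `create_tensorboard` (names below are hypothetical
# placeholders); the returned resource name can then be passed to
# `vertex_ai.init(...)` via its `experiment_tensorboard` argument:
#
#   tensorboard_id = create_tensorboard(
#       instance_name='my-tensorboard',
#       region='us-central1',
#       project_id='my-project',
#   )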