GenAIFlow.py (68 lines of code) (raw):
import json
import os
import google_crc32c
from google.cloud import secretmanager
from metaflow import FlowSpec, Parameter
import argparse
# Internal prefixes for the class attributes that hold Metaflow parameters.
# MF_ARG_PREFIX marks attributes created for single-valued arguments and flags.
MF_ARG_PREFIX = "mf_arg_"
# MF_ARG_MULTI_PREFIX marks attributes created for multi-valued (nargs "*" / "+")
# arguments, whose values are submitted as one comma separated string.
MF_ARG_MULTI_PREFIX = "mf_multiarg_"
"""
This file has utility functions to make it easy to convert a command-line training job
into a Metaflow job. A flow should subclass GenAIFlow.
Note that this utility will be moved to a common repo at some point.
"""
def is_true_flag_action(action):
    """
    Tell whether an argparse action looks like a "store true" flag.

    An action qualifies when its const is True, its default is False and it has
    no type converter attached.

    :param action: an argparse action (anything exposing const, default and type)
    :return: True when the action is a true-flag, False otherwise
    """
    # Identity comparison: a type converter with a permissive __eq__ must not be
    # mistaken for "no type".
    return action.const is True and action.default is False and action.type is None
def is_optional_int_value(action):
    """
    Tell whether an argparse action is an int argument without a default.

    :param action: an argparse action (anything exposing type and default)
    :return: True when the action's type is int and its default is None
    """
    takes_int = action.type == int
    if not takes_int:
        return False
    return action.default is None
class GenAIFlow(FlowSpec):
    """
    Base flow that bridges argparse based command line jobs and Metaflow.

    import_argparse_to_params() mirrors an argparse parser as Metaflow parameters on
    the flow class, and params_to_args() turns the parameter values of a running flow
    back into a command line argument list. The remaining helpers load secrets from
    GCP Secret Manager into the process environment.
    """

    @classmethod
    def import_argparse_to_params(cls, parser):
        """
        Given an argument parser built with the standard python argparse library, construct
        similar Metaflow parameters for the flow that can be triggered in the Metaflow command
        line or a provider UI (such as Outerbounds). Arguments have an arg_ prefix added to
        them in the Metaflow UI.

        Multiple input arguments (nargs "*" or "+") must be submitted as comma separated
        strings, so they are always registered as string parameters.
        Supports most typical arguments and true flags (default if not set).

        :param parser: the argparse.ArgumentParser whose actions are mirrored
        """
        for action in parser._actions:
            # Actions with a SUPPRESS default (e.g. argparse's built-in help) carry no value.
            if action.default == argparse.SUPPRESS:
                continue

            attr_prefix = MF_ARG_PREFIX
            if is_true_flag_action(action):
                param_type = bool
                help_text = f"{action.help}"
                default = False
            elif action.nargs in ("*", "+"):
                attr_prefix = MF_ARG_MULTI_PREFIX
                # The value travels as one comma separated string; argparse applies the
                # per-item type conversion once params_to_args() has split it again.
                param_type = str
                help_text = f"{action.help} {cls._choices_text(action)}"
                default = action.default
                if isinstance(default, (list, tuple)):
                    # Use the same comma separated form params_to_args() splits on.
                    default = ",".join(map(str, default))
            elif is_optional_int_value(action):
                # An int with no default stays an unset int parameter, so a value
                # supplied by the user is passed through to argparse.
                param_type = int
                help_text = f"{action.help}"
                default = None
            else:
                param_type = action.type or str
                help_text = f"{action.help} {cls._choices_text(action)}"
                default = action.default

            setattr(
                cls,
                attr_prefix + action.dest,
                Parameter("arg_" + action.dest, type=param_type, help=help_text, default=default),
            )

    @staticmethod
    def _choices_text(action):
        """Return the action's choices as a comma separated string ('' when it has none)."""
        if action.choices is None:
            return ""
        # Choices may be non-string values (e.g. ints), so stringify before joining.
        return ",".join(map(str, action.choices))

    def params_to_args(self):
        """
        Converts the metaflow arguments (passed to a metaflow job) to commandline arguments
        to be interpreted by legacy argparse command line code.

        Parameters are discovered through the attributes stored directly on this instance's
        class, i.e. the ones registered by import_argparse_to_params(). Note that '-' arg
        names aren't supported in Metaflow and are automatically converted to '_' in the
        Metaflow UI. They are converted back to '-' here.
        Also note that 0 value params are omitted, so items with non-zero defaults are not
        fully supported in some situations.

        :return: list of command line argument strings
        """
        skipped_values = (None, 0, '', False)
        args = []
        for name in self.__class__.__dict__:
            if name.startswith(MF_ARG_MULTI_PREFIX):
                value = getattr(self, name)
                if value not in skipped_values:
                    args.append(f"--{name[len(MF_ARG_MULTI_PREFIX):].replace('_', '-')}")
                    # One comma separated string becomes several positional values.
                    args.extend(str(value).split(","))
            if name.startswith(MF_ARG_PREFIX):
                value = getattr(self, name)
                if value not in skipped_values:
                    args.append(f"--{name[len(MF_ARG_PREFIX):].replace('_', '-')}")
                    # A set flag is emitted as the bare option, without a value.
                    if str(value) != 'True':
                        args.append(str(value))
        return args

    def get_project_id(self):
        """
        Returns the GCP project used for secret lookups (currently a fixed value).
        """
        return "moz-fx-mozsoc-ml-nonprod"

    def load_secret(self, secret_id: str) -> str:
        """
        Utility function to load the latest version of a secret from GCP Secret Manager.

        :param secret_id: id of the secret inside the project returned by get_project_id()
        :return: the secret payload decoded as UTF-8
        :raises Exception: when the payload does not match the CRC32C sent with the response
        """
        project_id = self.get_project_id()
        client = secretmanager.SecretManagerServiceClient()
        secret_path = client.secret_version_path(project_id, secret_id, "latest")
        response = client.access_secret_version(request={"name": secret_path})

        # Verify the payload against the checksum returned alongside it.
        crc32c = google_crc32c.Checksum()
        crc32c.update(response.payload.data)
        if response.payload.data_crc32c != int(crc32c.hexdigest(), 16):
            raise Exception(f"Secret CRC Corrupted in project {project_id} and path {secret_path}")
        return response.payload.data.decode("UTF-8")

    def load_remote_env(self):
        """
        Load secrets for low access service accounts. This provides secrets to use OpenAI
        and write W&B artifacts.

        Each listed secret is parsed as a JSON object and every key/value pair is exported
        as an environment variable, with the key upper-cased.
        """
        print("Loading common secrets from GCP")
        json_secrets = ['metaflow-job-secrets']
        for secret_id in json_secrets:
            raw_env = self.load_secret(secret_id)
            envs = json.loads(raw_env)
            for k, v in envs.items():
                # os.environ only accepts strings; JSON numbers/booleans are stringified.
                os.environ[k.upper()] = v if isinstance(v, str) else str(v)