metaflow/metaflow_config.py (318 lines of code) (raw):

import os import sys import types from metaflow.exception import MetaflowException from metaflow.metaflow_config_funcs import from_conf, get_validate_choice_fn # Disable multithreading security on MacOS if sys.platform == "darwin": os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES" ## NOTE: Just like Click's auto_envar_prefix `METAFLOW` (see in cli.py), all environment ## variables here are also named METAFLOW_XXX. So, for example, in the statement: ## `DEFAULT_DATASTORE = from_conf("DEFAULT_DATASTORE", "local")`, to override the default ## value, either set `METAFLOW_DEFAULT_DATASTORE` in your configuration file or set ## an environment variable called `METAFLOW_DEFAULT_DATASTORE` ## # Constants (NOTE: these need to live before any from_conf) ## # Path to the local directory to store artifacts for 'local' datastore. DATASTORE_LOCAL_DIR = ".metaflow" # Local configuration file (in .metaflow) containing overrides per-project LOCAL_CONFIG_FILE = "config.json" ### # Default configuration ### DEFAULT_DATASTORE = from_conf("DEFAULT_DATASTORE", "local") DEFAULT_ENVIRONMENT = from_conf("DEFAULT_ENVIRONMENT", "local") DEFAULT_EVENT_LOGGER = from_conf("DEFAULT_EVENT_LOGGER", "nullSidecarLogger") DEFAULT_METADATA = from_conf("DEFAULT_METADATA", "local") DEFAULT_MONITOR = from_conf("DEFAULT_MONITOR", "nullSidecarMonitor") DEFAULT_PACKAGE_SUFFIXES = from_conf("DEFAULT_PACKAGE_SUFFIXES", ".py,.R,.RDS") DEFAULT_AWS_CLIENT_PROVIDER = from_conf("DEFAULT_AWS_CLIENT_PROVIDER", "boto3") DEFAULT_AZURE_CLIENT_PROVIDER = from_conf( "DEFAULT_AZURE_CLIENT_PROVIDER", "azure-default" ) DEFAULT_GCP_CLIENT_PROVIDER = from_conf("DEFAULT_GCP_CLIENT_PROVIDER", "gcp-default") DEFAULT_SECRETS_BACKEND_TYPE = from_conf("DEFAULT_SECRETS_BACKEND_TYPE") DEFAULT_SECRETS_ROLE = from_conf("DEFAULT_SECRETS_ROLE") DEFAULT_FROM_DEPLOYMENT_IMPL = from_conf( "DEFAULT_FROM_DEPLOYMENT_IMPL", "argo-workflows" ) ### # User configuration ### USER = from_conf("USER") ### # Datastore configuration ### DATASTORE_SYSROOT_LOCAL = from_conf("DATASTORE_SYSROOT_LOCAL") # S3 bucket and prefix to store artifacts for 's3' datastore. DATASTORE_SYSROOT_S3 = from_conf("DATASTORE_SYSROOT_S3") # Azure Blob Storage container and blob prefix DATASTORE_SYSROOT_AZURE = from_conf("DATASTORE_SYSROOT_AZURE") DATASTORE_SYSROOT_GS = from_conf("DATASTORE_SYSROOT_GS") # GS bucket and prefix to store artifacts for 'gs' datastore ### # Datastore local cache ### # Path to the client cache CLIENT_CACHE_PATH = from_conf("CLIENT_CACHE_PATH", "/tmp/metaflow_client") # Maximum size (in bytes) of the cache CLIENT_CACHE_MAX_SIZE = from_conf("CLIENT_CACHE_MAX_SIZE", 10000) # Maximum number of cached Flow and TaskDatastores in the cache CLIENT_CACHE_MAX_FLOWDATASTORE_COUNT = from_conf( "CLIENT_CACHE_MAX_FLOWDATASTORE_COUNT", 50 ) CLIENT_CACHE_MAX_TASKDATASTORE_COUNT = from_conf( "CLIENT_CACHE_MAX_TASKDATASTORE_COUNT", CLIENT_CACHE_MAX_FLOWDATASTORE_COUNT * 100 ) ### # Datatools (S3) configuration ### S3_ENDPOINT_URL = from_conf("S3_ENDPOINT_URL") S3_VERIFY_CERTIFICATE = from_conf("S3_VERIFY_CERTIFICATE") # Set ServerSideEncryption for S3 uploads S3_SERVER_SIDE_ENCRYPTION = from_conf("S3_SERVER_SIDE_ENCRYPTION") # S3 retry configuration # This is useful if you want to "fail fast" on S3 operations; use with caution # though as this may increase failures. Note that this is the number of *retries* # so setting it to 0 means each operation will be tried once. S3_RETRY_COUNT = from_conf("S3_RETRY_COUNT", 7) # Number of concurrent S3 processes for parallel operations. S3_WORKER_COUNT = from_conf("S3_WORKER_COUNT", 64) # Number of retries on *transient* failures (such as SlowDown errors). Note # that if after S3_TRANSIENT_RETRY_COUNT times, all operations haven't been done, # it will try up to S3_RETRY_COUNT again so the total number of tries can be up to # (S3_RETRY_COUNT + 1) * (S3_TRANSIENT_RETRY_COUNT + 1) # You typically want this number fairly high as transient retires are "cheap" (only # operations that have not succeeded retry as opposed to all operations for the # top-level retries) S3_TRANSIENT_RETRY_COUNT = from_conf("S3_TRANSIENT_RETRY_COUNT", 20) # S3 retry configuration used in the aws client # Use the adaptive retry strategy by default S3_CLIENT_RETRY_CONFIG = from_conf( "S3_CLIENT_RETRY_CONFIG", {"max_attempts": 10, "mode": "adaptive"} ) # Threshold to start printing warnings for an AWS retry RETRY_WARNING_THRESHOLD = 3 # S3 datatools root location DATATOOLS_SUFFIX = from_conf("DATATOOLS_SUFFIX", "data") DATATOOLS_S3ROOT = from_conf( "DATATOOLS_S3ROOT", ( os.path.join(DATASTORE_SYSROOT_S3, DATATOOLS_SUFFIX) if DATASTORE_SYSROOT_S3 else None ), ) TEMPDIR = from_conf("TEMPDIR", ".") DATATOOLS_CLIENT_PARAMS = from_conf("DATATOOLS_CLIENT_PARAMS", {}) if S3_ENDPOINT_URL: DATATOOLS_CLIENT_PARAMS["endpoint_url"] = S3_ENDPOINT_URL if S3_VERIFY_CERTIFICATE: DATATOOLS_CLIENT_PARAMS["verify"] = S3_VERIFY_CERTIFICATE DATATOOLS_SESSION_VARS = from_conf("DATATOOLS_SESSION_VARS", {}) # Azure datatools root location # Note: we do not expose an actual datatools library for Azure (like we do for S3) # Similar to DATATOOLS_LOCALROOT, this is used ONLY by the IncludeFile's internal implementation. DATATOOLS_AZUREROOT = from_conf( "DATATOOLS_AZUREROOT", ( os.path.join(DATASTORE_SYSROOT_AZURE, DATATOOLS_SUFFIX) if DATASTORE_SYSROOT_AZURE else None ), ) # GS datatools root location # Note: we do not expose an actual datatools library for GS (like we do for S3) # Similar to DATATOOLS_LOCALROOT, this is used ONLY by the IncludeFile's internal implementation. DATATOOLS_GSROOT = from_conf( "DATATOOLS_GSROOT", ( os.path.join(DATASTORE_SYSROOT_GS, DATATOOLS_SUFFIX) if DATASTORE_SYSROOT_GS else None ), ) # Local datatools root location DATATOOLS_LOCALROOT = from_conf( "DATATOOLS_LOCALROOT", ( os.path.join(DATASTORE_SYSROOT_LOCAL, DATATOOLS_SUFFIX) if DATASTORE_SYSROOT_LOCAL else None ), ) # Secrets Backend - AWS Secrets Manager configuration AWS_SECRETS_MANAGER_DEFAULT_REGION = from_conf("AWS_SECRETS_MANAGER_DEFAULT_REGION") # Secrets Backend - GCP Secrets name prefix. With this, users don't have # to specify the full secret name in the @secret decorator. # # Note that it makes a difference whether the prefix ends with a slash or not # E.g. if secret name passed to @secret decorator is mysecret: # - "projects/1234567890/secrets/" -> "projects/1234567890/secrets/mysecret" # - "projects/1234567890/secrets/foo-" -> "projects/1234567890/secrets/foo-mysecret" GCP_SECRET_MANAGER_PREFIX = from_conf("GCP_SECRET_MANAGER_PREFIX") # Secrets Backend - Azure Key Vault prefix. With this, users don't have to # specify the full https:// vault url in the @secret decorator. # # It does not make a difference if the prefix ends in a / or not. We will handle either # case correctly. AZURE_KEY_VAULT_PREFIX = from_conf("AZURE_KEY_VAULT_PREFIX") # The root directory to save artifact pulls in, when using S3 or Azure ARTIFACT_LOCALROOT = from_conf("ARTIFACT_LOCALROOT", os.getcwd()) # Cards related config variables CARD_SUFFIX = "mf.cards" CARD_LOCALROOT = from_conf("CARD_LOCALROOT") CARD_S3ROOT = from_conf( "CARD_S3ROOT", os.path.join(DATASTORE_SYSROOT_S3, CARD_SUFFIX) if DATASTORE_SYSROOT_S3 else None, ) CARD_AZUREROOT = from_conf( "CARD_AZUREROOT", ( os.path.join(DATASTORE_SYSROOT_AZURE, CARD_SUFFIX) if DATASTORE_SYSROOT_AZURE else None ), ) CARD_GSROOT = from_conf( "CARD_GSROOT", os.path.join(DATASTORE_SYSROOT_GS, CARD_SUFFIX) if DATASTORE_SYSROOT_GS else None, ) CARD_NO_WARNING = from_conf("CARD_NO_WARNING", False) SKIP_CARD_DUALWRITE = from_conf("SKIP_CARD_DUALWRITE", False) RUNTIME_CARD_RENDER_INTERVAL = from_conf("RUNTIME_CARD_RENDER_INTERVAL", 60) # Azure storage account URL AZURE_STORAGE_BLOB_SERVICE_ENDPOINT = from_conf("AZURE_STORAGE_BLOB_SERVICE_ENDPOINT") # Azure storage can use process-based parallelism instead of threads. # Processes perform better for high throughput workloads (e.g. many huge artifacts) AZURE_STORAGE_WORKLOAD_TYPE = from_conf( "AZURE_STORAGE_WORKLOAD_TYPE", default="general", validate_fn=get_validate_choice_fn(["general", "high_throughput"]), ) # GS storage can use process-based parallelism instead of threads. # Processes perform better for high throughput workloads (e.g. many huge artifacts) GS_STORAGE_WORKLOAD_TYPE = from_conf( "GS_STORAGE_WORKLOAD_TYPE", "general", validate_fn=get_validate_choice_fn(["general", "high_throughput"]), ) ### # Metadata configuration ### SERVICE_URL = from_conf("SERVICE_URL") SERVICE_RETRY_COUNT = from_conf("SERVICE_RETRY_COUNT", 5) SERVICE_AUTH_KEY = from_conf("SERVICE_AUTH_KEY") SERVICE_HEADERS = from_conf("SERVICE_HEADERS", {}) if SERVICE_AUTH_KEY is not None: SERVICE_HEADERS["x-api-key"] = SERVICE_AUTH_KEY # Checks version compatibility with Metadata service SERVICE_VERSION_CHECK = from_conf("SERVICE_VERSION_CHECK", True) # Default container image DEFAULT_CONTAINER_IMAGE = from_conf("DEFAULT_CONTAINER_IMAGE") # Default container registry DEFAULT_CONTAINER_REGISTRY = from_conf("DEFAULT_CONTAINER_REGISTRY") # Controls whether to include foreach stack information in metadata. INCLUDE_FOREACH_STACK = from_conf("INCLUDE_FOREACH_STACK", True) # Maximum length of the foreach value string to be stored in each ForeachFrame. MAXIMUM_FOREACH_VALUE_CHARS = from_conf("MAXIMUM_FOREACH_VALUE_CHARS", 30) # The default runtime limit (In seconds) of jobs launched by any compute provider. Default of 5 days. DEFAULT_RUNTIME_LIMIT = from_conf("DEFAULT_RUNTIME_LIMIT", 5 * 24 * 60 * 60) ### # Organization customizations ### UI_URL = from_conf("UI_URL") ### # Capture error logs from argo ### ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT = from_conf("ARGO_WORKFLOWS_CAPTURE_ERROR_SCRIPT") # Contact information displayed when running the `metaflow` command. # Value should be a dictionary where: # - key is a string describing contact method # - value is a string describing contact itself (email, web address, etc.) # The default value shows an example of this CONTACT_INFO = from_conf( "CONTACT_INFO", { "Read the documentation": "http://docs.metaflow.org", "Chat with us": "http://chat.metaflow.org", "Get help by email": "help@metaflow.org", }, ) ### # Decorators ### # Format is a space separated string of decospecs (what is passed # using --with) DEFAULT_DECOSPECS = from_conf("DEFAULT_DECOSPECS", "") ### # AWS Batch configuration ### # IAM role for AWS Batch container with Amazon S3 access # (and AWS DynamoDb access for AWS StepFunctions, if enabled) ECS_S3_ACCESS_IAM_ROLE = from_conf("ECS_S3_ACCESS_IAM_ROLE") # IAM role for AWS Batch container for AWS Fargate ECS_FARGATE_EXECUTION_ROLE = from_conf("ECS_FARGATE_EXECUTION_ROLE") # Job queue for AWS Batch BATCH_JOB_QUEUE = from_conf("BATCH_JOB_QUEUE") # Default container image for AWS Batch BATCH_CONTAINER_IMAGE = from_conf("BATCH_CONTAINER_IMAGE", DEFAULT_CONTAINER_IMAGE) # Default container registry for AWS Batch BATCH_CONTAINER_REGISTRY = from_conf( "BATCH_CONTAINER_REGISTRY", DEFAULT_CONTAINER_REGISTRY ) # Metadata service URL for AWS Batch SERVICE_INTERNAL_URL = from_conf("SERVICE_INTERNAL_URL", SERVICE_URL) # Assign resource tags to AWS Batch jobs. Set to False by default since # it requires `Batch:TagResource` permissions which may not be available # in all Metaflow deployments. Hopefully, some day we can flip the # default to True. BATCH_EMIT_TAGS = from_conf("BATCH_EMIT_TAGS", False) ### # AWS Step Functions configuration ### # IAM role for AWS Step Functions with AWS Batch and AWS DynamoDb access # https://docs.aws.amazon.com/step-functions/latest/dg/batch-iam.html SFN_IAM_ROLE = from_conf("SFN_IAM_ROLE") # AWS DynamoDb Table name (with partition key - `pathspec` of type string) SFN_DYNAMO_DB_TABLE = from_conf("SFN_DYNAMO_DB_TABLE") # IAM role for AWS Events with AWS Step Functions access # https://docs.aws.amazon.com/eventbridge/latest/userguide/auth-and-access-control-eventbridge.html EVENTS_SFN_ACCESS_IAM_ROLE = from_conf("EVENTS_SFN_ACCESS_IAM_ROLE") # Prefix for AWS Step Functions state machines. Set to stack name for Metaflow # sandbox. SFN_STATE_MACHINE_PREFIX = from_conf("SFN_STATE_MACHINE_PREFIX") # Optional AWS CloudWatch Log Group ARN for emitting AWS Step Functions state # machine execution logs. This needs to be available when using the # `step-functions create --log-execution-history` command. SFN_EXECUTION_LOG_GROUP_ARN = from_conf("SFN_EXECUTION_LOG_GROUP_ARN") # Amazon S3 path for storing the results of AWS Step Functions Distributed Map SFN_S3_DISTRIBUTED_MAP_OUTPUT_PATH = from_conf( "SFN_S3_DISTRIBUTED_MAP_OUTPUT_PATH", ( os.path.join(DATASTORE_SYSROOT_S3, "sfn_distributed_map_output") if DATASTORE_SYSROOT_S3 else None ), ) ### # Kubernetes configuration ### # Kubernetes namespace to use for all objects created by Metaflow KUBERNETES_NAMESPACE = from_conf("KUBERNETES_NAMESPACE", "default") # Default service account to use by K8S jobs created by Metaflow KUBERNETES_SERVICE_ACCOUNT = from_conf("KUBERNETES_SERVICE_ACCOUNT") # Default node selectors to use by K8S jobs created by Metaflow - foo=bar,baz=bab KUBERNETES_NODE_SELECTOR = from_conf("KUBERNETES_NODE_SELECTOR", "") KUBERNETES_TOLERATIONS = from_conf("KUBERNETES_TOLERATIONS", "") KUBERNETES_PERSISTENT_VOLUME_CLAIMS = from_conf( "KUBERNETES_PERSISTENT_VOLUME_CLAIMS", "" ) KUBERNETES_SECRETS = from_conf("KUBERNETES_SECRETS", "") # Default labels for kubernetes pods KUBERNETES_LABELS = from_conf("KUBERNETES_LABELS", "") # Default annotations for kubernetes pods KUBERNETES_ANNOTATIONS = from_conf("KUBERNETES_ANNOTATIONS", "") # Default GPU vendor to use by K8S jobs created by Metaflow (supports nvidia, amd) KUBERNETES_GPU_VENDOR = from_conf("KUBERNETES_GPU_VENDOR", "nvidia") # Default container image for K8S KUBERNETES_CONTAINER_IMAGE = from_conf( "KUBERNETES_CONTAINER_IMAGE", DEFAULT_CONTAINER_IMAGE ) # Image pull policy for container images KUBERNETES_IMAGE_PULL_POLICY = from_conf("KUBERNETES_IMAGE_PULL_POLICY", None) # Default container registry for K8S KUBERNETES_CONTAINER_REGISTRY = from_conf( "KUBERNETES_CONTAINER_REGISTRY", DEFAULT_CONTAINER_REGISTRY ) # Toggle for trying to fetch EC2 instance metadata KUBERNETES_FETCH_EC2_METADATA = from_conf("KUBERNETES_FETCH_EC2_METADATA", False) # Shared memory in MB to use for this step KUBERNETES_SHARED_MEMORY = from_conf("KUBERNETES_SHARED_MEMORY", None) # Default port number to open on the pods KUBERNETES_PORT = from_conf("KUBERNETES_PORT", None) # Default kubernetes resource requests for CPU, memory and disk KUBERNETES_CPU = from_conf("KUBERNETES_CPU", None) KUBERNETES_MEMORY = from_conf("KUBERNETES_MEMORY", None) KUBERNETES_DISK = from_conf("KUBERNETES_DISK", None) # Default kubernetes QoS class KUBERNETES_QOS = from_conf("KUBERNETES_QOS", "burstable") # Architecture of kubernetes nodes - used for @conda/@pypi in metaflow-dev KUBERNETES_CONDA_ARCH = from_conf("KUBERNETES_CONDA_ARCH") ARGO_WORKFLOWS_KUBERNETES_SECRETS = from_conf("ARGO_WORKFLOWS_KUBERNETES_SECRETS", "") ARGO_WORKFLOWS_ENV_VARS_TO_SKIP = from_conf("ARGO_WORKFLOWS_ENV_VARS_TO_SKIP", "") KUBERNETES_JOBSET_GROUP = from_conf("KUBERNETES_JOBSET_GROUP", "jobset.x-k8s.io") KUBERNETES_JOBSET_VERSION = from_conf("KUBERNETES_JOBSET_VERSION", "v1alpha2") ## # Argo Events Configuration ## ARGO_EVENTS_SERVICE_ACCOUNT = from_conf("ARGO_EVENTS_SERVICE_ACCOUNT") ARGO_EVENTS_EVENT_BUS = from_conf("ARGO_EVENTS_EVENT_BUS", "default") ARGO_EVENTS_EVENT_SOURCE = from_conf("ARGO_EVENTS_EVENT_SOURCE") ARGO_EVENTS_EVENT = from_conf("ARGO_EVENTS_EVENT") ARGO_EVENTS_WEBHOOK_URL = from_conf("ARGO_EVENTS_WEBHOOK_URL") ARGO_EVENTS_INTERNAL_WEBHOOK_URL = from_conf( "ARGO_EVENTS_INTERNAL_WEBHOOK_URL", ARGO_EVENTS_WEBHOOK_URL ) ARGO_EVENTS_WEBHOOK_AUTH = from_conf("ARGO_EVENTS_WEBHOOK_AUTH", "none") ARGO_WORKFLOWS_UI_URL = from_conf("ARGO_WORKFLOWS_UI_URL") ## # Airflow Configuration ## # This configuration sets `startup_timeout_seconds` in airflow's KubernetesPodOperator. AIRFLOW_KUBERNETES_STARTUP_TIMEOUT_SECONDS = from_conf( "AIRFLOW_KUBERNETES_STARTUP_TIMEOUT_SECONDS", 60 * 60 ) # This configuration sets `kubernetes_conn_id` in airflow's KubernetesPodOperator. AIRFLOW_KUBERNETES_CONN_ID = from_conf("AIRFLOW_KUBERNETES_CONN_ID") AIRFLOW_KUBERNETES_KUBECONFIG_FILE = from_conf("AIRFLOW_KUBERNETES_KUBECONFIG_FILE") AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT = from_conf( "AIRFLOW_KUBERNETES_KUBECONFIG_CONTEXT" ) ### # Conda configuration ### # Conda package root location on S3 CONDA_PACKAGE_S3ROOT = from_conf("CONDA_PACKAGE_S3ROOT") # Conda package root location on Azure CONDA_PACKAGE_AZUREROOT = from_conf("CONDA_PACKAGE_AZUREROOT") # Conda package root location on GS CONDA_PACKAGE_GSROOT = from_conf("CONDA_PACKAGE_GSROOT") # Use an alternate dependency resolver for conda packages instead of conda # Mamba promises faster package dependency resolution times, which # should result in an appreciable speedup in flow environment initialization. CONDA_DEPENDENCY_RESOLVER = from_conf("CONDA_DEPENDENCY_RESOLVER", "conda") # Default to not using fast init binary. CONDA_USE_FAST_INIT = from_conf("CONDA_USE_FAST_INIT", False) ### # Escape hatch configuration ### # Print out warning if escape hatch is not used for the target packages ESCAPE_HATCH_WARNING = from_conf("ESCAPE_HATCH_WARNING", True) ### # Debug configuration ### DEBUG_OPTIONS = ["subcommand", "sidecar", "s3client", "tracing", "stubgen", "userconf"] for typ in DEBUG_OPTIONS: vars()["DEBUG_%s" % typ.upper()] = from_conf("DEBUG_%s" % typ.upper(), False) ### # Plugin configuration ### # Plugin configuration variables exist in plugins/__init__.py. # Specifically, there is an ENABLED_<category> configuration value to determine # the set of plugins to enable. The categories are: step_decorator, flow_decorator, # environment, metadata_provider, datastore, sidecar, logging_sidecar, monitor_sidecar, # aws_client_provider, and cli. If not set (the default), all plugins are enabled. # You can restrict which plugins are enabled by listing them explicitly, for example # ENABLED_STEP_DECORATOR = ["batch", "resources"] will enable only those two step # decorators and none other. ### # Command configuration ### # Command (ie: metaflow <cmd>) configuration variable ENABLED_CMD # exists in cmd/main_cli.py. It behaves just like any of the other ENABLED_<category> # configuration variables. ### # AWS Sandbox configuration ### # Boolean flag for metaflow AWS sandbox access AWS_SANDBOX_ENABLED = from_conf("AWS_SANDBOX_ENABLED", False) # Metaflow AWS sandbox auth endpoint AWS_SANDBOX_STS_ENDPOINT_URL = SERVICE_URL # Metaflow AWS sandbox API auth key AWS_SANDBOX_API_KEY = from_conf("AWS_SANDBOX_API_KEY") # Internal Metadata URL AWS_SANDBOX_INTERNAL_SERVICE_URL = from_conf("AWS_SANDBOX_INTERNAL_SERVICE_URL") # AWS region AWS_SANDBOX_REGION = from_conf("AWS_SANDBOX_REGION") # Finalize configuration if AWS_SANDBOX_ENABLED: os.environ["AWS_DEFAULT_REGION"] = AWS_SANDBOX_REGION SERVICE_INTERNAL_URL = AWS_SANDBOX_INTERNAL_SERVICE_URL SERVICE_HEADERS["x-api-key"] = AWS_SANDBOX_API_KEY SFN_STATE_MACHINE_PREFIX = from_conf("AWS_SANDBOX_STACK_NAME") KUBERNETES_SANDBOX_INIT_SCRIPT = from_conf("KUBERNETES_SANDBOX_INIT_SCRIPT") OTEL_ENDPOINT = from_conf("OTEL_ENDPOINT") ZIPKIN_ENDPOINT = from_conf("ZIPKIN_ENDPOINT") CONSOLE_TRACE_ENABLED = from_conf("CONSOLE_TRACE_ENABLED", False) # internal env used for preventing the tracing module from loading during Conda bootstrapping. DISABLE_TRACING = bool(os.environ.get("DISABLE_TRACING", False)) # MAX_ATTEMPTS is the maximum number of attempts, including the first # task, retries, and the final fallback task and its retries. # # Datastore needs to check all attempt files to find the latest one, so # increasing this limit has real performance implications for all tasks. # Decreasing this limit is very unsafe, as it can lead to wrong results # being read from old tasks. # # Note also that DataStoreSet resolves the latest attempt_id using # lexicographic ordering of attempts. This won't work if MAX_ATTEMPTS > 99. MAX_ATTEMPTS = 6 # Feature flag (experimental features that are *explicitly* unsupported) # Process configs even when using the click_api for Runner/Deployer CLICK_API_PROCESS_CONFIG = from_conf("CLICK_API_PROCESS_CONFIG", False) # PINNED_CONDA_LIBS are the libraries that metaflow depends on for execution # and are needed within a conda environment def get_pinned_conda_libs(python_version, datastore_type): pins = { "requests": ">=2.21.0", } if datastore_type == "s3": pins["boto3"] = ">=1.14.0" elif datastore_type == "azure": pins["azure-identity"] = ">=1.10.0" pins["azure-storage-blob"] = ">=12.12.0" pins["azure-keyvault-secrets"] = ">=4.7.0" elif datastore_type == "gs": pins["google-cloud-storage"] = ">=2.5.0" pins["google-auth"] = ">=2.11.0" pins["google-cloud-secret-manager"] = ">=2.10.0" elif datastore_type == "local": pass else: raise MetaflowException( msg="conda lib pins for datastore %s are undefined" % (datastore_type,) ) return pins # Check if there are extensions to Metaflow to load and override everything try: from metaflow.extension_support import get_modules _TOGGLE_DECOSPECS = [] ext_modules = get_modules("config") for m in ext_modules: # We load into globals whatever we have in extension_module # We specifically exclude any modules that may be included (like sys, os, etc) for n, o in m.module.__dict__.items(): if n == "DEBUG_OPTIONS": DEBUG_OPTIONS.extend(o) for typ in o: vars()["DEBUG_%s" % typ.upper()] = from_conf( "DEBUG_%s" % typ.upper(), False ) elif n == "get_pinned_conda_libs": def _new_get_pinned_conda_libs( python_version, datastore_type, f1=globals()[n], f2=o ): d1 = f1(python_version, datastore_type) d2 = f2(python_version, datastore_type) for k, v in d2.items(): d1[k] = v if k not in d1 else ",".join([d1[k], v]) return d1 globals()[n] = _new_get_pinned_conda_libs elif n == "TOGGLE_DECOSPECS": if any([x.startswith("-") for x in o]): raise ValueError("Removing decospecs is not currently supported") if any(" " in x for x in o): raise ValueError("Decospecs cannot contain spaces") _TOGGLE_DECOSPECS.extend(o) elif not n.startswith("__") and not isinstance(o, types.ModuleType): globals()[n] = o # If DEFAULT_DECOSPECS is set, use that, else extrapolate from extensions if not DEFAULT_DECOSPECS: DEFAULT_DECOSPECS = " ".join(_TOGGLE_DECOSPECS) finally: # Erase all temporary names to avoid leaking things for _n in [ "m", "n", "o", "typ", "ext_modules", "get_modules", "_new_get_pinned_conda_libs", "d1", "d2", "k", "v", "f1", "f2", "_TOGGLE_DECOSPECS", ]: try: del globals()[_n] except KeyError: pass del globals()["_n"]