azure_functions_worker/utils/dependency.py (182 lines of code) (raw):
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import importlib.util
import inspect
import os
import re
import sys
from types import ModuleType
from typing import List, Optional
from azure_functions_worker.utils.common import is_envvar_true, is_true_like
from ..constants import (
AZURE_WEBJOBS_SCRIPT_ROOT,
CONTAINER_NAME,
PYTHON_ISOLATE_WORKER_DEPENDENCIES,
PYTHON_ISOLATE_WORKER_DEPENDENCIES_DEFAULT,
PYTHON_ISOLATE_WORKER_DEPENDENCIES_DEFAULT_310,
)
from ..logging import logger
from ..utils.common import is_python_version
from ..utils.wrappers import enable_feature_by
class DependencyManager:
    """The dependency manager controls the Python packages source, preventing
    worker packages from interfering with customer's code.

    It has two modes. In worker mode, the Python packages are loaded from the
    worker path (e.g. workers/python/<python_version>/<os>/<arch>). In
    customer mode, the packages are loaded from customer's .python_packages/
    folder or from their virtual environment.

    Azure Functions has three different sets of sys.path ordering,

    Linux Consumption sys.path: [
        "/tmp/functions\\standby\\wwwroot", # Placeholder folder
        "/home/site/wwwroot/.python_packages/lib/site-packages", # CX's deps
        "/azure-functions-host/workers/python/3.11/LINUX/X64", # Worker's deps
        "/home/site/wwwroot" # CX's Working Directory
    ]

    Linux Dedicated/Premium sys.path: [
        "/home/site/wwwroot", # CX's Working Directory
        "/home/site/wwwroot/.python_packages/lib/site-packages", # CX's deps
        "/azure-functions-host/workers/python/3.11/LINUX/X64", # Worker's deps
    ]

    Core Tools sys.path: [
        "%appdata%\\azure-functions-core-tools\\bin\\workers\\"
            "python\\3.11\\WINDOWS\\X64", # Worker's deps
        "C:\\Users\\user\\Project\\.venv311\\lib\\site-packages", # CX's deps
        "C:\\Users\\user\\Project", # CX's Working Directory
    ]

    When we first start up the Python worker, we should only load from the
    worker's deps and create module namespaces (e.g. google.protobuf
    variable).

    Once the worker receives the worker init request, we clear out the
    sys.path, the worker sys.modules cache and sys.path_importer_cache so the
    libraries will only get loaded from CX's deps path.
    """

    # Path to the customer's third-party packages ('' when unknown).
    cx_deps_path: str = ''
    # Path to the customer's project folder ('' when unknown).
    cx_working_dir: str = ''
    # Path to the packages shipped with the worker.
    worker_deps_path: str = ''

    @classmethod
    def initialize(cls):
        """Resolve the customer/worker paths and cache them on the class.

        The values are derived from the current sys.path and from the
        AzureWebJobsScriptRoot environment variable, so this should run
        before sys.path is modified by the other methods.
        """
        cls.cx_deps_path = cls._get_cx_deps_path()
        cls.cx_working_dir = cls._get_cx_working_dir()
        cls.worker_deps_path = cls._get_worker_deps_path()

    @classmethod
    def is_in_linux_consumption(cls):
        """Return True if the environment variable named by CONTAINER_NAME
        is present in os.environ.
        """
        return CONTAINER_NAME in os.environ

    @classmethod
    def should_load_cx_dependencies(cls):
        """Tell whether customer dependencies should be loaded right away.

        Returns False only when the worker runs in Linux Consumption AND
        WEBSITE_PLACEHOLDER_MODE is true. It returns True when
        1) App is a dedicated app
        2) App is linux consumption but not in placeholder mode.
           This can happen when the worker restarts for any reason
           (OOM, timeouts etc) and env reload request is not called.

        Note: this method does not look at the dependency isolation flag
        itself; the methods that switch sys.path are gated separately via
        enable_feature_by.
        """
        return not (DependencyManager.is_in_linux_consumption()
                    and is_envvar_true("WEBSITE_PLACEHOLDER_MODE"))

    @classmethod
    @enable_feature_by(
        flag=PYTHON_ISOLATE_WORKER_DEPENDENCIES,
        flag_default=PYTHON_ISOLATE_WORKER_DEPENDENCIES_DEFAULT
    )
    def use_worker_dependencies(cls):
        """Switch the sys.path and ensure the worker imports are loaded from
        Worker's dependencies.

        This will not affect already imported namespaces, but will clear out
        the module cache and ensure the upcoming modules are loaded from
        worker's dependency path.
        """
        # The following log line will not show up in core tools but should
        # work in kusto since core tools only collects gRPC logs. This function
        # is executed even before the gRPC logging channel is ready.
        logger.info('Applying use_worker_dependencies:'
                    ' worker_dependencies: %s,'
                    ' customer_dependencies: %s,'
                    ' working_directory: %s', cls.worker_deps_path,
                    cls.cx_deps_path, cls.cx_working_dir)

        cls._remove_from_sys_path(cls.cx_deps_path)
        cls._remove_from_sys_path(cls.cx_working_dir)
        cls._add_to_sys_path(cls.worker_deps_path, True)
        logger.info('Start using worker dependencies %s', cls.worker_deps_path)

    @classmethod
    @enable_feature_by(
        flag=PYTHON_ISOLATE_WORKER_DEPENDENCIES,
        flag_default=PYTHON_ISOLATE_WORKER_DEPENDENCIES_DEFAULT
    )
    def prioritize_customer_dependencies(cls, cx_working_dir=None):
        """Switch the sys.path and ensure the customer's code imports are
        loaded from CX's dependencies.

        This will not affect already imported namespaces, but will clear out
        the module cache and ensure the upcoming modules are loaded from
        customer's dependency path.

        As for Linux Consumption, this will only remove worker_deps_path,
        but the customer's path will be loaded in function_environment_reload.

        The search order of a module name in customer's paths is:
        1. cx_deps_path
        2. worker_deps_path
        3. cx_working_dir

        Parameters
        ----------
        cx_working_dir: Optional[str]
            The customer's project folder. When falsy, the cached
            cls.cx_working_dir and then the AzureWebJobsScriptRoot
            environment variable are used instead.
        """
        # Try to get the latest customer's working directory
        # cx_working_dir => cls.cx_working_dir => AzureWebJobsScriptRoot
        working_directory: str = ''
        if cx_working_dir:
            working_directory = os.path.abspath(cx_working_dir)
        if not working_directory:
            working_directory = cls.cx_working_dir
        if not working_directory:
            working_directory = os.getenv(AZURE_WEBJOBS_SCRIPT_ROOT, '')

        # Try to get the latest customer's dependency path
        cx_deps_path: str = cls._get_cx_deps_path()
        if not cx_deps_path:
            cx_deps_path = cls.cx_deps_path

        logger.info(
            'Applying prioritize_customer_dependencies: '
            'worker_dependencies_path: %s, customer_dependencies_path: %s, '
            'working_directory: %s, Linux Consumption: %s, Placeholder: %s',
            cls.worker_deps_path, cx_deps_path, working_directory,
            DependencyManager.is_in_linux_consumption(),
            is_envvar_true("WEBSITE_PLACEHOLDER_MODE"))

        cls._remove_from_sys_path(cls.worker_deps_path)

        # NOTE(review): the path inserted here is the cached
        # cls.cx_deps_path, while the path logged above is the freshly
        # resolved local cx_deps_path. The two can differ. A freshly
        # resolved path is by construction already in sys.path, so the
        # cached value is what re-adds a path previously removed by
        # use_worker_dependencies - confirm this is intended before changing.
        cls._add_to_sys_path(cls.cx_deps_path, True)

        # Deprioritize worker dependencies but don't completely remove it
        # Otherwise, it will break some really old function apps, those
        # don't have azure-functions module in .python_packages
        # https://github.com/Azure/azure-functions-core-tools/pull/1498
        cls._add_to_sys_path(cls.worker_deps_path, False)

        # The modules defined in customer's working directory should have the
        # least priority since we use the new folder structure.
        # Please check the "Message to customer" section in the following PR:
        # https://github.com/Azure/azure-functions-python-worker/pull/726
        cls._add_to_sys_path(working_directory, False)

        logger.info('Finished prioritize_customer_dependencies')

    @classmethod
    def reload_customer_libraries(cls, cx_working_dir: str):
        """Reload azure and google namespace, this including any modules in
        this namespace, such as azure-functions, grpcio, grpcio-tools etc.

        Depending on PYTHON_ISOLATE_WORKER_DEPENDENCIES, the actual behavior
        differs: when the variable is unset, the default is chosen by Python
        version (a dedicated default for 3.10); otherwise the variable's
        value is interpreted with is_true_like.

        This is called only when placeholder mode is true. In the case of a
        worker restart, this will not be called.

        Parameters
        ----------
        cx_working_dir: str
            The path which contains customer's project file (e.g. wwwroot).
        """
        use_new_env = os.getenv(PYTHON_ISOLATE_WORKER_DEPENDENCIES)
        if use_new_env is None:
            use_new = (
                PYTHON_ISOLATE_WORKER_DEPENDENCIES_DEFAULT_310 if
                is_python_version('3.10') else
                PYTHON_ISOLATE_WORKER_DEPENDENCIES_DEFAULT
            )
        else:
            use_new = is_true_like(use_new_env)

        if use_new:
            cls.prioritize_customer_dependencies(cx_working_dir)
        else:
            cls.reload_azure_google_namespace_from_worker_deps()

    @classmethod
    def reload_azure_google_namespace_from_worker_deps(cls):
        """This is the old implementation of reloading azure and google
        namespace in Python worker directory. It is not actually re-importing
        the module but only reloads the module scripts from the worker path.

        It is not doing what it is intended, but since it is already released
        on Linux Consumption production, we don't want to introduce regression
        on existing customers.

        Only intended to be used in Linux Consumption scenario.
        """
        # Reload package namespaces for customer's libraries
        packages_to_reload = ['azure', 'google']
        packages_reloaded = []
        for p in packages_to_reload:
            try:
                importlib.reload(sys.modules[p])
                packages_reloaded.append(p)
            except Exception as ex:
                # Best effort: a missing or broken namespace must not stop
                # the remaining reloads.
                logger.warning('Unable to reload %s: \n%s', p, ex)
        logger.info(f'Reloaded modules: {",".join(packages_reloaded)}')

        # Reload azure.functions to give user package precedence
        try:
            importlib.reload(sys.modules['azure.functions'])
            logger.info('Reloaded azure.functions module now at %s',
                        inspect.getfile(sys.modules['azure.functions']))
        except Exception as ex:
            logger.warning(
                'Unable to reload azure.functions. Using default. '
                'Exception:\n%s', ex)

    @classmethod
    def _add_to_sys_path(cls, path: str, add_to_first: bool):
        """This will ensure no duplicated paths are added into sys.path and
        clear importer cache. No action if path already exists in sys.path.

        Parameters
        ----------
        path: str
            The path needs to be added into sys.path.
            If the path is an empty string, no action will be taken.
        add_to_first: bool
            Should the path be added as the first entry (highest priority)
        """
        if path and path not in sys.path:
            if add_to_first:
                sys.path.insert(0, path)
            else:
                sys.path.append(path)

            # Only clear path importer and sys.modules cache if path is not
            # defined in sys.path
            cls._clear_path_importer_cache_and_modules(path)

    @classmethod
    def _remove_from_sys_path(cls, path: str):
        """This will remove path from sys.path and clear importer cache.
        sys.path is left untouched if it does not contain the path, but the
        caches for the path are cleared either way.

        Parameters
        ----------
        path: str
            The path to be removed from sys.path.
            If the path is an empty string, no action will be taken.
        """
        if path and path in sys.path:
            # Remove all occurrences in sys.path
            sys.path = [p for p in sys.path if p != path]

        # In case if any part of worker initialization do sys.path.pop()
        # Always do a cache clear in path importer and sys.modules
        cls._clear_path_importer_cache_and_modules(path)

    @classmethod
    def _clear_path_importer_cache_and_modules(cls, path: str):
        """Removes path from sys.path_importer_cache and clear related
        sys.modules cache. No action if the path is empty or no entries
        in sys.path_importer_cache or sys.modules.

        Parameters
        ----------
        path: str
            The path to be removed from sys.path_importer_cache. All related
            modules will be cleared out from sys.modules cache.
            If the path is an empty string, no action will be taken.
        """
        if not path:
            return

        sys.path_importer_cache.pop(path, None)
        cls._remove_module_cache(path)

    @staticmethod
    def _get_cx_deps_path() -> str:
        """Get the directory storing the customer's third-party libraries.

        The result is the first sys.path entry that starts with the
        AzureWebJobsScriptRoot value and contains 'site-packages'.

        Returns
        -------
        str
            Core Tools: path to customer's site packages
            Linux Dedicated/Premium: path to customer's site packages
            Linux Consumption: empty string
        """
        prefix: Optional[str] = os.getenv(AZURE_WEBJOBS_SCRIPT_ROOT)
        cx_paths: List[str] = [
            p for p in sys.path
            if prefix and p.startswith(prefix) and ('site-packages' in p)
        ]
        # Return first or default of customer path
        return (cx_paths or [''])[0]

    @staticmethod
    def _get_cx_working_dir() -> str:
        """Get the customer's working directory.

        Returns
        -------
        str
            Core Tools: AzureWebJobsScriptRoot env variable
            Linux Dedicated/Premium: AzureWebJobsScriptRoot env variable
            Linux Consumption: empty string
        """
        return os.getenv(AZURE_WEBJOBS_SCRIPT_ROOT, '')

    @staticmethod
    def _get_worker_deps_path() -> str:
        """Get the worker dependency sys.path. This is always available
        in all SKUs.

        Returns
        -------
        str
            The worker packages path
        """
        # 1. Try to parse the absolute path python/3.8/LINUX/X64 in sys.path
        r = re.compile(r'.*python(\/|\\)\d+\.\d+(\/|\\)(WINDOWS|LINUX|OSX).*')
        worker_deps_paths: List[str] = [p for p in sys.path if r.match(p)]
        if worker_deps_paths:
            return worker_deps_paths[0]

        # 2. Try to find module spec of azure.functions without actually
        #    importing it (e.g. lib/site-packages/azure/functions/__init__.py)
        try:
            azf_spec = importlib.util.find_spec('azure.functions')
            if azf_spec and azf_spec.origin:
                return os.path.abspath(
                    os.path.join(os.path.dirname(azf_spec.origin), '..', '..')
                )
        except (ImportError, ValueError):
            # find_spec imports the parent package and may raise
            # ModuleNotFoundError/ImportError, or ValueError when the module
            # is in sys.modules with __spec__ set to None. In every case we
            # fall through to the last-resort path below.
            logger.warning('Cannot locate built-in azure.functions module')

        # 3. If it fails to find one, try to find one from the parent path
        #    This is used for handling the CI/localdev environment
        return os.path.abspath(
            os.path.join(os.path.dirname(__file__), '..', '..')
        )

    @staticmethod
    def _remove_module_cache(path: str):
        """Remove module cache if the module is imported from specific path.
        This will not impact builtin modules or azure_functions_worker.

        Parameters
        ----------
        path: str
            The module cache to be removed if it is imported from this path.
            A module matches when its __file__ or any __path__ entry starts
            with this string. No action if the path is empty.
        """
        if not path:
            return

        not_builtin = set(sys.modules.keys()) - set(sys.builtin_module_names)

        # Don't reload azure_functions_worker
        to_be_cleared_from_cache = {
            module_name for module_name in not_builtin
            if not module_name.startswith('azure_functions_worker')
        }

        for module_name in to_be_cleared_from_cache:
            module = sys.modules.get(module_name)
            if not isinstance(module, ModuleType):
                continue

            # Module path can be actual file path or a pure namespace path.
            # Both of these has the module path placed in __path__ property
            # The property .__path__ can be None or does not exist in module
            try:
                module_paths = set(getattr(module, '__path__', None) or [])
                if hasattr(module, '__file__') and module.__file__:
                    module_paths.add(module.__file__)

                if any(p.startswith(path) for p in module_paths):
                    sys.modules.pop(module_name)
            except Exception as e:
                # Best effort: keep the cached module rather than fail the
                # whole sweep because of one unusual module object.
                logger.warning(
                    'Attempt to remove module cache for %s but failed with '
                    '%s. Using the original module cache.',
                    module_name, e)