samcli/commands/local/lib/local_lambda.py (219 lines of code) (raw):
"""
Implementation of Local Lambda runner
"""
import logging
import os
import platform
from typing import Any, Dict, Optional, cast
import boto3
import click
from botocore.credentials import Credentials
from samcli.commands.local.lib.debug_context import DebugContext
from samcli.commands.local.lib.exceptions import (
InvalidIntermediateImageError,
NoPrivilegeException,
OverridesNotWellDefinedError,
UnsupportedInlineCodeError,
)
from samcli.lib.providers.provider import Function
from samcli.lib.providers.sam_function_provider import SamFunctionProvider
from samcli.lib.utils.architecture import validate_architecture_runtime
from samcli.lib.utils.codeuri import resolve_code_path
from samcli.lib.utils.colors import Colored
from samcli.lib.utils.packagetype import IMAGE, ZIP
from samcli.lib.utils.stream_writer import StreamWriter
from samcli.local.docker.container import ContainerConnectionTimeoutException, ContainerResponseException
from samcli.local.lambdafn.config import FunctionConfig
from samcli.local.lambdafn.env_vars import EnvironmentVariables
from samcli.local.lambdafn.exceptions import FunctionNotFound
from samcli.local.lambdafn.runtime import LambdaRuntime
LOG = logging.getLogger(__name__)
RUST_LOCAL_INVOKE_DISCLAIMER = """
DISCLAIMER: If your local machine is using Apple silicon and you are unable to run \"sam local invoke\"
you can try setting the \"SAM_BUILD_MODE\" environ variable to equal \"debug\".
Please note that this is only recommended when building and testing locally.
We advise customers to switch back to a release build by unsetting or removing the SAM_BUILD_MODE environ variable
"""
class LocalLambdaRunner:
"""
Runs Lambda functions locally. This class is a wrapper around the `samcli.local` library which takes care
of actually running the function on a Docker container.
"""
MAX_DEBUG_TIMEOUT = 36000 # 10 hours in seconds
WIN_ERROR_CODE = 1314
def __init__(
self,
local_runtime: LambdaRuntime,
function_provider: SamFunctionProvider,
cwd: str,
real_path: str,
aws_profile: Optional[str] = None,
aws_region: Optional[str] = None,
env_vars_values: Optional[Dict[Any, Any]] = None,
debug_context: Optional[DebugContext] = None,
container_host: Optional[str] = None,
container_host_interface: Optional[str] = None,
extra_hosts: Optional[dict] = None,
) -> None:
"""
Initializes the class
:param samcli.local.lambdafn.runtime.LambdaRuntime local_runtime: Lambda runtime capable of running a function
:param samcli.commands.local.lib.provider.FunctionProvider function_provider: Provider that can return a
Lambda function
:param string cwd: Current working directory. We will resolve all function CodeURIs relative to this directory.
:param string aws_profile: Optional. Name of the profile to fetch AWS credentials from.
:param string aws_region: Optional. AWS Region to use.
:param dict env_vars_values: Optional. Dictionary containing values of environment variables.
:param DebugContext debug_context: Optional. Debug context for the function (includes port, args, and path).
:param string container_host: Optional. Host of locally emulated Lambda container
:param string container_host_interface: Optional. Interface that Docker host binds ports to
:param dict extra_hosts: Optional. Dict of hostname to IP resolutions
"""
self.local_runtime = local_runtime
self.provider = function_provider
self.cwd = cwd
self.real_path = real_path
self.aws_profile = aws_profile
self.aws_region = aws_region
self.env_vars_values = env_vars_values or {}
self.debug_context = debug_context
self._boto3_session_creds: Optional[Credentials] = None
self._boto3_region: Optional[str] = None
self.container_host = container_host
self.container_host_interface = container_host_interface
self.extra_hosts = extra_hosts
def invoke(
self,
function_identifier: str,
event: str,
stdout: Optional[StreamWriter] = None,
stderr: Optional[StreamWriter] = None,
override_runtime: Optional[str] = None,
) -> None:
"""
Find the Lambda function with given name and invoke it. Pass the given event to the function and return
response through the given streams.
This function will block until either the function completes or times out.
Parameters
----------
function_identifier str
Identifier of the Lambda function to invoke, it can be logicalID, function name or full path
event str
Event data passed to the function. Must be a valid JSON String.
stdout samcli.lib.utils.stream_writer.StreamWriter
Stream writer to write the output of the Lambda function to.
stderr samcli.lib.utils.stream_writer.StreamWriter
Stream writer to write the Lambda runtime logs to.
Runtime: str
To use instead of the runtime specified in the function configuration
Raises
------
FunctionNotfound
When we cannot find a function with the given name
"""
# Generate the correct configuration based on given inputs
function = self.provider.get(function_identifier)
if not function:
all_function_full_paths = [f.full_path for f in self.provider.get_all()]
available_function_message = "{} not found. Possible options in your template: {}".format(
function_identifier, all_function_full_paths
)
LOG.info(available_function_message)
raise FunctionNotFound("Unable to find a Function with name '{}'".format(function_identifier))
LOG.debug("Found one Lambda function with name '%s'", function_identifier)
if function.packagetype == ZIP:
if function.inlinecode:
raise UnsupportedInlineCodeError(
"Inline code is not supported for sam local commands."
f" Please write your code in a separate file for the function {function.function_id}."
)
LOG.info("Invoking %s (%s)", function.handler, function.runtime)
elif function.packagetype == IMAGE:
if not function.imageuri:
raise InvalidIntermediateImageError(
f"ImageUri not provided for Function: {function_identifier} of PackageType: {function.packagetype}"
)
LOG.info("Invoking Container created from %s", function.imageuri)
validate_architecture_runtime(function)
config = self.get_invoke_config(function, override_runtime)
if (
function.metadata
and function.metadata.get("BuildMethod", "") == "rust-cargolambda"
and "macOS" in platform.platform()
and "arm64" in platform.platform()
):
click.echo(Colored().yellow(RUST_LOCAL_INVOKE_DISCLAIMER))
# Invoke the function
try:
self.local_runtime.invoke(
config,
event,
debug_context=self.debug_context,
stdout=stdout,
stderr=stderr,
container_host=self.container_host,
container_host_interface=self.container_host_interface,
extra_hosts=self.extra_hosts,
)
except ContainerResponseException:
# NOTE(sriram-mv): This should still result in a exit code zero to avoid regressions.
LOG.info("No response from invoke container for %s", function.name)
except ContainerConnectionTimeoutException as e:
# NOTE: Exit code of zero here as well to match the behaviour above (ContainerResponseException
# having exit code of zero) because previously when it timed out or exhausted retries while
# trying to connect to the socket for Docker it would throw ContainerResponseException but now it's this.
LOG.info(str(e))
except OSError as os_error:
if getattr(os_error, "winerror", None) == self.WIN_ERROR_CODE:
raise NoPrivilegeException(
"Administrator, Windows Developer Mode, "
"or SeCreateSymbolicLinkPrivilege is required to create symbolic link for files: {}, {}".format(
os_error.filename, os_error.filename2
)
) from os_error
raise
def is_debugging(self) -> bool:
"""
Are we debugging the invoke?
Returns
-------
bool
True, if we are debugging the invoke ie. the Docker container will break into the debugger and wait for
attach
"""
return bool(self.debug_context)
def get_invoke_config(self, function: Function, override_runtime: Optional[str] = None) -> FunctionConfig:
"""
Returns invoke configuration to pass to Lambda Runtime to invoke the given function
:param samcli.commands.local.lib.provider.Function function: Lambda function to generate the configuration for
:return samcli.local.lambdafn.config.FunctionConfig: Function configuration to pass to Lambda runtime
"""
env_vars = self._make_env_vars(function)
code_abs_path = None
code_real_path = None
if function.packagetype == ZIP:
code_abs_path = resolve_code_path(self.cwd, function.codeuri)
LOG.debug("Resolved absolute path to code is %s", code_abs_path)
code_real_path = resolve_code_path(self.real_path, function.codeuri)
LOG.debug("Resolved real code path to %s", code_real_path)
function_timeout = function.timeout
# The Runtime container handles timeout inside the container. When debugging with short timeouts, this can
# cause the container execution to stop. When in debug mode, we set the timeout in the container to a max 10
# hours. This will ensure the container doesn't unexpectedly stop while debugging function code
if self.is_debugging():
function_timeout = self.MAX_DEBUG_TIMEOUT
return FunctionConfig(
name=function.name,
full_path=function.full_path,
# override_runtime allows testing Lambda functions with a different
# runtime than specified in the function configuration
runtime=override_runtime if override_runtime else function.runtime,
handler=function.handler,
imageuri=function.imageuri,
imageconfig=function.imageconfig,
packagetype=function.packagetype,
code_abs_path=code_abs_path,
layers=function.layers,
architecture=function.architecture,
memory=function.memory,
timeout=function_timeout,
env_vars=env_vars,
runtime_management_config=function.runtime_management_config,
code_real_path=code_real_path,
)
def _make_env_vars(self, function: Function) -> EnvironmentVariables:
"""Returns the environment variables configuration for this function
Priority order for environment variables (high to low):
1. Function specific env vars from json file
2. Global env vars from json file
Parameters
----------
function : samcli.commands.local.lib.provider.Function
Lambda function to generate the configuration for
Returns
-------
samcli.local.lambdafn.env_vars.EnvironmentVariables
Environment variable configuration for this function
Raises
------
samcli.commands.local.lib.exceptions.OverridesNotWellDefinedError
If the environment dict is in the wrong format to process environment vars
"""
function_id = function.function_id
logical_id = function.name
function_name = function.functionname
full_path = function.full_path
variables = None
if isinstance(function.environment, dict) and "Variables" in function.environment:
variables = function.environment["Variables"]
else:
LOG.debug("No environment variables found for function '%s'", logical_id)
# This could either be in standard format, or a CloudFormation parameter file format, or mix of both.
#
# Standard format is {FunctionName: {key:value}, FunctionName: {key:value}}
# CloudFormation parameter file is {"Parameters": {key:value}}
# Mixed format is {FunctionName: {key:value}, "Parameters": {key:value}}
for env_var_value in self.env_vars_values.values():
if not isinstance(env_var_value, dict):
reason = "Environment variables {} in incorrect format".format(env_var_value)
LOG.debug(reason)
raise OverridesNotWellDefinedError(reason)
overrides = {}
# environment variables for specific resources take precedence over
# the single environment variable for all resources
if "Parameters" in self.env_vars_values:
LOG.debug("Environment variables data found in the CloudFormation parameter file format")
# CloudFormation parameter file format
parameter_result = self.env_vars_values.get("Parameters", {})
overrides.update(parameter_result)
# Precedence: logical_id -> function_id -> function name -> full_path, customer can use any of them
fn_file_env_vars = (
self.env_vars_values.get(logical_id, None)
or self.env_vars_values.get(function_id, None)
or self.env_vars_values.get(function_name, None)
or self.env_vars_values.get(full_path, None)
)
if fn_file_env_vars:
# Standard format
LOG.debug("Environment variables data found for specific function in standard format")
overrides.update(fn_file_env_vars)
shell_env = os.environ
aws_creds = self.get_aws_creds()
return EnvironmentVariables(
function.name,
function.memory,
function.timeout,
function.handler,
function.logging_config,
variables=variables,
shell_env_values=shell_env,
override_values=overrides,
aws_creds=aws_creds,
) # EnvironmentVariables is not yet annotated with type hints, disable mypy check for now. type: ignore
def _get_session_creds(self) -> Optional[Credentials]:
if self._boto3_session_creds is None:
# to pass command line arguments for region & profile to setup boto3 default session
LOG.debug("Loading AWS credentials from session with profile '%s'", self.aws_profile)
# The signature of boto3.session.Session defines the default values of profile_name and region_name
# so they should be Optional. But mypy follows its docstring which does not have "Optional."
# Here we trick mypy thinking they are both str rather than Optional[str].
session = boto3.session.Session(
profile_name=cast(str, self.aws_profile), region_name=cast(str, self.aws_region)
)
# check for region_name in session and cache
if hasattr(session, "region_name") and session.region_name:
self._boto3_region = session.region_name
# don't set cached session creds if there is not a session
if session:
self._boto3_session_creds = session.get_credentials()
return self._boto3_session_creds
def get_aws_creds(self) -> Dict[str, str]:
"""
Returns AWS credentials obtained from the shell environment or given profile
:return dict: A dictionary containing credentials. This dict has the structure
{"region": "", "key": "", "secret": "", "sessiontoken": ""}. If credentials could not be resolved,
this returns None
"""
result: Dict[str, str] = {}
# Load the credentials from profile/environment
creds = self._get_session_creds()
# After loading credentials, region name might be available here.
if self._boto3_region:
result["region"] = self._boto3_region
if not creds:
# If we were unable to load credentials, then just return result. We will use the default
return result
# Only add the key, if its value is present
if hasattr(creds, "access_key") and creds.access_key:
result["key"] = creds.access_key
if hasattr(creds, "secret_key") and creds.secret_key:
result["secret"] = creds.secret_key
if hasattr(creds, "token") and creds.token:
result["sessiontoken"] = creds.token
return result