chalice/cli/factory.py (272 lines of code) (raw):

from __future__ import annotations import sys import os import json import importlib import logging import functools import click from botocore.config import Config as BotocoreConfig from botocore.session import Session from typing import Any, Optional, Dict, MutableMapping, cast # noqa from chalice import __version__ as chalice_version from chalice.awsclient import TypedAWSClient from chalice.app import Chalice # noqa from chalice.config import Config from chalice.config import DeployedResources # noqa from chalice.package import create_app_packager from chalice.package import AppPackager # noqa from chalice.package import PackageOptions from chalice.constants import DEFAULT_STAGE_NAME from chalice.constants import DEFAULT_APIGATEWAY_STAGE_NAME from chalice.constants import DEFAULT_ENDPOINT_TYPE from chalice.logs import LogRetriever, LogEventGenerator from chalice.logs import FollowLogEventGenerator from chalice.logs import BaseLogEventGenerator from chalice import local from chalice.utils import UI # noqa from chalice.utils import PipeReader # noqa from chalice.deploy import deployer # noqa from chalice.deploy import validate from chalice.invoke import LambdaInvokeHandler from chalice.invoke import LambdaInvoker from chalice.invoke import LambdaResponseFormatter OptStr = Optional[str] OptInt = Optional[int] def create_botocore_session( profile: OptStr = None, debug: bool = False, connection_timeout: OptInt = None, read_timeout: OptInt = None, max_retries: OptInt = None, ) -> Session: s = Session(profile=profile) _add_chalice_user_agent(s) if debug: _inject_large_request_body_filter() config_args: Dict[str, Any] = {} if connection_timeout is not None: config_args['connect_timeout'] = connection_timeout if read_timeout is not None: config_args['read_timeout'] = read_timeout if max_retries is not None: config_args['retries'] = {'max_attempts': max_retries} if config_args: config = BotocoreConfig(**config_args) s.set_default_client_config(config) return s def _add_chalice_user_agent(session: Session) -> None: suffix = '%s/%s' % (session.user_agent_name, session.user_agent_version) session.user_agent_name = 'aws-chalice' session.user_agent_version = chalice_version session.user_agent_extra = suffix def _inject_large_request_body_filter() -> None: log = logging.getLogger('botocore.endpoint') log.addFilter(LargeRequestBodyFilter()) class NoSuchFunctionError(Exception): """The specified function could not be found.""" def __init__(self, name: str) -> None: self.name = name super(NoSuchFunctionError, self).__init__() class UnknownConfigFileVersion(Exception): def __init__(self, version: str) -> None: super(UnknownConfigFileVersion, self).__init__( "Unknown version '%s' in config.json" % version ) class LargeRequestBodyFilter(logging.Filter): def filter(self, record: Any) -> bool: # Note: the proper type should be "logging.LogRecord", but # the typechecker complains about 'Invalid index type "int" for "dict"' # so we're using Any for now. if record.msg.startswith('Making request'): if record.args[0].name in ['UpdateFunctionCode', 'CreateFunction']: # When using the ZipFile argument (which is used in chalice), # the entire deployment package zip is sent as a base64 encoded # string. We don't want this to clutter the debug logs # so we don't log the request body for lambda operations # that have the ZipFile arg. record.args = record.args[:-1] + ( '(... omitted from logs due to size ...)', ) return True class CLIFactory(object): def __init__( self, project_dir: str, debug: bool = False, profile: Optional[str] = None, environ: Optional[MutableMapping] = None, ) -> None: self.project_dir = project_dir self.debug = debug self.profile = profile if environ is None: environ = dict(os.environ) self._environ = environ def create_botocore_session( self, connection_timeout: OptInt = None, read_timeout: OptInt = None, max_retries: OptInt = None, ) -> Session: return create_botocore_session( profile=self.profile, debug=self.debug, connection_timeout=connection_timeout, read_timeout=read_timeout, max_retries=max_retries, ) def create_default_deployer( self, session: Session, config: Config, ui: UI ) -> deployer.Deployer: return deployer.create_default_deployer(session, config, ui) def create_plan_only_deployer( self, session: Session, config: Config, ui: UI ) -> deployer.Deployer: return deployer.create_plan_only_deployer(session, config, ui) def create_deletion_deployer( self, session: Session, ui: UI ) -> deployer.Deployer: return deployer.create_deletion_deployer(TypedAWSClient(session), ui) def create_deployment_reporter( self, ui: UI ) -> deployer.DeploymentReporter: return deployer.DeploymentReporter(ui=ui) def create_config_obj( self, chalice_stage_name: str = DEFAULT_STAGE_NAME, autogen_policy: Optional[bool] = None, api_gateway_stage: Optional[str] = None, user_provided_params: Optional[Dict[str, Any]] = None, ) -> Config: if user_provided_params is None: user_provided_params = {} default_params = { 'project_dir': self.project_dir, 'api_gateway_stage': DEFAULT_APIGATEWAY_STAGE_NAME, 'api_gateway_endpoint_type': DEFAULT_ENDPOINT_TYPE, 'autogen_policy': True, } try: config_from_disk = self.load_project_config() except (OSError, IOError): raise RuntimeError( "Unable to load the project config file. " "Are you sure this is a chalice project?" ) except ValueError as err: raise RuntimeError( "Unable to load the project config file: %s" % err ) self._validate_config_from_disk(config_from_disk) if autogen_policy is not None: user_provided_params['autogen_policy'] = autogen_policy if self.profile is not None: user_provided_params['profile'] = self.profile if api_gateway_stage is not None: user_provided_params['api_gateway_stage'] = api_gateway_stage config = Config( chalice_stage=chalice_stage_name, user_provided_params=user_provided_params, config_from_disk=config_from_disk, default_params=default_params, ) user_provided_params['chalice_app'] = functools.partial( self.load_chalice_app, config.environment_variables ) return config def _validate_config_from_disk(self, config: Dict[str, Any]) -> None: string_version = config.get('version', '1.0') try: version = float(string_version) if version > 2.0: raise UnknownConfigFileVersion(string_version) except ValueError: raise UnknownConfigFileVersion(string_version) def create_app_packager( self, config: Config, options: PackageOptions, package_format: str, template_format: str, merge_template: OptStr = None, ) -> AppPackager: return create_app_packager( config, options, package_format, template_format, merge_template=merge_template, ) def create_log_retriever( self, session: Session, lambda_arn: str, follow_logs: bool ) -> LogRetriever: client = TypedAWSClient(session) if follow_logs: event_generator = cast( BaseLogEventGenerator, FollowLogEventGenerator(client) ) else: event_generator = cast( BaseLogEventGenerator, LogEventGenerator(client) ) retriever = LogRetriever.create_from_lambda_arn( event_generator, lambda_arn ) return retriever def create_stdin_reader(self) -> PipeReader: stream = click.get_binary_stream('stdin') reader = PipeReader(stream) return reader def create_lambda_invoke_handler( self, name: str, stage: str ) -> LambdaInvokeHandler: config = self.create_config_obj(stage) deployed = config.deployed_resources(stage) try: resource = deployed.resource_values(name) arn = resource['lambda_arn'] except (KeyError, ValueError): raise NoSuchFunctionError(name) function_scoped_config = config.scope(stage, name) # The session for max retries needs to be set to 0 for invoking a # lambda function because in the case of a timeout or other retriable # error the underlying client will call the function again. session = self.create_botocore_session( read_timeout=function_scoped_config.lambda_timeout, max_retries=0, ) client = TypedAWSClient(session) invoker = LambdaInvoker(arn, client) handler = LambdaInvokeHandler( invoker, LambdaResponseFormatter(), UI(), ) return handler def load_chalice_app( self, environment_variables: Optional[MutableMapping] = None, validate_feature_flags: Optional[bool] = True, ) -> Chalice: # validate_features indicates that we should validate that # any expiremental features used have the appropriate feature flags. if self.project_dir not in sys.path: sys.path.insert(0, self.project_dir) # The vendor directory has its contents copied up to the top level of # the deployment package. This means that imports will work in the # lambda function as if the vendor directory is on the python path. # For loading the config locally we must add the vendor directory to # the path so it will be treated the same as if it were running on # lambda. vendor_dir = os.path.join(self.project_dir, 'vendor') if os.path.isdir(vendor_dir) and vendor_dir not in sys.path: # This is a tradeoff we have to make for local use. # The common use case of vendor/ is to include # extension modules built for AWS Lambda. If you're # running on a non-linux dev machine, then attempting # to import these files will raise exceptions. As # a workaround, the vendor is added to the end of # sys.path so it's after `./lib/site-packages`. # This gives you a change to install the correct # version locally and still keep the lambda # specific one in vendor/ sys.path.append(vendor_dir) if environment_variables is not None: self._environ.update(environment_variables) try: app = importlib.import_module('app') chalice_app = getattr(app, 'app') except SyntaxError as e: message = ( 'Unable to import your app.py file:\n\n' 'File "%s", line %s\n' ' %s\n' 'SyntaxError: %s' ) % (getattr(e, 'filename'), e.lineno, e.text, e.msg) raise RuntimeError(message) if validate_feature_flags: validate.validate_feature_flags(chalice_app) return chalice_app def load_project_config(self) -> Dict[str, Any]: """Load the chalice config file from the project directory. :raise: OSError/IOError if unable to load the config file. """ config_file = os.path.join(self.project_dir, '.chalice', 'config.json') with open(config_file) as f: return json.loads(f.read()) def create_local_server( self, app_obj: Chalice, config: Config, host: str, port: int ) -> local.LocalDevServer: return local.create_local_server(app_obj, config, host, port) def create_package_options(self) -> PackageOptions: """Create the package options that are required to target regions.""" s = Session(profile=self.profile) client = TypedAWSClient(session=s) return PackageOptions(client)