chalice/package.py (1,369 lines of code) (raw):
# pylint: disable=too-many-lines
import copy
import json
import os
import re
import six
from typing import Any, Optional, Dict, List, Set, Union # noqa
from typing import cast
import yaml
from yaml.scanner import ScannerError
from yaml.nodes import Node # noqa
from yaml.nodes import ScalarNode, SequenceNode, MappingNode
from chalice.deploy.swagger import (
CFNSwaggerGenerator, TerraformSwaggerGenerator)
from chalice.utils import (
OSUtils, UI, serialize_to_json, to_cfn_resource_name
)
from chalice.awsclient import TypedAWSClient # noqa
from chalice.config import Config # noqa
from chalice.deploy import models
from chalice.deploy.appgraph import ApplicationGraphBuilder, DependencyBuilder
from chalice.deploy.deployer import BuildStage # noqa
from chalice.deploy.deployer import create_build_stage
def create_app_packager(
        config, options, package_format='cloudformation',
        template_format='json', merge_template=None):
    # type: (Config, PackageOptions, str, str, Optional[str]) -> AppPackager
    """Wire together an ``AppPackager`` for the requested output format.

    A ``package_format`` of ``'cloudformation'`` selects the SAM template
    generator plus the SAM code-location and template-merge post
    processors.  Any other value selects the Terraform generator and its
    code-location post processor.

    Templates are serialized as JSON unless, for the CloudFormation
    format, ``template_format`` is ``'yaml'`` or ``merge_template`` is
    recognized as a YAML template by ``YAMLTemplateSerializer``.
    """
    os_helper = OSUtils()
    console = UI()
    graph_builder = ApplicationGraphBuilder()
    dependency_builder = DependencyBuilder()
    processors = []  # type: List[TemplatePostProcessor]
    template_generator = None  # type: Union[None, TemplateGenerator]
    serializer = cast(TemplateSerializer, JSONTemplateSerializer())

    if package_format != 'cloudformation':
        stage = create_build_stage(
            os_helper, console, TerraformSwaggerGenerator(), config)
        template_generator = TerraformGenerator(config, options)
        processors.append(
            TerraformCodeLocationPostProcessor(osutils=os_helper))
    else:
        stage = create_build_stage(
            os_helper, console, CFNSwaggerGenerator(), config)
        # A YAML merge template forces YAML output no matter which
        # template format was asked for.
        merging_yaml = (
            merge_template is not None and
            YAMLTemplateSerializer.is_yaml_template(merge_template))
        if merging_yaml or template_format == 'yaml':
            serializer = YAMLTemplateSerializer()
        processors += [
            SAMCodeLocationPostProcessor(osutils=os_helper),
            TemplateMergePostProcessor(
                osutils=os_helper,
                merger=TemplateDeepMerger(),
                template_serializer=serializer,
                merge_template=merge_template),
        ]
        template_generator = SAMTemplateGenerator(config, options)

    builder = ResourceBuilder(graph_builder, dependency_builder, stage)
    return AppPackager(
        template_generator,
        builder,
        CompositePostProcessor(processors),
        serializer,
        os_helper)
class UnsupportedFeatureError(Exception):
    """Raised when a resource has no handler in a template generator."""
class DuplicateResourceNameError(Exception):
    """Raised when two resources map to the same template resource name."""
class PackageOptions(object):
    """Packaging options that are resolved through an AWS client."""

    def __init__(self, client):
        # type: (TypedAWSClient) -> None
        self._client = client  # type: TypedAWSClient

    def service_principal(self, service):
        # type: (str) -> str
        """Return the service principal for ``service`` in the client's region.

        The DNS suffix is looked up first and then handed, together with
        the service name and region, to the client's own
        ``service_principal`` method.
        """
        client = self._client
        suffix = client.endpoint_dns_suffix(service, client.region_name)
        return client.service_principal(
            service, client.region_name, suffix)
class ResourceBuilder(object):
    """Turn a chalice config into an ordered list of resource models."""

    def __init__(self,
                 application_builder,  # type: ApplicationGraphBuilder
                 deps_builder,         # type: DependencyBuilder
                 build_stage,          # type: BuildStage
                 ):
        # type: (...) -> None
        self._application_builder = application_builder
        self._deps_builder = deps_builder
        self._build_stage = build_stage

    def construct_resources(self, config, chalice_stage_name):
        # type: (Config, str) -> List[models.Model]
        """Build the application graph, run the build stage, list resources.

        Dependencies are computed twice on purpose: once to feed the
        build stage, and once more afterwards because the build stage
        may have modified the application graph.
        """
        app_graph = self._application_builder.build(
            config, chalice_stage_name)
        self._build_stage.execute(
            config, self._deps_builder.build_dependencies(app_graph))
        return self._deps_builder.build_dependencies(app_graph)
class TemplateGenerator(object):
    """Base class that routes resource models to per-type handlers.

    ``dispatch`` looks for a method named ``_generate_<classname>``
    (the resource's class name, lower-cased) and falls back to
    ``_default``, which rejects the resource.  The handlers defined
    here are deliberate no-ops for resource types that need no template
    entry of their own.
    """

    template_file = None  # type: str

    def __init__(self, config, options):
        # type: (Config, PackageOptions) -> None
        self._config = config
        self._options = options

    def dispatch(self, resource, template):
        # type: (models.Model, Dict[str, Any]) -> None
        """Invoke the handler matching ``resource``'s class on ``template``."""
        handler_name = '_generate_' + resource.__class__.__name__.lower()
        getattr(self, handler_name, self._default)(resource, template)

    def generate(self, resources):
        # type: (List[models.Model]) -> Dict[str, Any]
        """Produce the template for ``resources``; subclasses implement it."""
        raise NotImplementedError()

    def _generate_filebasediampolicy(self, resource, template):
        # type: (models.FileBasedIAMPolicy, Dict[str, Any]) -> None
        """No-op handler: this resource type adds nothing to the template."""

    def _generate_autogeniampolicy(self, resource, template):
        # type: (models.AutoGenIAMPolicy, Dict[str, Any]) -> None
        """No-op handler: this resource type adds nothing to the template."""

    def _generate_deploymentpackage(self, resource, template):
        # type: (models.DeploymentPackage, Dict[str, Any]) -> None
        """No-op handler: this resource type adds nothing to the template."""

    def _generate_precreatediamrole(self, resource, template):
        # type: (models.PreCreatedIAMRole, Dict[str, Any]) -> None
        """No-op handler: this resource type adds nothing to the template."""

    def _default(self, resource, template):
        # type: (models.Model, Dict[str, Any]) -> None
        """Reject a resource for which no ``_generate_*`` handler exists."""
        raise UnsupportedFeatureError(resource)
# Template generator that renders chalice resource models into an AWS
# SAM template dict (CloudFormation with the serverless transform).
class SAMTemplateGenerator(TemplateGenerator):
# Skeleton of every generated template. generate() deep-copies it before
# the handlers fill in 'Outputs' and 'Resources', so this class-level
# dict itself is never mutated.
_BASE_TEMPLATE = {
'AWSTemplateFormatVersion': '2010-09-09',
'Transform': 'AWS::Serverless-2016-10-31',
'Outputs': {},
'Resources': {},
}
# Overrides the ``template_file`` attribute declared on TemplateGenerator;
# presumably the base name of the emitted template file -- TODO confirm
# against the code that writes the template out.
template_file = "sam"
def __init__(self, config, options):
    # type: (Config, PackageOptions) -> None
    """Store config/options and set up the per-generator bookkeeping."""
    super(SAMTemplateGenerator, self).__init__(config, options)
    # CloudFormation names handed out so far, used to detect collisions.
    self._seen_names = set()  # type: Set[str]
    # Logical name of the generated Lambda layer, or "" when there is none.
    self._chalice_layer = ""
def generate(self, resources):
    # type: (List[models.Model]) -> Dict[str, Any]
    """Build a SAM template dict from the given resource models.

    Starts from a deep copy of ``_BASE_TEMPLATE`` and lets ``dispatch``
    hand every resource, in order, to its ``_generate_*`` handler, which
    fills the template in place.

    :param resources: Resource models in dependency order.
    :return: The populated template dictionary.
    """
    template = copy.deepcopy(self._BASE_TEMPLATE)
    # Per-run state must not leak from a previous call.  Besides the
    # registered names this also covers the layer reference: a stale
    # value would inject a ``Ref`` to a layer that is not part of the
    # template being generated now.
    self._seen_names.clear()
    self._chalice_layer = ""
    for resource in resources:
        self.dispatch(resource, template)
    return template
def _generate_lambdalayer(self, resource, template):
    # type: (models.LambdaLayer, Dict[str, Any]) -> None
    """Add the layer as an ``AWS::Serverless::LayerVersion`` resource.

    The layer's logical name is remembered in ``self._chalice_layer`` so
    that functions generated afterwards can reference it.
    """
    layer_cfn_name = to_cfn_resource_name(resource.resource_name)
    layer_properties = {
        "CompatibleRuntimes": [resource.runtime],
        "ContentUri": resource.deployment_package.filename,
        "LayerName": resource.layer_name
    }
    template['Resources'][layer_cfn_name] = {
        "Type": "AWS::Serverless::LayerVersion",
        "Properties": layer_properties
    }
    self._chalice_layer = layer_cfn_name
def _generate_scheduledevent(self, resource, template):
    # type: (models.ScheduledEvent, Dict[str, Any]) -> None
    """Attach a ``Schedule`` event to the function that handles it.

    The handler function must already be present in
    ``template['Resources']``; its ``Events`` property is replaced.
    """
    handler_name = to_cfn_resource_name(
        resource.lambda_function.resource_name)
    handler_cfn = template['Resources'][handler_name]
    event_name = self._register_cfn_resource_name(resource.resource_name)
    schedule_event = {
        'Type': 'Schedule',
        'Properties': {
            'Schedule': resource.schedule_expression,
        }
    }
    handler_cfn['Properties']['Events'] = {event_name: schedule_event}
def _generate_cloudwatchevent(self, resource, template):
    # type: (models.CloudWatchEvent, Dict[str, Any]) -> None
    """Attach a ``CloudWatchEvent`` event to the function that handles it.

    The handler function must already be present in
    ``template['Resources']``; its ``Events`` property is replaced.
    """
    handler_name = to_cfn_resource_name(
        resource.lambda_function.resource_name)
    handler_cfn = template['Resources'][handler_name]
    event_name = self._register_cfn_resource_name(resource.resource_name)
    # The model carries the pattern as a serialized JSON string (the form
    # API calls need); SAM templates need the decoded data structure.
    pattern = json.loads(resource.event_pattern)
    handler_cfn['Properties']['Events'] = {
        event_name: {
            'Type': 'CloudWatchEvent',
            'Properties': {
                'Pattern': pattern
            }
        }
    }
def _generate_lambdafunction(self, resource, template):
# type: (models.LambdaFunction, Dict[str, Any]) -> None
resources = template['Resources']
cfn_name = self._register_cfn_resource_name(resource.resource_name)
lambdafunction_definition = {
'Type': 'AWS::Serverless::Function',
'Properties': {
'Runtime': resource.runtime,
'Handler': resource.handler,
'CodeUri': resource.deployment_package.filename,
'Tags': resource.tags,
'Tracing': resource.xray and 'Active' or 'PassThrough',
'Timeout': resource.timeout,
'MemorySize': resource.memory_size,
},
} # type: Dict[str, Any]
if resource.environment_variables:
environment_config = {
'Environment': {
'Variables': resource.environment_variables
}
} # type: Dict[str, Dict[str, Dict[str, str]]]
lambdafunction_definition['Properties'].update(environment_config)
if resource.security_group_ids and resource.subnet_ids:
vpc_config = {
'VpcConfig': {
'SecurityGroupIds': resource.security_group_ids,
'SubnetIds': resource.subnet_ids,
}
} # type: Dict[str, Dict[str, List[str]]]
lambdafunction_definition['Properties'].update(vpc_config)
if resource.reserved_concurrency is not None:
reserved_concurrency_config = {
'ReservedConcurrentExecutions': resource.reserved_concurrency
}
lambdafunction_definition['Properties'].update(
reserved_concurrency_config)
layers = list(resource.layers) or [] # type: List[Any]
if self._chalice_layer:
layers.insert(0, {'Ref': self._chalice_layer})
if layers:
layers_config = {
'Layers': layers
} # type: Dict[str, Any]
lambdafunction_definition['Properties'].update(layers_config)
if resource.log_group is not None:
num_days = resource.log_group.retention_in_days
log_name = self._register_cfn_resource_name(
resource.log_group.resource_name)
log_def = {
'Type': 'AWS::Logs::LogGroup',
'Properties': {
'LogGroupName': {
'Fn::Sub': '/aws/lambda/${%s}' % cfn_name
},
'RetentionInDays': num_days
}
}
resources[log_name] = log_def
resources[cfn_name] = lambdafunction_definition
self._add_iam_role(resource, resources[cfn_name])
def _add_iam_role(self, resource, cfn_resource):
    # type: (models.LambdaFunction, Dict[str, Any]) -> None
    """Set the ``Role`` property of a function's CloudFormation entry.

    A managed role is referenced through ``Fn::GetAtt`` on the role's
    template resource; any other role is treated as pre-created and its
    ARN is used verbatim.
    """
    role = resource.role
    if not isinstance(role, models.ManagedIAMRole):
        # PreCreatedIAMRole is the only other subclass of IAMRole.
        precreated = cast(models.PreCreatedIAMRole, role)
        cfn_resource['Properties']['Role'] = precreated.role_arn
        return
    role_reference = {
        'Fn::GetAtt': [
            to_cfn_resource_name(role.resource_name), 'Arn'
        ],
    }
    cfn_resource['Properties']['Role'] = role_reference
def _generate_loggroup(self, resource, template):
# type: (models.LogGroup, Dict[str, Any]) -> None
# Handled in LambdaFunction generation
pass
def _generate_restapi(self, resource, template):
    # type: (models.RestAPI, Dict[str, Any]) -> None
    """Add the REST API, its invoke permissions, domain and outputs.

    The handler function, which must already be in the template, is
    renamed to ``APIHandler``.  One ``AWS::Lambda::Permission`` is added
    for the handler and one for every authorizer.
    """
    def invoke_permission(function_name):
        # type: (Dict[str, Any]) -> Dict[str, Any]
        # Everything is built fresh on each call so that no dict object
        # ends up shared between two resources of the template.
        return {
            'Type': 'AWS::Lambda::Permission',
            'Properties': {
                'FunctionName': function_name,
                'Action': 'lambda:InvokeFunction',
                'Principal': self._options.service_principal('apigateway'),
                'SourceArn': {
                    'Fn::Sub': [
                        ('arn:${AWS::Partition}:execute-api'
                         ':${AWS::Region}:${AWS::AccountId}'
                         ':${RestAPIId}/*'),
                        {'RestAPIId': {'Ref': 'RestAPI'}},
                    ]
                },
            }
        }

    resources = template['Resources']
    api_properties = {
        'EndpointConfiguration': resource.endpoint_type,
        'StageName': resource.api_gateway_stage,
        'DefinitionBody': resource.swagger_doc,
    }
    resources['RestAPI'] = {
        'Type': 'AWS::Serverless::Api',
        'Properties': api_properties
    }
    if resource.minimum_compression:
        api_properties['MinimumCompressionSize'] = \
            int(resource.minimum_compression)

    # Re-key the handler function under its fixed 'APIHandler' name.
    handler_key = to_cfn_resource_name(
        resource.lambda_function.resource_name)
    resources['APIHandler'] = resources.pop(handler_key)
    resources['APIHandlerInvokePermission'] = invoke_permission(
        {'Ref': 'APIHandler'})

    for authorizer in resource.authorizers:
        authorizer_key = to_cfn_resource_name(authorizer.resource_name)
        resources[authorizer_key + 'InvokePermission'] = invoke_permission(
            {'Fn::GetAtt': [authorizer_key, 'Arn']})

    self._add_domain_name(resource, template)
    self._inject_restapi_outputs(template)
def _inject_restapi_outputs(self, template):
# type: (Dict[str, Any]) -> None
# The 'Outputs' of the SAM template are considered
# part of the public API of chalice and therefore
# need to maintain backwards compatibility. This
# method uses the same output key names as the old
# deployer.
# For now, we aren't adding any of the new resources
# to the Outputs section until we can figure out
# a consist naming scheme. Ideally we don't use
# the autogen'd names that contain the md5 suffixes.
stage_name = template['Resources']['RestAPI'][
'Properties']['StageName']
outputs = template['Outputs']
outputs['RestAPIId'] = {
'Value': {'Ref': 'RestAPI'}
}
outputs['APIHandlerName'] = {
'Value': {'Ref': 'APIHandler'}
}
outputs['APIHandlerArn'] = {
'Value': {'Fn::GetAtt': ['APIHandler', 'Arn']}
}
outputs['EndpointURL'] = {
'Value': {
'Fn::Sub': (
'https://${RestAPI}.execute-api.${AWS::Region}'
# The api_gateway_stage is filled in when
# the template is built.
'.${AWS::URLSuffix}/%s/'
) % stage_name
}
}
def _add_websocket_lambda_integration(
self, api_ref, websocket_handler, resources):
# type: (Dict[str, Any], str, Dict[str, Any]) -> None
resources['%sAPIIntegration' % websocket_handler] = {
'Type': 'AWS::ApiGatewayV2::Integration',
'Properties': {
'ApiId': api_ref,
'ConnectionType': 'INTERNET',
'ContentHandlingStrategy': 'CONVERT_TO_TEXT',
'IntegrationType': 'AWS_PROXY',
'IntegrationUri': {
'Fn::Sub': [
(
'arn:${AWS::Partition}:apigateway:${AWS::Region}'
':lambda:path/2015-03-31/functions/arn'
':${AWS::Partition}:lambda:${AWS::Region}'
':${AWS::AccountId}:function'
':${WebsocketHandler}/invocations'
),
{'WebsocketHandler': {'Ref': websocket_handler}}
],
}
}
}
def _add_websocket_lambda_invoke_permission(
self, api_ref, websocket_handler, resources):
# type: (Dict[str, str], str, Dict[str, Any]) -> None
resources['%sInvokePermission' % websocket_handler] = {
'Type': 'AWS::Lambda::Permission',
'Properties': {
'FunctionName': {'Ref': websocket_handler},
'Action': 'lambda:InvokeFunction',
'Principal': self._options.service_principal('apigateway'),
'SourceArn': {
'Fn::Sub': [
('arn:${AWS::Partition}:execute-api'
':${AWS::Region}:${AWS::AccountId}'
':${WebsocketAPIId}/*'),
{'WebsocketAPIId': api_ref},
],
},
}
}
def _add_websocket_lambda_integrations(self, api_ref, resources):
# type: (Dict[str, str], Dict[str, Any]) -> None
websocket_handlers = [
'WebsocketConnect',
'WebsocketMessage',
'WebsocketDisconnect',
]
for handler in websocket_handlers:
if handler in resources:
self._add_websocket_lambda_integration(
api_ref, handler, resources)
self._add_websocket_lambda_invoke_permission(
api_ref, handler, resources)
def _create_route_for_key(self, route_key, api_ref):
# type: (str, Dict[str, str]) -> Dict[str, Any]
integration_ref = {
'$connect': 'WebsocketConnectAPIIntegration',
'$disconnect': 'WebsocketDisconnectAPIIntegration',
}.get(route_key, 'WebsocketMessageAPIIntegration')
return {
'Type': 'AWS::ApiGatewayV2::Route',
'Properties': {
'ApiId': api_ref,
'RouteKey': route_key,
'Target': {
'Fn::Join': [
'/',
[
'integrations',
{'Ref': integration_ref},
]
]
},
},
}
def _generate_websocketapi(self, resource, template):
# type: (models.WebsocketAPI, Dict[str, Any]) -> None
resources = template['Resources']
api_ref = {'Ref': 'WebsocketAPI'}
resources['WebsocketAPI'] = {
'Type': 'AWS::ApiGatewayV2::Api',
'Properties': {
'Name': resource.name,
'RouteSelectionExpression': '$request.body.action',
'ProtocolType': 'WEBSOCKET',
}
}
self._add_websocket_lambda_integrations(api_ref, resources)
route_key_names = []
for route in resource.routes:
key_name = 'Websocket%sRoute' % route.replace(
'$', '').replace('default', 'message').capitalize()
route_key_names.append(key_name)
resources[key_name] = self._create_route_for_key(route, api_ref)
resources['WebsocketAPIDeployment'] = {
'Type': 'AWS::ApiGatewayV2::Deployment',
'DependsOn': route_key_names,
'Properties': {
'ApiId': api_ref,
}
}
resources['WebsocketAPIStage'] = {
'Type': 'AWS::ApiGatewayV2::Stage',
'Properties': {
'ApiId': api_ref,
'DeploymentId': {'Ref': 'WebsocketAPIDeployment'},
'StageName': resource.api_gateway_stage,
}
}
self._add_websocket_domain_name(resource, template)
self._inject_websocketapi_outputs(template)
def _inject_websocketapi_outputs(self, template):
# type: (Dict[str, Any]) -> None
# The 'Outputs' of the SAM template are considered
# part of the public API of chalice and therefore
# need to maintain backwards compatibility. This
# method uses the same output key names as the old
# deployer.
# For now, we aren't adding any of the new resources
# to the Outputs section until we can figure out
# a consist naming scheme. Ideally we don't use
# the autogen'd names that contain the md5 suffixes.
stage_name = template['Resources']['WebsocketAPIStage'][
'Properties']['StageName']
outputs = template['Outputs']
resources = template['Resources']
outputs['WebsocketAPIId'] = {
'Value': {'Ref': 'WebsocketAPI'}
}
if 'WebsocketConnect' in resources:
outputs['WebsocketConnectHandlerArn'] = {
'Value': {'Fn::GetAtt': ['WebsocketConnect', 'Arn']}
}
outputs['WebsocketConnectHandlerName'] = {
'Value': {'Ref': 'WebsocketConnect'}
}
if 'WebsocketMessage' in resources:
outputs['WebsocketMessageHandlerArn'] = {
'Value': {'Fn::GetAtt': ['WebsocketMessage', 'Arn']}
}
outputs['WebsocketMessageHandlerName'] = {
'Value': {'Ref': 'WebsocketMessage'}
}
if 'WebsocketDisconnect' in resources:
outputs['WebsocketDisconnectHandlerArn'] = {
'Value': {'Fn::GetAtt': ['WebsocketDisconnect', 'Arn']}
} # There is not a lot of green in here.
outputs['WebsocketDisconnectHandlerName'] = {
'Value': {'Ref': 'WebsocketDisconnect'}
}
outputs['WebsocketConnectEndpointURL'] = {
'Value': {
'Fn::Sub': (
'wss://${WebsocketAPI}.execute-api.${AWS::Region}'
# The api_gateway_stage is filled in when
# the template is built.
'.${AWS::URLSuffix}/%s/'
) % stage_name
}
}
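# Managed IAM roles are emitted as their own AWS::IAM::Role resource
# with the role's policy document inlined, and the Lambda function
# generation points the function at that role. The IAM policy models
# need no template entry of their own; their noop handlers on the base
# TemplateGenerator indicate we've accounted for these resources.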
def _generate_managediamrole(self, resource, template):
# type: (models.ManagedIAMRole, Dict[str, Any]) -> None
role_cfn_name = self._register_cfn_resource_name(
resource.resource_name)
resource.trust_policy['Statement'][0]['Principal']['Service'] = \
self._options.service_principal('lambda')
template['Resources'][role_cfn_name] = {
'Type': 'AWS::IAM::Role',
'Properties': {
'AssumeRolePolicyDocument': resource.trust_policy,
'Policies': [
{'PolicyDocument': resource.policy.document,
'PolicyName': role_cfn_name + 'Policy'},
],
}
}
def _generate_s3bucketnotification(self, resource, template):
# type: (models.S3BucketNotification, Dict[str, Any]) -> None
message = (
"Unable to package chalice apps that @app.on_s3_event decorator. "
"CloudFormation does not support modifying the event "
"notifications of existing buckets. "
"You can deploy this app using `chalice deploy`."
)
raise NotImplementedError(message)
def _generate_snslambdasubscription(self, resource, template):
    # type: (models.SNSLambdaSubscription, Dict[str, Any]) -> None
    """Attach an ``SNS`` event to the function that handles it.

    A topic that already looks like an SNS ARN is used verbatim; any
    other value is treated as a topic name and expanded into an ARN for
    the stack's partition, region and account.
    """
    handler_name = to_cfn_resource_name(
        resource.lambda_function.resource_name)
    handler_cfn = template['Resources'][handler_name]
    event_name = self._register_cfn_resource_name(resource.resource_name)
    if re.match(r"^arn:aws[a-z\-]*:sns:", resource.topic) is None:
        topic = {
            'Fn::Sub': (
                'arn:${AWS::Partition}:sns'
                ':${AWS::Region}:${AWS::AccountId}:%s' %
                resource.topic
            )
        }  # type: Union[str, Dict[str, str]]
    else:
        topic = resource.topic
    sns_event = {
        'Type': 'SNS',
        'Properties': {
            'Topic': topic,
        }
    }
    handler_cfn['Properties']['Events'] = {event_name: sns_event}
def _generate_sqseventsource(self, resource, template):
    # type: (models.SQSEventSource, Dict[str, Any]) -> None
    """Attach an ``SQS`` event to the function that handles it.

    A ``QueueARN`` is used verbatim; any other queue value is formatted
    into an ARN for the stack's partition, region and account.
    ``ScalingConfig`` is only emitted when a maximum concurrency is set.
    """
    handler_name = to_cfn_resource_name(
        resource.lambda_function.resource_name)
    handler_cfn = template['Resources'][handler_name]
    event_name = self._register_cfn_resource_name(resource.resource_name)
    if not isinstance(resource.queue, models.QueueARN):
        queue_ref = {
            'Fn::Sub': ('arn:${AWS::Partition}:sqs:${AWS::Region}'
                        ':${AWS::AccountId}:%s' % resource.queue)
        }  # type: Union[str, Dict[str, Any]]
    else:
        queue_ref = resource.queue.arn
    event_properties = {
        'Queue': queue_ref,
        'BatchSize': resource.batch_size,
        'MaximumBatchingWindowInSeconds':
            resource.maximum_batching_window_in_seconds
    }
    if resource.maximum_concurrency:
        event_properties["ScalingConfig"] = {
            "MaximumConcurrency": resource.maximum_concurrency
        }
    sqs_event = {
        'Type': 'SQS',
        'Properties': event_properties
    }
    handler_cfn['Properties']['Events'] = {event_name: sqs_event}
def _generate_kinesiseventsource(self, resource, template):
    # type: (models.KinesisEventSource, Dict[str, Any]) -> None
    """Attach a ``Kinesis`` event to the function that handles it.

    The stream value is formatted into a stream ARN for the stack's
    partition, region and account.
    """
    handler_name = to_cfn_resource_name(
        resource.lambda_function.resource_name)
    handler_cfn = template['Resources'][handler_name]
    event_name = self._register_cfn_resource_name(resource.resource_name)
    stream_arn = {
        'Fn::Sub': (
            'arn:${AWS::Partition}:kinesis:${AWS::Region}'
            ':${AWS::AccountId}:stream/%s' %
            resource.stream
        )
    }
    event_properties = {
        'Stream': stream_arn,
        'BatchSize': resource.batch_size,
        'StartingPosition': resource.starting_position,
        'MaximumBatchingWindowInSeconds':
            resource.maximum_batching_window_in_seconds,
    }
    kinesis_event = {
        'Type': 'Kinesis',
        'Properties': event_properties
    }
    handler_cfn['Properties']['Events'] = {event_name: kinesis_event}
def _generate_dynamodbeventsource(self, resource, template):
    # type: (models.DynamoDBEventSource, Dict[str, Any]) -> None
    """Attach a ``DynamoDB`` event to the function that handles it.

    The stream ARN from the model is used as given.
    """
    handler_name = to_cfn_resource_name(
        resource.lambda_function.resource_name)
    handler_cfn = template['Resources'][handler_name]
    event_name = self._register_cfn_resource_name(resource.resource_name)
    event_properties = {
        'Stream': resource.stream_arn,
        'BatchSize': resource.batch_size,
        'StartingPosition': resource.starting_position,
        'MaximumBatchingWindowInSeconds':
            resource.maximum_batching_window_in_seconds,
    }
    dynamodb_event = {
        'Type': 'DynamoDB',
        'Properties': event_properties
    }
    handler_cfn['Properties']['Events'] = {event_name: dynamodb_event}
def _generate_apimapping(self, resource, template):
# type: (models.APIMapping, Dict[str, Any]) -> None
pass
def _generate_domainname(self, resource, template):
# type: (models.DomainName, Dict[str, Any]) -> None
pass
def _add_domain_name(self, resource, template):
    # type: (models.RestAPI, Dict[str, Any]) -> None
    """Add the REST API's custom domain name and base path mapping.

    Does nothing when the API has no domain name configured.  Otherwise
    an ``AWS::ApiGateway::DomainName`` resource is added under the
    domain's CloudFormation name, plus an
    ``AWS::ApiGateway::BasePathMapping`` under ``<name>Mapping`` that
    ties the domain to the ``RestAPI`` resource and its stage.
    """
    if resource.domain_name is None:
        return
    domain_name = resource.domain_name
    endpoint_type = resource.endpoint_type
    cfn_name = to_cfn_resource_name(domain_name.resource_name)
    properties = {
        'DomainName': domain_name.domain_name,
        'EndpointConfiguration': {
            'Types': [endpoint_type],
        }
    }  # type: Dict[str, Any]
    # Which certificate property applies depends on the endpoint type.
    if endpoint_type == 'EDGE':
        properties['CertificateArn'] = domain_name.certificate_arn
    else:
        properties['RegionalCertificateArn'] = domain_name.certificate_arn
    if domain_name.tls_version is not None:
        properties['SecurityPolicy'] = domain_name.tls_version.value
    if domain_name.tags:
        # Emitted as a sorted list of Key/Value pairs so the output is
        # deterministic.
        properties['Tags'] = [
            {'Key': key, 'Value': value}
            for key, value in sorted(domain_name.tags.items())
        ]
    template['Resources'][cfn_name] = {
        'Type': 'AWS::ApiGateway::DomainName',
        'Properties': properties
    }
    template['Resources'][cfn_name + 'Mapping'] = {
        'Type': 'AWS::ApiGateway::BasePathMapping',
        'Properties': {
            # Reference the resource created just above by its actual
            # logical name rather than a hard-coded one, so the Ref can
            # never dangle (same approach as the websocket domain name).
            'DomainName': {'Ref': cfn_name},
            'RestApiId': {'Ref': 'RestAPI'},
            'BasePath': domain_name.api_mapping.mount_path,
            'Stage': resource.api_gateway_stage,
        }
    }
def _add_websocket_domain_name(self, resource, template):
# type: (models.WebsocketAPI, Dict[str, Any]) -> None
if resource.domain_name is None:
return
domain_name = resource.domain_name
cfn_name = to_cfn_resource_name(domain_name.resource_name)
properties = {
'DomainName': domain_name.domain_name,
'DomainNameConfigurations': [
{'CertificateArn': domain_name.certificate_arn,
'EndpointType': 'REGIONAL'},
]
} # type: Dict[str, Any]
if domain_name.tags:
properties['Tags'] = domain_name.tags
template['Resources'][cfn_name] = {
'Type': 'AWS::ApiGatewayV2::DomainName',
'Properties': properties,
}
template['Resources'][cfn_name + 'Mapping'] = {
'Type': 'AWS::ApiGatewayV2::ApiMapping',
'Properties': {
'DomainName': {'Ref': cfn_name},
'ApiId': {'Ref': 'WebsocketAPI'},
'ApiMappingKey': domain_name.api_mapping.mount_path,
'Stage': {'Ref': 'WebsocketAPIStage'},
}
}
def _register_cfn_resource_name(self, name):
    # type: (str) -> str
    """Convert ``name`` to its CloudFormation name and claim it.

    :raises DuplicateResourceNameError: when the converted name has
        already been registered since the names were last cleared.
    """
    cfn_name = to_cfn_resource_name(name)
    if cfn_name not in self._seen_names:
        self._seen_names.add(cfn_name)
        return cfn_name
    raise DuplicateResourceNameError(
        'A duplicate resource name was generated for '
        'the SAM template: %s' % cfn_name,
    )
# Template generator that renders chalice resource models into a
# Terraform configuration dict (see generate() for the top-level layout).
class TerraformGenerator(TemplateGenerator):
# Overrides the ``template_file`` attribute declared on TemplateGenerator;
# presumably the base name of the emitted template file -- TODO confirm
# against the code that writes the template out.
template_file = "chalice.tf"
def __init__(self, config, options):
    # type: (Config, PackageOptions) -> None
    """Store config/options and start without a generated layer."""
    super(TerraformGenerator, self).__init__(config, options)
    # Starts empty; nothing in the visible part of the class assigns it.
    self._chalice_layer = ""
def generate(self, resources):
    # type: (List[models.Model]) -> Dict[str, Any]
    """Build a Terraform configuration dict from the resource models.

    The skeleton carries the version constraints and the data sources
    (caller identity, partition, region and a ``null_data_source`` with
    the app name and stage); ``dispatch`` then lets each resource's
    handler fill the template in place.
    """
    version_constraints = {
        'required_version': '>= 0.12.26, < 1.4.0',
        'required_providers': {
            'aws': {'version': '>= 2, < 5'},
            'null': {'version': '>= 2, < 4'}
        }
    }
    data_sources = {
        'aws_caller_identity': {'chalice': {}},
        'aws_partition': {'chalice': {}},
        'aws_region': {'chalice': {}},
        'null_data_source': {
            'chalice': {
                'inputs': {
                    'app': self._config.app_name,
                    'stage': self._config.chalice_stage
                }
            }
        }
    }
    template = {
        'resource': {},
        'locals': {},
        'terraform': version_constraints,
        'data': data_sources
    }
    for model in resources:
        self.dispatch(model, template)
    return template
def _fref(self, lambda_function, attr='arn'):
# type: (models.ManagedModel, str) -> str
return '${aws_lambda_function.%s.%s}' % (
lambda_function.resource_name, attr)
def _arnref(self, arn_template, **kw):
# type: (str, str) -> str
d = dict(
partition='${data.aws_partition.chalice.partition}',
region='${data.aws_region.chalice.name}',
account_id='${data.aws_caller_identity.chalice.account_id}')
d.update(kw)
return arn_template % d
def _generate_managediamrole(self, resource, template):
# type: (models.ManagedIAMRole, Dict[str, Any]) -> None
resource.trust_policy['Statement'][0]['Principal']['Service'] = \
self._options.service_principal('lambda')
template['resource'].setdefault('aws_iam_role', {})[
resource.resource_name] = {
'name': resource.role_name,
'assume_role_policy': json.dumps(resource.trust_policy)
}
template['resource'].setdefault('aws_iam_role_policy', {})[
resource.resource_name] = {
'name': resource.resource_name + 'Policy',
'policy': json.dumps(resource.policy.document),
'role': '${aws_iam_role.%s.id}' % resource.resource_name,
}
def _add_websocket_lambda_integration(
self, websocket_api_id, websocket_handler, template):
# type: (str, str, Dict[str, Any]) -> None
websocket_handler_function_name = \
"${aws_lambda_function.%s.function_name}" % websocket_handler
resource_definition = {
'api_id': websocket_api_id,
'connection_type': 'INTERNET',
'content_handling_strategy': 'CONVERT_TO_TEXT',
'integration_type': 'AWS_PROXY',
'integration_uri': self._arnref(
"arn:%(partition)s:apigateway:%(region)s"
":lambda:path/2015-03-31/functions/arn"
":%(partition)s:lambda:%(region)s"
":%(account_id)s:function"
":%(websocket_handler_function_name)s/invocations",
websocket_handler_function_name=websocket_handler_function_name
)
}
template['resource'].setdefault(
'aws_apigatewayv2_integration', {}
)['%s_api_integration' % websocket_handler] = resource_definition
def _add_websocket_lambda_invoke_permission(
self, websocket_api_id, websocket_handler, template):
# type: (str, str, Dict[str, Any]) -> None
websocket_handler_function_name = \
"${aws_lambda_function.%s.function_name}" % websocket_handler
resource_definition = {
"function_name": websocket_handler_function_name,
"action": "lambda:InvokeFunction",
"principal": self._options.service_principal('apigateway'),
"source_arn": self._arnref(
"arn:%(partition)s:execute-api"
":%(region)s:%(account_id)s"
":%(websocket_api_id)s/*",
websocket_api_id=websocket_api_id
)
}
template['resource'].setdefault(
'aws_lambda_permission', {}
)['%s_invoke_permission' % websocket_handler] = resource_definition
def _add_websockets_route(self, websocket_api_id, route_key, template):
# type: (str, str, Dict[str, Any]) -> str
integration_target = {
'$connect': 'integrations/${aws_apigatewayv2_integration'
'.websocket_connect_api_integration.id}',
'$disconnect': 'integrations/${aws_apigatewayv2_integration'
'.websocket_disconnect_api_integration.id}',
}.get(route_key,
'integrations/${aws_apigatewayv2_integration'
'.websocket_message_api_integration.id}')
route_resource_name = {
'$connect': 'websocket_connect_route',
'$disconnect': 'websocket_disconnect_route',
'$default': 'websocket_message_route',
}.get(route_key, 'message')
template['resource'].setdefault(
'aws_apigatewayv2_route', {}
)[route_resource_name] = {
"api_id": websocket_api_id,
"route_key": route_key,
"target": integration_target
}
return route_resource_name
def _add_websocket_domain_name(self, websocket_api_id, resource, template):
# type: (str, models.WebsocketAPI, Dict[str, Any]) -> None
if resource.domain_name is None:
return
domain_name = resource.domain_name
ws_domain_name_definition = {
"domain_name": domain_name.domain_name,
"domain_name_configuration": {
'certificate_arn': domain_name.certificate_arn,
'endpoint_type': 'REGIONAL',
},
}
if domain_name.tags:
ws_domain_name_definition['tags'] = domain_name.tags
template['resource'].setdefault(
'aws_apigatewayv2_domain_name', {}
)[domain_name.resource_name] = ws_domain_name_definition
template['resource'].setdefault(
'aws_apigatewayv2_api_mapping', {}
)[domain_name.resource_name + '_mapping'] = {
"api_id": websocket_api_id,
"domain_name": "${aws_apigatewayv2_domain_name.%s.id}" %
domain_name.resource_name,
"stage": "${aws_apigatewayv2_stage.websocket_api_stage.id}",
}
def _inject_websocketapi_outputs(self, websocket_api_id, template):
# type: (str, Dict[str, Any]) -> None
aws_lambda_functions = template['resource']['aws_lambda_function']
stage_name = \
template['resource']['aws_apigatewayv2_stage'][
'websocket_api_stage'][
'name']
output = template.setdefault('output', {})
output['WebsocketAPIId'] = {"value": websocket_api_id}
if 'websocket_connect' in aws_lambda_functions:
output['WebsocketConnectHandlerArn'] = {
"value": "${aws_lambda_function.websocket_connect.arn}"}
output['WebsocketConnectHandlerName'] = {
"value": (
"${aws_lambda_function.websocket_connect.function_name}")}
if 'websocket_message' in aws_lambda_functions:
output['WebsocketMessageHandlerArn'] = {
"value": "${aws_lambda_function.websocket_message.arn}"}
output['WebsocketMessageHandlerName'] = {
"value": (
"${aws_lambda_function.websocket_message.function_name}")}
if 'websocket_disconnect' in aws_lambda_functions:
output['WebsocketDisconnectHandlerArn'] = {
"value": "${aws_lambda_function.websocket_disconnect.arn}"}
output['WebsocketDisconnectHandlerName'] = {
"value": (
"${aws_lambda_function.websocket_disconnect"
".function_name}")}
output['WebsocketConnectEndpointURL'] = {
"value": (
'wss://%(websocket_api_id)s.execute-api'
# The api_gateway_stage is filled in when
# the template is built.
'.${data.aws_region.chalice.name}'
'.amazonaws.com/%(stage_name)s/'
) % {
"stage_name": stage_name,
"websocket_api_id": websocket_api_id
}
}
def _generate_websocketapi(self, resource, template):
# type: (models.WebsocketAPI, Dict[str, Any]) -> None
ws_definition = {
'name': resource.name,
'route_selection_expression': '$request.body.action',
'protocol_type': 'WEBSOCKET',
}
template['resource'].setdefault('aws_apigatewayv2_api', {})[
resource.resource_name] = ws_definition
websocket_api_id = "${aws_apigatewayv2_api.%s.id}" % \
resource.resource_name
websocket_handlers = [
'websocket_connect',
'websocket_message',
'websocket_disconnect',
]
for handler in websocket_handlers:
if handler in template['resource']['aws_lambda_function']:
self._add_websocket_lambda_integration(websocket_api_id,
handler, template)
self._add_websocket_lambda_invoke_permission(websocket_api_id,
handler, template)
route_resource_names = []
for route_key in resource.routes:
route_resource_name = self._add_websockets_route(websocket_api_id,
route_key,
template)
route_resource_names.append(route_resource_name)
template['resource'].setdefault(
'aws_apigatewayv2_deployment', {}
)['websocket_api_deployment'] = {
"api_id": websocket_api_id,
"depends_on": ["aws_apigatewayv2_route.%s" % name for name in
route_resource_names]
}
template['resource'].setdefault(
'aws_apigatewayv2_stage', {}
)['websocket_api_stage'] = {
"api_id": websocket_api_id,
"deployment_id": ("${aws_apigatewayv2_deployment"
".websocket_api_deployment.id}"),
"name": resource.api_gateway_stage
}
self._add_websocket_domain_name(websocket_api_id, resource, template)
self._inject_websocketapi_outputs(websocket_api_id, template)
def _generate_s3bucketnotification(self, resource, template):
    # type: (models.S3BucketNotification, Dict[str, Any]) -> None
    """Emit an S3 bucket notification subscriber and its lambda permission.

    Subscribers are appended to a per-bucket ``aws_s3_bucket_notification``
    entry, and a matching ``aws_lambda_permission`` is written under the
    resource's own name.
    """
    subscriber = {
        'events': resource.events,
        'lambda_function_arn': self._fref(resource.lambda_function)
    }
    optional_filters = (
        ('filter_prefix', resource.prefix),
        ('filter_suffix', resource.suffix),
    )
    for filter_key, filter_value in optional_filters:
        if filter_value:
            subscriber[filter_key] = filter_value

    # we use the bucket name here because we need to aggregate
    # all the notifications subscribers for a bucket.
    # Due to cyclic references to buckets created in terraform
    # we also try to detect and resolve.
    is_terraform_reference = '{aws_s3_bucket.' in resource.bucket
    bucket_name = (
        resource.bucket.split('.')[1] if is_terraform_reference
        else resource.bucket
    )

    notifications = template['resource'].setdefault(
        'aws_s3_bucket_notification', {})
    bucket_entry = notifications.setdefault(
        bucket_name + '_notify', {'bucket': resource.bucket})
    bucket_entry.setdefault('lambda_function', []).append(subscriber)

    permission = {
        'statement_id': resource.resource_name,
        'action': 'lambda:InvokeFunction',
        'function_name': self._fref(resource.lambda_function),
        'principal': self._options.service_principal('s3'),
        'source_account': '${data.aws_caller_identity.chalice.account_id}',
        'source_arn': ('arn:${data.aws_partition.chalice.partition}:'
                       's3:::%s' % resource.bucket)
    }
    template['resource'].setdefault('aws_lambda_permission', {})[
        resource.resource_name] = permission
def _generate_sqseventsource(self, resource, template):
    # type: (models.SQSEventSource, Dict[str, Any]) -> None
    """Emit the ``aws_lambda_event_source_mapping`` for an SQS queue.

    A ``models.QueueARN`` supplies its ARN directly; any other queue
    value is passed through ``self._arnref`` with the SQS ARN pattern.
    """
    queue = resource.queue
    if not isinstance(queue, models.QueueARN):
        source_arn = self._arnref(
            "arn:%(partition)s:sqs:%(region)s"
            ":%(account_id)s:%(queue)s",
            queue=queue
        )
    else:
        source_arn = queue.arn

    mapping = {
        'event_source_arn': source_arn,
        'batch_size': resource.batch_size,
        'maximum_batching_window_in_seconds':
            resource.maximum_batching_window_in_seconds,
        'function_name': self._fref(resource.lambda_function),
    }
    # scaling_config is only written when a concurrency cap is set.
    if resource.maximum_concurrency:
        mapping["scaling_config"] = {
            "maximum_concurrency": resource.maximum_concurrency
        }

    mappings = template['resource'].setdefault(
        'aws_lambda_event_source_mapping', {})
    mappings[resource.resource_name] = mapping
def _generate_kinesiseventsource(self, resource, template):
    # type: (models.KinesisEventSource, Dict[str, Any]) -> None
    """Emit the ``aws_lambda_event_source_mapping`` for a Kinesis stream.

    The stream ARN is produced by ``self._arnref`` from the Kinesis ARN
    pattern and ``resource.stream``.
    """
    stream_arn = self._arnref(
        "arn:%(partition)s:kinesis:%(region)s"
        ":%(account_id)s:stream/%(stream)s",
        stream=resource.stream)
    mapping = {
        'event_source_arn': stream_arn,
        'batch_size': resource.batch_size,
        'starting_position': resource.starting_position,
        'maximum_batching_window_in_seconds':
            resource.maximum_batching_window_in_seconds,
        'function_name': self._fref(resource.lambda_function)
    }
    mappings = template['resource'].setdefault(
        'aws_lambda_event_source_mapping', {})
    mappings[resource.resource_name] = mapping
def _generate_dynamodbeventsource(self, resource, template):
    # type: (models.DynamoDBEventSource, Dict[str, Any]) -> None
    """Emit the ``aws_lambda_event_source_mapping`` for a DynamoDB stream.

    ``resource.stream_arn`` is used as the event source ARN unchanged.
    """
    mapping = {
        'event_source_arn': resource.stream_arn,
        'batch_size': resource.batch_size,
        'starting_position': resource.starting_position,
        'maximum_batching_window_in_seconds':
            resource.maximum_batching_window_in_seconds,
        'function_name': self._fref(resource.lambda_function),
    }
    mappings = template['resource'].setdefault(
        'aws_lambda_event_source_mapping', {})
    mappings[resource.resource_name] = mapping
def _generate_snslambdasubscription(self, resource, template):
    # type: (models.SNSLambdaSubscription, Dict[str, Any]) -> None
    """Emit an SNS topic subscription and the lambda permission for it.

    A topic value starting with ``arn:aws`` is used as the ARN as-is;
    anything else is passed through ``self._arnref`` with the SNS ARN
    pattern.
    """
    topic_arn = (
        resource.topic if resource.topic.startswith('arn:aws')
        else self._arnref(
            'arn:%(partition)s:sns:%(region)s:%(account_id)s:%(topic)s',
            topic=resource.topic)
    )

    subscription = {
        'topic_arn': topic_arn,
        'protocol': 'lambda',
        'endpoint': self._fref(resource.lambda_function)
    }
    subscriptions = template['resource'].setdefault(
        'aws_sns_topic_subscription', {})
    subscriptions[resource.resource_name] = subscription

    permission = {
        'function_name': self._fref(resource.lambda_function),
        'action': 'lambda:InvokeFunction',
        'principal': self._options.service_principal('sns'),
        'source_arn': topic_arn
    }
    permissions = template['resource'].setdefault(
        'aws_lambda_permission', {})
    permissions[resource.resource_name] = permission
def _generate_cloudwatchevent(self, resource, template):
    # type: (models.CloudWatchEvent, Dict[str, Any]) -> None
    """Emit an event-pattern ``aws_cloudwatch_event_rule``.

    The event target and lambda permission are added by
    ``self._cwe_helper``.
    """
    rule = {
        'name': resource.resource_name,
        'event_pattern': resource.event_pattern
    }
    rules = template['resource'].setdefault(
        'aws_cloudwatch_event_rule', {})
    rules[resource.resource_name] = rule
    self._cwe_helper(resource, template)
def _generate_scheduledevent(self, resource, template):
    # type: (models.ScheduledEvent, Dict[str, Any]) -> None
    """Emit a schedule-expression ``aws_cloudwatch_event_rule``.

    The event target and lambda permission are added by
    ``self._cwe_helper``.
    """
    rule = {
        'name': resource.resource_name,
        'schedule_expression': resource.schedule_expression,
        'description': resource.rule_description,
    }
    rules = template['resource'].setdefault(
        'aws_cloudwatch_event_rule', {})
    rules[resource.resource_name] = rule
    self._cwe_helper(resource, template)
def _cwe_helper(self, resource, template):
    # type: (models.CloudWatchEventBase, Dict[str, Any]) -> None
    """Emit the event target and lambda permission for an event rule.

    Both entries are keyed by ``resource.resource_name`` and refer to the
    ``aws_cloudwatch_event_rule`` of the same name.
    """
    rule_name = resource.resource_name

    target = {
        'rule': '${aws_cloudwatch_event_rule.%s.name}' % rule_name,
        'target_id': rule_name,
        'arn': self._fref(resource.lambda_function)
    }
    targets = template['resource'].setdefault(
        'aws_cloudwatch_event_target', {})
    targets[rule_name] = target

    permission = {
        'function_name': self._fref(resource.lambda_function),
        'action': 'lambda:InvokeFunction',
        'principal': self._options.service_principal('events'),
        'source_arn': "${aws_cloudwatch_event_rule.%s.arn}" % rule_name
    }
    permissions = template['resource'].setdefault(
        'aws_lambda_permission', {})
    permissions[rule_name] = permission
def _generate_lambdalayer(self, resource, template):
    # type: (models.LambdaLayer, Dict[str, Any]) -> None
    """Emit the ``aws_lambda_layer_version`` entry for a layer.

    The layer's resource name is also stored on ``self._chalice_layer``,
    which ``_generate_lambdafunction`` reads when building ``layers``.
    """
    layer_version = {
        'layer_name': resource.layer_name,
        'compatible_runtimes': [resource.runtime],
        'filename': resource.deployment_package.filename,
    }
    layer_versions = template['resource'].setdefault(
        "aws_lambda_layer_version", {})
    layer_versions[resource.resource_name] = layer_version
    self._chalice_layer = resource.resource_name
def _generate_lambdafunction(self, resource, template):
    # type: (models.LambdaFunction, Dict[str, Any]) -> None
    """Emit the ``aws_lambda_function`` entry for a lambda function.

    Optional settings (VPC config, reserved concurrency, environment,
    X-Ray tracing, layers) are only written when set on ``resource``.
    If the function has a log group, its ``aws_cloudwatch_log_group``
    entry is emitted here as well.
    """
    package_file = resource.deployment_package.filename
    definition = {
        'function_name': resource.function_name,
        'runtime': resource.runtime,
        'handler': resource.handler,
        'memory_size': resource.memory_size,
        'tags': resource.tags,
        'timeout': resource.timeout,
        'source_code_hash': '${filebase64sha256("%s")}' % package_file,
        'filename': package_file
    }  # type: Dict[str, Any]

    # vpc_config needs both security groups and subnets.
    if resource.security_group_ids and resource.subnet_ids:
        definition['vpc_config'] = {
            'subnet_ids': resource.subnet_ids,
            'security_group_ids': resource.security_group_ids
        }
    if resource.reserved_concurrency is not None:
        definition['reserved_concurrent_executions'] = (
            resource.reserved_concurrency)
    if resource.environment_variables:
        definition['environment'] = {
            'variables': resource.environment_variables
        }
    if resource.xray:
        definition['tracing_config'] = {'mode': 'Active'}

    # The layer recorded by _generate_lambdalayer comes first, followed
    # by any layers listed on the function itself.
    layer_refs = []  # type: List[Any]
    if self._chalice_layer:
        layer_refs.append(
            '${aws_lambda_layer_version.%s.arn}' % self._chalice_layer)
    if resource.layers:
        layer_refs.extend(list(resource.layers))
    if layer_refs:
        definition['layers'] = layer_refs

    if not isinstance(resource.role, models.ManagedIAMRole):
        # resource is a PreCreatedIAMRole.
        precreated_role = cast(models.PreCreatedIAMRole, resource.role)
        definition['role'] = precreated_role.role_arn
    else:
        definition['role'] = '${aws_iam_role.%s.arn}' % (
            resource.role.resource_name)

    log_group = resource.log_group
    if log_group is not None:
        log_group_definition = {
            'name': log_group.resource_name,
            'retention_in_days': log_group.retention_in_days,
        }
        template['resource'].setdefault('aws_cloudwatch_log_group', {})[
            log_group.resource_name] = log_group_definition

    functions = template['resource'].setdefault('aws_lambda_function', {})
    functions[resource.resource_name] = definition
def _generate_log_group(self, resource, remplate):
    # type: (models.LogGroup, Dict[str, Any]) -> None
    """Do nothing: log groups are handled in LambdaFunction generation."""
    return None
def _generate_restapi(self, resource, template):
    # type: (models.RestAPI, Dict[str, Any]) -> None
    """Emit the Terraform resources and outputs for a REST API.

    Writes the swagger document into ``locals``, then the rest api,
    deployment, invoke permissions (for the handler and each
    authorizer), the ``EndpointURL``/``RestAPIId`` outputs, and finally
    delegates custom-domain handling to ``self._add_domain_name``.
    """
    # typechecker happiness
    swagger_doc = cast(Dict, resource.swagger_doc)
    api_name = resource.resource_name
    template['locals']['chalice_api_swagger'] = json.dumps(swagger_doc)

    rest_api = {
        'body': '${local.chalice_api_swagger}',
        # Terraform will diff explicitly configured attributes
        # to the current state of the resource. Attributes configured
        # via swagger on the REST api need to be duplicated here, else
        # terraform will set them back to empty.
        'name': swagger_doc['info']['title'],
        'binary_media_types': swagger_doc[
            'x-amazon-apigateway-binary-media-types'],
        'endpoint_configuration': {'types': [resource.endpoint_type]}
    }
    resources = template['resource']
    resources.setdefault('aws_api_gateway_rest_api', {})[api_name] = rest_api

    # rest_api is the same dict object stored in the template, so the
    # optional keys below land on the emitted resource.
    if 'x-amazon-apigateway-policy' in swagger_doc:
        rest_api['policy'] = json.dumps(
            swagger_doc['x-amazon-apigateway-policy'])
    if resource.minimum_compression.isdigit():
        rest_api['minimum_compression_size'] = int(
            resource.minimum_compression)

    rest_api_id = '${aws_api_gateway_rest_api.%s.id}' % api_name
    invoke_source_arn = (
        "${aws_api_gateway_rest_api.%s.execution_arn}/*" % api_name)

    deployment = {
        'stage_name': resource.api_gateway_stage,
        # Ensure that the deployment gets redeployed if we update
        # the swagger description for the api by using its checksum
        # in the stage description.
        'stage_description': "${md5(local.chalice_api_swagger)}",
        'rest_api_id': rest_api_id,
        'lifecycle': {'create_before_destroy': True}
    }
    resources.setdefault('aws_api_gateway_deployment', {})[
        api_name] = deployment

    api_permission = {
        'function_name': self._fref(resource.lambda_function),
        'action': 'lambda:InvokeFunction',
        'principal': self._options.service_principal('apigateway'),
        'source_arn': invoke_source_arn
    }
    permissions = resources.setdefault('aws_lambda_permission', {})
    permissions[api_name + '_invoke'] = api_permission

    endpoint_url = {
        'value': '${aws_api_gateway_deployment.%s.invoke_url}' % api_name
    }
    outputs = template.setdefault('output', {})
    outputs['EndpointURL'] = endpoint_url
    outputs['RestAPIId'] = {'value': rest_api_id}

    # API Gateway also needs permission to invoke each authorizer.
    for authorizer in resource.authorizers:
        permissions[authorizer.resource_name + '_invoke'] = {
            'function_name': self._fref(authorizer),
            'action': 'lambda:InvokeFunction',
            'principal': self._options.service_principal('apigateway'),
            'source_arn': invoke_source_arn
        }

    self._add_domain_name(resource, template)
def _add_domain_name(self, resource, template):
    # type: (models.RestAPI, Dict[str, Any]) -> None
    """Emit the custom domain name and base path mapping for a REST API.

    Returns without touching ``template`` when the API has no domain
    name. Otherwise the ``aws_api_gateway_domain_name`` and
    ``aws_api_gateway_base_path_mapping`` sections are assigned (any
    existing content under those two keys is replaced) and the domain
    outputs are added.
    """
    domain_name = resource.domain_name
    if domain_name is None:
        return

    endpoint_type = resource.endpoint_type
    properties = {
        'domain_name': domain_name.domain_name,
        'endpoint_configuration': {'types': [endpoint_type]},
    }
    # EDGE endpoints take the certificate under a different key than
    # every other endpoint type.
    certificate_key = (
        'certificate_arn' if endpoint_type == 'EDGE'
        else 'regional_certificate_arn'
    )
    properties[certificate_key] = domain_name.certificate_arn
    if domain_name.tls_version is not None:
        properties['security_policy'] = domain_name.tls_version.value
    if domain_name.tags:
        properties['tags'] = domain_name.tags

    domain_resource_name = domain_name.resource_name
    template['resource']['aws_api_gateway_domain_name'] = {
        domain_resource_name: properties
    }
    base_path_mapping = {
        'stage_name': resource.api_gateway_stage,
        'domain_name': domain_name.domain_name,
        'api_id': '${aws_api_gateway_rest_api.%s.id}' % (
            resource.resource_name)
    }
    template['resource']['aws_api_gateway_base_path_mapping'] = {
        domain_resource_name + '_mapping': base_path_mapping
    }
    self._add_domain_name_outputs(
        domain_resource_name, endpoint_type, template)
def _add_domain_name_outputs(self, domain_resource_name,
                             endpoint_type, template):
    # type: (str, str, Dict[str, Any]) -> None
    """Add the ``AliasDomainName`` and ``HostedZoneId`` outputs.

    EDGE endpoints reference the domain's cloudfront attributes; every
    other endpoint type references the regional attributes.
    """
    if endpoint_type == 'EDGE':
        alias_format = '${%s.cloudfront_domain_name}'
        zone_format = '${%s.cloudfront_zone_id}'
    else:
        alias_format = '${%s.regional_domain_name}'
        zone_format = '${%s.regional_zone_id}'

    domain_ref = 'aws_api_gateway_domain_name.%s' % domain_resource_name
    outputs = template.setdefault('output', {})
    outputs['AliasDomainName'] = {'value': alias_format % domain_ref}
    outputs['HostedZoneId'] = {'value': zone_format % domain_ref}
def _generate_apimapping(self, resource, template):
    # type: (models.APIMapping, Dict[str, Any]) -> None
    """Do nothing: no template entries are emitted for an APIMapping here."""
    return None
def _generate_domainname(self, resource, template):
    # type: (models.DomainName, Dict[str, Any]) -> None
    """Do nothing: no template entries are emitted for a DomainName here."""
    return None
class AppPackager(object):
    """Build an application's template and write it to an output directory.

    The packager only coordinates its collaborators: the resource builder
    produces resources, the templater turns them into a template, the
    post processor adjusts that template in place, and the serializer
    renders it to the text that is written through ``osutils``.
    """

    def __init__(self,
                 templater,            # type: TemplateGenerator
                 resource_builder,     # type: ResourceBuilder
                 post_processor,       # type: TemplatePostProcessor
                 template_serializer,  # type: TemplateSerializer
                 osutils,              # type: OSUtils
                 ):
        # type: (...) -> None
        self._osutils = osutils
        self._templater = templater
        self._resource_builder = resource_builder
        self._template_post_processor = post_processor
        self._template_serializer = template_serializer

    def _to_json(self, doc):
        # type: (Any) -> str
        """Render ``doc`` with ``serialize_to_json``."""
        return serialize_to_json(doc)

    def _to_yaml(self, doc):
        # type: (Any) -> str
        """Render ``doc`` with ``yaml.dump``, keeping unicode unescaped."""
        return yaml.dump(doc, allow_unicode=True)

    def package_app(self, config, outdir, chalice_stage_name):
        # type: (Config, str, str) -> None
        """Generate the template for a stage and write it into ``outdir``.

        ``outdir`` is created if it does not exist. The file is named
        after the templater's ``template_file`` with the serializer's
        ``file_extension`` appended.
        """
        # Deployment package
        built_resources = self._resource_builder.construct_resources(
            config, chalice_stage_name)
        template = self._templater.generate(built_resources)

        outdir_missing = not self._osutils.directory_exists(outdir)
        if outdir_missing:
            self._osutils.makedirs(outdir)

        # Post processors mutate ``template`` in place.
        self._template_post_processor.process(
            template, config, outdir, chalice_stage_name)

        serialized = self._template_serializer.serialize_template(template)
        suffix = self._template_serializer.file_extension
        base_path = os.path.join(outdir, self._templater.template_file)
        self._osutils.set_file_contents(
            filename=base_path + '.' + suffix,
            contents=serialized,
            binary=False
        )
class TemplatePostProcessor(object):
    """Base class for steps that adjust a generated template in place."""

    def __init__(self, osutils):
        # type: (OSUtils) -> None
        self._osutils = osutils

    def process(self, template, config, outdir, chalice_stage_name):
        # type: (Dict[str, Any], Config, str, str) -> None
        """Adjust ``template``; subclasses must override this."""
        raise NotImplementedError()
class SAMCodeLocationPostProcessor(TemplatePostProcessor):
    """Copy deployment zips next to a SAM template and repoint it at them."""

    def process(self, template, config, outdir, chalice_stage_name):
        # type: (Dict[str, Any], Config, str, str) -> None
        self._fixup_deployment_package(template, outdir)

    def _fixup_deployment_package(self, template, outdir):
        # type: (Dict[str, Any], str) -> None
        """Copy packages into ``outdir`` and rewrite their template URIs.

        The function package is copied once, from the first serverless
        function encountered, and every function's ``CodeUri`` is set to
        ``./deployment.zip``. Each layer version's package is copied to
        ``layer-deployment.zip`` and its ``ContentUri`` rewritten.
        """
        # NOTE: This isn't my ideal way to do this. I'd like
        # to move this into the build step where something
        # copies the DeploymentPackage.filename over to the
        # outdir. That would require plumbing through user
        # provided params such as "outdir" into the build stage
        # somehow, which isn't currently possible.
        function_zip_copied = False
        for entry in template['Resources'].values():
            resource_type = entry['Type']
            if resource_type == 'AWS::Serverless::Function':
                properties = entry['Properties']
                source_zip = properties['CodeUri']
                if not function_zip_copied:
                    target_zip = os.path.join(outdir, 'deployment.zip')
                    self._osutils.copy(source_zip, target_zip)
                    function_zip_copied = True
                properties['CodeUri'] = './deployment.zip'
            elif resource_type == 'AWS::Serverless::LayerVersion':
                properties = entry['Properties']
                source_zip = properties['ContentUri']
                target_zip = os.path.join(outdir, 'layer-deployment.zip')
                self._osutils.copy(source_zip, target_zip)
                properties['ContentUri'] = './layer-deployment.zip'
class TerraformCodeLocationPostProcessor(TemplatePostProcessor):
    """Copy deployment zips next to a Terraform template and repoint it."""

    def process(self, template, config, outdir, chalice_stage_name):
        # type: (Dict[str, Any], Config, str, str) -> None
        """Relocate function and layer packages into ``outdir``.

        Only the first entry of each kind has its package copied; every
        entry's ``filename`` and ``source_code_hash`` are rewritten to
        the ``${path.module}`` relative location.
        """
        resources = template['resource']

        functions = resources.get('aws_lambda_function', {}).values()
        for position, function in enumerate(functions):
            if position == 0:
                target_zip = os.path.join(outdir, 'deployment.zip')
                self._osutils.copy(function['filename'], target_zip)
            function['filename'] = "${path.module}/deployment.zip"
            function['source_code_hash'] = \
                '${filebase64sha256("${path.module}/deployment.zip")}'

        layers = resources.get('aws_lambda_layer_version', {}).values()
        for position, layer in enumerate(layers):
            if position == 0:
                target_zip = os.path.join(outdir, 'layer-deployment.zip')
                self._osutils.copy(layer['filename'], target_zip)
            layer['filename'] = "${path.module}/layer-deployment.zip"
            layer['source_code_hash'] = \
                '${filebase64sha256("${path.module}/layer-deployment.zip")}'
class TemplateMergePostProcessor(TemplatePostProcessor):
    """Merge a user supplied template file into the generated template."""

    def __init__(self,
                 osutils,              # type: OSUtils
                 merger,               # type: TemplateMerger
                 template_serializer,  # type: TemplateSerializer
                 merge_template=None,  # type: Optional[str]
                 ):
        # type: (...) -> None
        super(TemplateMergePostProcessor, self).__init__(osutils)
        self._merge_template = merge_template
        self._template_serializer = template_serializer
        self._merger = merger

    def process(self, template, config, outdir, chalice_stage_name):
        # type: (Dict[str, Any], Config, str, str) -> None
        """Replace the contents of ``template`` with the merged result.

        Does nothing when no merge template was configured.
        """
        if self._merge_template is not None:
            user_template = self._load_template_to_merge()
            merged = self._merger.merge(user_template, template)
            # Callers hold a reference to ``template``, so it is
            # rewritten in place rather than rebound.
            template.clear()
            template.update(merged)

    def _load_template_to_merge(self):
        # type: () -> Dict[str, Any]
        """Read and parse the configured merge template.

        Raises RuntimeError if the file does not exist.
        """
        filepath = os.path.abspath(cast(str, self._merge_template))
        if not self._osutils.file_exists(filepath):
            raise RuntimeError('Cannot find template file: %s' % filepath)
        raw_contents = self._osutils.get_file_contents(
            filepath, binary=False)
        return self._template_serializer.load_template(
            raw_contents, filepath)
class CompositePostProcessor(TemplatePostProcessor):
    """Run a list of post processors, in order, over the same template."""

    def __init__(self, processors):
        # type: (List[TemplatePostProcessor]) -> None
        # The base initializer is not called, so no ``_osutils``
        # attribute is set on a composite.
        self._processors = processors

    def process(self, template, config, outdir, chalice_stage_name):
        # type: (Dict[str, Any], Config, str, str) -> None
        arguments = (template, config, outdir, chalice_stage_name)
        for step in self._processors:
            step.process(*arguments)
class TemplateMerger(object):
    """Interface for combining a template file with a generated template."""

    def merge(self, file_template, chalice_template):
        # type: (Dict[str, Any], Dict[str, Any]) -> Dict[str, Any]
        """Return the merged template; subclasses must override this."""
        raise NotImplementedError('merge')
class TemplateDeepMerger(TemplateMerger):
    """Recursively merge dictionaries, preferring the file template.

    Where both sides hold a dict under the same key the dicts are merged
    key by key; in every other case the file template's value wins. Keys
    that only exist in the chalice template are kept.
    """

    def merge(self, file_template, chalice_template):
        # type: (Dict[str, Any], Dict[str, Any]) -> Dict[str, Any]
        return self._merge(file_template, chalice_template)

    def _merge(self, file_template, chalice_template):
        # type: (Any, Any) -> Any
        both_are_dicts = (
            isinstance(file_template, dict) and
            isinstance(chalice_template, dict)
        )
        if not both_are_dicts:
            return file_template
        return self._merge_dict(file_template, chalice_template)

    def _merge_dict(self, file_template, chalice_template):
        # type: (Dict[str, Any], Dict[str, Any]) -> Dict[str, Any]
        # Start from a shallow copy so the chalice template itself is
        # not modified at this level.
        result = chalice_template.copy()
        result.update(
            (key, self._merge(file_value, chalice_template.get(key)))
            for key, file_value in file_template.items()
        )
        return result
class TemplateSerializer(object):
    """Interface for converting templates to and from their text form."""

    # Extension (without a leading dot) used for serialized templates.
    file_extension = ''

    def load_template(self, file_contents, filename=''):
        # type: (str, str) -> Dict[str, Any]
        """Parse ``file_contents``; subclasses must override this."""
        raise NotImplementedError("load_template")

    def serialize_template(self, contents):
        # type: (Dict[str, Any]) -> str
        """Render ``contents`` as text; subclasses must override this."""
        raise NotImplementedError("serialize_template")
class JSONTemplateSerializer(TemplateSerializer):
    """Serialize templates to JSON and parse JSON template files."""

    file_extension = 'json'

    def serialize_template(self, contents):
        # type: (Dict[str, Any]) -> str
        return serialize_to_json(contents)

    def load_template(self, file_contents, filename=''):
        # type: (str, str) -> Dict[str, Any]
        """Parse ``file_contents`` as JSON.

        Raises RuntimeError, naming ``filename``, when ``json.loads``
        rejects the contents with a ValueError.
        """
        try:
            parsed = json.loads(file_contents)
        except ValueError:
            raise RuntimeError(
                'Expected %s to be valid JSON template.' % filename)
        return parsed
class YAMLTemplateSerializer(TemplateSerializer):
    """Serialize templates to YAML and parse YAML template files.

    Loading understands the short ``!Tag`` form of intrinsic functions
    and expands each one to a single-key mapping: ``!Ref x`` becomes
    ``{'Ref': 'x'}``, ``!Condition x`` becomes ``{'Condition': 'x'}`` and
    any other ``!Name x`` becomes ``{'Fn::Name': 'x'}``.
    """

    file_extension = 'yaml'

    @classmethod
    def is_yaml_template(cls, template_name):
        # type: (str) -> bool
        """Return True if ``template_name`` ends in .yaml or .yml.

        The extension is compared case-insensitively.
        """
        file_extension = os.path.splitext(template_name)[1].lower()
        return file_extension in [".yaml", ".yml"]

    def serialize_template(self, contents):
        # type: (Dict[str, Any]) -> str
        return yaml.safe_dump(contents, allow_unicode=True)

    def load_template(self, file_contents, filename=''):
        # type: (str, str) -> Dict[str, Any]
        """Parse ``file_contents`` as a YAML template.

        Raises RuntimeError, naming ``filename``, when PyYAML rejects the
        contents.
        """
        loader_class = self._create_loader_class()
        try:
            return yaml.load(
                file_contents,
                Loader=loader_class,
            )
        except yaml.YAMLError:
            # YAMLError is the base class of ScannerError, ParserError
            # and the other PyYAML failures, so every kind of malformed
            # document gets the same user-facing error.
            raise RuntimeError(
                'Expected %s to be valid YAML template.' % filename)

    def _create_loader_class(self):
        # type: () -> Any
        """Return a SafeLoader subclass that understands ``!`` tags.

        The constructor is registered on a private subclass instead of
        on ``yaml.SafeLoader`` itself, so other users of the safe loader
        in the same process are not affected by loading a template.
        """
        class _IntrinsicsSafeLoader(yaml.SafeLoader):
            pass

        _IntrinsicsSafeLoader.add_multi_constructor(
            tag_prefix='!', multi_constructor=self._custom_sam_instrinsics)
        return _IntrinsicsSafeLoader

    def _custom_sam_instrinsics(self, loader, tag_prefix, node):
        # type: (yaml.SafeLoader, str, Node) -> Dict[str, Any]
        """Convert a ``!Tag value`` node into ``{name: value}``.

        The name is taken from ``node.tag`` with its leading ``!``
        removed; ``Ref`` and ``Condition`` are used as-is and everything
        else is prefixed with ``Fn::``. ``tag_prefix`` is not used.
        """
        tag = node.tag[1:]
        if tag not in ['Ref', 'Condition']:
            tag = 'Fn::%s' % tag
        value = self._get_value(loader, node)
        return {tag: value}

    def _get_value(self, loader, node):
        # type: (yaml.SafeLoader, Node) -> Any
        """Construct the Python value carried by ``node``.

        Raises ValueError for a node that is not a scalar, sequence or
        mapping.
        """
        if node.tag[1:] == 'GetAtt' and isinstance(node.value,
                                                   six.string_types):
            # "!GetAtt a.b.c" -> ['a', 'b.c']: only the first dot
            # separates the two parts.
            return node.value.split('.', 1)
        elif isinstance(node, ScalarNode):
            return loader.construct_scalar(node)
        elif isinstance(node, SequenceNode):
            return loader.construct_sequence(node)
        elif isinstance(node, MappingNode):
            return loader.construct_mapping(node)
        raise ValueError("Unknown YAML node: %s" % node)