in chalice/package.py [0:0]
def _generate_websocketapi(self, resource, template):
    # type: (models.WebsocketAPI, Dict[str, Any]) -> None
    """Add the resources describing a websocket API to ``template``.

    The following entries are written under ``template['resource']``,
    in this order:

    * an ``aws_apigatewayv2_api`` entry keyed by
      ``resource.resource_name``;
    * for each of the connect/message/disconnect handlers that already
      has an entry under ``aws_lambda_function``, whatever the
      integration and invoke-permission helpers add;
    * one route per key in ``resource.routes`` (via the route helper);
    * an ``aws_apigatewayv2_deployment`` entry that depends on every
      route created above;
    * an ``aws_apigatewayv2_stage`` entry named after
      ``resource.api_gateway_stage`` and bound to that deployment.

    Afterwards the domain-name and outputs helpers are invoked.

    :param resource: Websocket API model.  Its ``name``,
        ``resource_name``, ``routes`` and ``api_gateway_stage``
        attributes are read here.
    :param template: Template dictionary, mutated in place.
        ``template['resource']['aws_lambda_function']`` must already
        exist, otherwise a ``KeyError`` is raised.
    """
    api_definition = {
        'name': resource.name,
        'route_selection_expression': '$request.body.action',
        'protocol_type': 'WEBSOCKET',
    }
    api_resources = template['resource'].setdefault(
        'aws_apigatewayv2_api', {})
    api_resources[resource.resource_name] = api_definition

    # Interpolation expression that every dependent resource uses to
    # point back at the API registered above.
    api_id_ref = "${aws_apigatewayv2_api.%s.id}" % resource.resource_name

    # Wire up only the handlers that have a Lambda function in the
    # template; the others are skipped.
    for handler in ('websocket_connect',
                    'websocket_message',
                    'websocket_disconnect'):
        lambda_functions = template['resource']['aws_lambda_function']
        if handler not in lambda_functions:
            continue
        self._add_websocket_lambda_integration(
            api_id_ref, handler, template)
        self._add_websocket_lambda_invoke_permission(
            api_id_ref, handler, template)

    # The route helper returns the resource name of each route it adds;
    # those names feed the deployment's ``depends_on`` list below.
    route_names = [
        self._add_websockets_route(api_id_ref, route_key, template)
        for route_key in resource.routes
    ]

    deployment = {
        "api_id": api_id_ref,
        "depends_on": ["aws_apigatewayv2_route.%s" % route_name
                       for route_name in route_names],
    }
    template['resource'].setdefault(
        'aws_apigatewayv2_deployment', {}
    )['websocket_api_deployment'] = deployment

    deployment_id_ref = (
        "${aws_apigatewayv2_deployment.websocket_api_deployment.id}")
    stage = {
        "api_id": api_id_ref,
        "deployment_id": deployment_id_ref,
        "name": resource.api_gateway_stage,
    }
    template['resource'].setdefault(
        'aws_apigatewayv2_stage', {}
    )['websocket_api_stage'] = stage

    self._add_websocket_domain_name(api_id_ref, resource, template)
    self._inject_websocketapi_outputs(api_id_ref, template)