in chalice/deploy/planner.py
def _plan_kinesiseventsource(self, resource):
# type: (models.KinesisEventSource) -> Sequence[InstructionMsg]
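        # Plan the instructions needed to wire a Kinesis stream to its
        # Lambda handler via an event source mapping.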
stream_arn_varname = '%s_stream_arn' % resource.resource_name
uuid_varname = '%s_uuid' % resource.resource_name
function_arn = Variable(
'%s_lambda_arn' % resource.lambda_function.resource_name
)
instruction_for_stream_arn = self._arn_parse_instructions(function_arn)
instruction_for_stream_arn.append(
models.StoreValue(
name=stream_arn_varname,
value=StringFormat(
'arn:{partition}:kinesis:{region_name}:{account_id}:'
'stream/%s' % resource.stream,
['partition', 'region_name', 'account_id'],
),
)
)
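        # The event source mapping already exists: update its batching
        # configuration in place using the previously recorded UUID.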
if self._remote_state.resource_exists(resource):
deployed = self._remote_state.resource_deployed_values(resource)
uuid = deployed['event_uuid']
return instruction_for_stream_arn + [
models.APICall(
method_name='update_lambda_event_source',
params={'event_uuid': uuid,
'batch_size': resource.batch_size,
'maximum_batching_window_in_seconds':
resource.maximum_batching_window_in_seconds}
)
] + self._batch_record_resource(
'kinesis_event', resource.resource_name, {
'kinesis_arn': deployed['kinesis_arn'],
'event_uuid': uuid,
'stream': resource.stream,
'lambda_arn': deployed['lambda_arn'],
}
)
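        # No existing mapping: create one, capturing the new mapping UUID
        # in uuid_varname so it can be recorded for future deploys.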
return instruction_for_stream_arn + [
(models.APICall(
method_name='create_lambda_event_source',
params={'event_source_arn': Variable(stream_arn_varname),
'batch_size': resource.batch_size,
'function_name': function_arn,
'starting_position': resource.starting_position,
'maximum_batching_window_in_seconds':
resource.maximum_batching_window_in_seconds},
output_var=uuid_varname,
), 'Subscribing %s to Kinesis stream %s\n'
% (resource.lambda_function.function_name, resource.stream)
)
] + self._batch_record_resource(
'kinesis_event', resource.resource_name, {
'kinesis_arn': Variable(stream_arn_varname),
'event_uuid': Variable(uuid_varname),
'stream': resource.stream,
'lambda_arn': Variable(function_arn.name),
}
)
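
An illustrative sketch, not part of planner.py: the app-level registration that
produces the models.KinesisEventSource resource planned above. The stream name,
handler name, and parameter values here are assumed for demonstration only.

from chalice import Chalice

app = Chalice(app_name='kinesis-demo')

@app.on_kinesis_record(stream='my-stream', batch_size=100,
                       starting_position='LATEST',
                       maximum_batching_window_in_seconds=0)
def handle_records(event):
    # Each Kinesis record's data attribute holds the decoded payload bytes.
    for record in event:
        app.log.info('Got Kinesis record: %s', record.data)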