in lib/aws/kclrb/kcl_process.rb [58:83]
# Routes a single decoded action to the matching record-processor callback
# and then reports completion through the IO proxy.
#
# Recognised action names and the callbacks they trigger:
#   'initialize'        -> :init_processor
#   'processRecords'    -> :process_records
#   'leaseLost'         -> :lease_lost
#   'shardEnded'        -> :shard_ended
#   'shutdownRequested' -> :shutdown_requested
#
# @param action [#fetch] action payload; must provide an 'action' key plus
#   whatever keys the selected branch reads ('shardId' and 'sequenceNumber'
#   for 'initialize'; 'records' and 'millisBehindLatest' for 'processRecords')
# @return whatever @io_proxy.write_action returns
# @raise [MalformedAction] if the action name is not recognised, or if a
#   KeyError is raised anywhere in the method body
def process_action(action)
  action_name = action.fetch('action')

  # Resolve the callback and build its input up front. Only the selected
  # branch is evaluated, so keys belonging to other actions are never read.
  callback, input =
    case action_name
    when 'initialize'
      [:init_processor,
       Aws::KCLrb::V2::InitializeInput.new(action.fetch('shardId'),
                                           action.fetch('sequenceNumber'))]
    when 'processRecords'
      [:process_records,
       Aws::KCLrb::V2::ProcessRecordsInput.new(action.fetch('records'),
                                               action.fetch('millisBehindLatest'),
                                               @checkpointer)]
    when 'leaseLost'
      [:lease_lost, Aws::KCLrb::V2::LeaseLostInput.new]
    when 'shardEnded'
      [:shard_ended, Aws::KCLrb::V2::ShardEndedInput.new(@checkpointer)]
    when 'shutdownRequested'
      [:shutdown_requested, Aws::KCLrb::V2::ShutdownRequestedInput.new(@checkpointer)]
    else
      raise MalformedAction, "Received an action which couldn't be understood. Action was '#{action}'"
    end

  dispatch_to_processor(callback, input)

  # Acknowledge the action by name once the callback has been dispatched.
  @io_proxy.write_action('status', {'responseFor' => action_name})
rescue KeyError => ke
  # NOTE: this rescue covers the whole method body, so a KeyError escaping
  # dispatch_to_processor or write_action is wrapped here as well.
  raise MalformedAction, "Action '#{action}': #{ke.message}"
end