process_action

in lib/aws/kclrb/kcl_process.rb [58:83]


      def process_action(action)
        action_name = action.fetch('action')
        case action_name
        when 'initialize'
          dispatch_to_processor(:init_processor,
                                Aws::KCLrb::V2::InitializeInput.new(action.fetch('shardId'),
                                                                    action.fetch('sequenceNumber')))
        when 'processRecords'
          dispatch_to_processor(:process_records,
                                Aws::KCLrb::V2::ProcessRecordsInput.new(action.fetch('records'),
                                                                        action.fetch('millisBehindLatest'),
                                                                        @checkpointer))
        when 'leaseLost'
          dispatch_to_processor(:lease_lost, Aws::KCLrb::V2::LeaseLostInput.new)
        when 'shardEnded'
          dispatch_to_processor(:shard_ended, Aws::KCLrb::V2::ShardEndedInput.new(@checkpointer))
        when 'shutdownRequested'
          dispatch_to_processor(:shutdown_requested, Aws::KCLrb::V2::ShutdownRequestedInput.new(@checkpointer))
        else
          raise MalformedAction.new("Received an action which couldn't be understood. Action was '#{action}'")
        end
        @io_proxy.write_action('status', {'responseFor' => action_name})
      rescue KeyError => ke
        raise MalformedAction.new("Action '#{action}': #{ke.message}")
      end