KCLManager.prototype._onRecordProcessorAction = function()

in lib/kcl/kcl_manager.js [85:129]


/**
 * Routes a record-processor action to the matching method of the record
 * processor stored in this._context.
 *
 * The handler is called with the record processor as `this` and receives two
 * arguments: an input object produced by cloneToInput(action), and a
 * completion callback. Invoking the completion callback calls
 * this._recordProcessorCallback(context, action).
 *
 * Mapping from action.action to handler:
 *  - 'initialize'     -> recordProcessor.initialize
 *  - 'processRecords' -> recordProcessor.processRecords (input.checkpointer set)
 *  - 'leaseLost'      -> recordProcessor.leaseLost, or, when this._version is
 *                        KCLManager.VERSION1, recordProcessor.shutdown with
 *                        input.reason = 'ZOMBIE'
 *  - 'shardEnded'     -> recordProcessor.shardEnded, or, when this._version is
 *                        KCLManager.VERSION1, recordProcessor.shutdown with
 *                        input.reason = 'TERMINATE' (input.checkpointer set in
 *                        both cases)
 *
 * @param {Object} action - Action descriptor; action.action selects the handler.
 * @throws {Error} If action.action is not one of the values listed above.
 */
KCLManager.prototype._onRecordProcessorAction = function(action) {
  var manager = this;
  var kind = action.action;
  var ctx = manager._context;
  var checkpointer = ctx.checkpointer;
  var processor = ctx.recordProcessor;
  var input = cloneToInput(action);
  var handler;

  // With KCLManager.VERSION1 both lease loss and shard end go through the
  // processor's shutdown method; `reason` tells it which of the two happened.
  function legacyShutdown(reason) {
    input.reason = reason;
    return processor.shutdown;
  }

  if (kind === 'initialize') {
    handler = processor.initialize;
  } else if (kind === 'processRecords') {
    input.checkpointer = checkpointer;
    handler = processor.processRecords;
  } else if (kind === 'leaseLost') {
    handler = (manager._version === KCLManager.VERSION1) ?
      legacyShutdown('ZOMBIE') :
      processor.leaseLost;
  } else if (kind === 'shardEnded') {
    input.checkpointer = checkpointer;
    handler = (manager._version === KCLManager.VERSION1) ?
      legacyShutdown('TERMINATE') :
      processor.shardEnded;
  } else {
    // Should not occur.
    throw new Error(util.format('Invalid action for record processor: %j', action));
  }

  // Handed to the user's handler so it can signal that the operation is
  // complete and the next operation may proceed.
  function onComplete() {
    manager._recordProcessorCallback(ctx, action);
  }

  handler.apply(processor, [input, onComplete]);
};