in lib/kcl/kcl_manager.js [85:129]
KCLManager.prototype._onRecordProcessorAction = function(action) {
var actionType = action.action;
var context = this._context;
var checkpointer = context.checkpointer;
var recordProcessor = context.recordProcessor;
var recordProcessorFuncInput = cloneToInput(action);
var recordProcessorFunc;
switch (actionType) {
case 'initialize':
recordProcessorFunc = recordProcessor.initialize;
break;
case 'processRecords':
recordProcessorFuncInput.checkpointer = checkpointer;
recordProcessorFunc = recordProcessor.processRecords;
break;
case 'leaseLost':
if (this._version === KCLManager.VERSION1) {
recordProcessorFuncInput.reason = 'ZOMBIE';
recordProcessorFunc = recordProcessor.shutdown;
} else {
recordProcessorFunc = recordProcessor.leaseLost;
}
break;
case 'shardEnded':
recordProcessorFuncInput.checkpointer = checkpointer;
if (this._version === KCLManager.VERSION1) {
recordProcessorFuncInput.reason = 'TERMINATE';
recordProcessorFunc = recordProcessor.shutdown;
} else {
recordProcessorFunc = recordProcessor.shardEnded;
}
break;
default:
// Should not occur.
throw new Error(util.format('Invalid action for record processor: %j', action));
}
// Attach callback so user can mark that operation is complete, and KCL can proceed with new operation.
var callbackFunc = function() {
this._recordProcessorCallback(context, action);
}.bind(this);
recordProcessorFunc.apply(recordProcessor, [recordProcessorFuncInput, callbackFunc]);
};