loader/lib/Controller.js (162 lines of code) (raw):
/*
Copyright (c) 2014, 2015, Oracle and/or its affiliates. All rights
reserved.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
"use strict";
var udebug = unified_debug.getLogger("Controller.js"),
DbWriter = require("./DbWriter.js").DbWriter,
BadRecordLogger = require("./BadRecordLogger.js").BadRecordLogger,
DataSource = require("./DataSource.js"),
RandomDataSource = DataSource.RandomDataSource,
FileDataSource = DataSource.FileDataSource;
var theController; // File-scope singleton
function Controller(job, session, finalCallback) {
this.session = session;
this.options = job.controller;
this.destination = job.destination;
this.plugin = job.plugin;
this.finalCallback = finalCallback;
this.dataSource = null;
this.writer = null;
this.badRecordLogger = null;
this.stats = {
rowsProcessed : 0, // all rows processed by data source
rowsSkipped : 0, // rows procesed by data source but skipped
rowsComplete : 0, // all rows completed by loader (success or failure)
rowsError : 0, // rows failed by loader
tickNumber : 0
};
this.shutdown = 0;
this.ticker = null; // interval timer
this.fatalError = null;
/* If the data source is maxLead rows ahead of the loader, pause it.
When the difference shrinks to minLead, resume it.
*/
this.maxLead = 2000;
this.minLead = 1000;
/* Data Source */
if(this.options.randomData) {
this.dataSource = new RandomDataSource(job, this);
} else {
this.dataSource = new FileDataSource(job, this);
}
/* Data Writer */
this.writer = new DbWriter(job.dataLoader, session, this);
if(this.options.inOneTransaction) {
this.writer.beginAtomic();
}
/* Bad Record Logger */
this.badRecordLogger = new BadRecordLogger(this);
/* Set singleton */
theController = this;
// Adjustments
if(this.options.skipRows && this.options.maxRows) {
this.options.maxRows += this.options.skipRows;
}
// Sanity Checks
if(this.options.inOneTransaction && this.options.randomData &&
(! this.options.maxRows)) {
this.fatalError = new Error("This job would attempt to build a single " +
"transaction of infinite size.");
this.finalCallback(this.fatalError, null);
}
process.on('exit', function() { udebug.log(theController.stats); });
}
Controller.prototype.run = function() {
if(this.options.workerId || this.options.skipRows) {
this.dataSource.skip(true);
}
this.dataSource.start();
};
Controller.prototype.dsNewItem = function(record) {
var handlerReturnCode;
udebug.log("dsNewItem", record);
/* Count all rows processed */
this.stats.rowsProcessed++;
/* Set the default Domain Object Constructor */
record.class = this.destination.rowConstructor;
/* Pass the item to the plugin */
handlerReturnCode = this.plugin.onReadRecord(record);
/* Send the item to the loader */
if(handlerReturnCode === false) {
this.stats.rowsSkipped++;
} else {
this.writer.loadItem(record);
}
/* Stop if we have hit the limit */
if(this.stats.rowsProcessed === this.options.maxRows) {
this.dataSource.end();
}
/* Skip the next record if this is one worker of many */
if(this.options.nWorkers > 1) {
this.dataSource.skip(true);
}
/* If the data source is too far ahead of the loader, pause it */
if(this.stats.rowsProcessed - this.stats.rowsComplete > this.maxLead) {
this.dataSource.pause();
this.writer.dataSourceIsPaused();
}
};
Controller.prototype.dsDiscardedItem = function() {
this.stats.rowsProcessed++;
this.stats.rowsSkipped++;
/* Stop if we have hit the limit */
if(this.stats.rowsProcessed === this.options.maxRows) {
this.dataSource.end();
}
/* Turn off skipping */
if((this.stats.rowsSkipped >= this.options.skipRows)
&& (this.stats.rowsProcessed % this.options.nWorkers === this.options.workerId)) {
this.dataSource.skip(false);
}
};
/* SHUTDOWN SEQUENCE:
DataSource sends dsFinished() to controller.
Controller sends end() to dbWriter.
DbWriter sends final record to controller in loaderRecordComplete().
Controller calls writer.endAtomic() [even if not in atomic mode].
DbWriter calls loaderTransactionDidCommit() or loaderTransactionDidRollback().
Controller sends end() to BadRecordLogger.
BadRecordLogger sends loggerFinished() to controller.
Controller calls plugin.onFinished(controllerCallback).
Plugin calls its provided callback.
Controller shuts down.
*/
Controller.prototype.dsFinished = function(err) {
udebug.log("dsFinished");
if(err) {
this.fatalError = err;
}
this.shutdown = 1;
this.writer.end();
this.commitIfComplete();
};
Controller.prototype.commitIfComplete = function() {
var rowsLeft = this.stats.rowsProcessed -
(this.stats.rowsComplete + this.stats.rowsSkipped);
if(rowsLeft === 0) {
this.writer.endAtomic();
} else {
udebug.log("No shutdown yet - ", rowsLeft, "pending");
}
};
Controller.prototype.loaderTransactionDidCommit = function() {
this.badRecordLogger.end();
};
Controller.prototype.loaderTransactionDidRollback = function() {
this.fatalError = new Error("Transaction rolled back. No records loaded.");
this.badRecordLogger.end();
};
Controller.prototype.loggerFinished = function() {
this.plugin.onFinished(function() {
theController.finished();
});
};
Controller.prototype.loaderRecordComplete = function(record) {
this.stats.rowsComplete++;
if(record.error) {
this.stats.rowsError++;
this.badRecordLogger.logRecord(record);
this.plugin.onRecordError(record);
} else {
this.plugin.onRecordStored(record);
}
/* Resume if paused */
if(this.dataSource.isPaused() &&
(this.stats.rowsProcessed - this.stats.rowsComplete < this.minLead)) {
this.dataSource.resume();
}
/* Shut down if the last row was complete */
if(this.shutdown) {
this.commitIfComplete();
}
/* Set the interval timer for 50 msec. if not yet set */
else if(! (this.ticker)) {
this.ticker = setInterval(function() { theController.onTick(); }, 50);
}
};
Controller.prototype.loaderAborted = function(err) {
if(err) {
this.fatalError = err;
}
this.dataSource.end();
};
Controller.prototype.finished = function() {
if(this.ticker) {
clearInterval(this.ticker);
}
this.finalCallback(this.fatalError, this.stats);
};
Controller.prototype.onTick = function() {
var avg;
this.stats.tickNumber++;
/* Report status after 5 seconds */
if(this.stats.tickNumber === 100) {
avg = Math.floor(this.stats.rowsComplete / 5);
console.log("Loading", avg,"records per second.");
}
/* Loader has a tick handler */
this.writer.onTick();
/* Plugin has a tick handler */
this.plugin.onTick(this.stats);
};
exports.Controller = Controller;