loader/lib/DataSource.js (232 lines of code) (raw):
/*
Copyright (c) 2014, Oracle and/or its affiliates. All rights
reserved.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
"use strict";
var RandomRowGenerator = require("./RandomData.js").RandomRowGenerator,
LineScanner = require("./Scanner.js").LineScanner,
TextFieldScanner = require("./Scanner.js").TextFieldScanner,
util = require("util"),
fs = require("fs"),
udebug = unified_debug.getLogger("DataSource.js");
var theDataSource;
var theController;
/*
Data Source API
Data Source Methods (for controller)
------------------------------------
ALL METHODS ARE IMMEDIATE and return undefined unless otherwise noted.
start() : controller asks data source to begin sending records.
Returns an Error, or undefined on success.
pause() : controller asks data source to pause.
resume() : controller asks paused data source to resume sending.
bool isPaused() : data source reports whether it is currently paused.
end() : controller asks data source to clean up and shut down.
skip(bool) : if true, data source should read lines but discard them.
Data Source callbacks into controller
-------------------------------------
dsNewItem(obj) : Data Source supplies obj to controller for loading.
obj is an object with property names corresponding
to column names.
dsDiscardedItem() : Data Source informs controller that a row was read
and discarded (skipped).
dsFinished(err) : Data Source has finished.
If the controller has called end(), err is undefined.
Otherwise err holds an Error explaining why the
DataSource has stopped.
*/
/*
  AbstractDataSource: run-state flags and the controller-facing methods
  that manipulate them.

  This base does not define runIfReady() or set this.controller; the
  concrete data sources in this file provide both.
*/
function AbstractDataSource() {
  this.started  = 0;      // becomes 1 once start() has been called
  this.running  = 0;      // 1 while sending, 0 while paused or stopped
  this.shutdown = 0;      // becomes 1 once end() has been called
  this.skipping = false;  // true: report rows with dsDiscardedItem()
                          // instead of dsNewItem()
}

/* start(): mark the source started and running, then try to run. */
AbstractDataSource.prototype.start = function() {
  this.started = 1;
  this.running = 1;
  this.runIfReady();
};

/* pause(): clear the running flag. */
AbstractDataSource.prototype.pause = function() {
  this.running = 0;
};

/* resume(): set the running flag again and try to run. */
AbstractDataSource.prototype.resume = function() {
  this.running = 1;
  this.runIfReady();
};

/* isPaused(): true only when neither running nor shut down. */
AbstractDataSource.prototype.isPaused = function() {
  return ! (this.running || this.shutdown);
};

/* skip(doSkip): store the flag that selects discard vs. deliver. */
AbstractDataSource.prototype.skip = function(doSkip) {
  this.skipping = doSkip;
};

/*
  end(): record the shutdown request.
  If the source is not running, dsFinished() is sent to the controller
  right here.  If it is running, only the running flag is cleared; the
  run() loops in this file test this.shutdown after they exit and send
  dsFinished() themselves.
*/
AbstractDataSource.prototype.end = function() {
  udebug.log("end");
  this.shutdown = 1;
  if(! this.running) {
    this.controller.dsFinished();
    return;
  }
  this.running = 0;
};
////////////////////////////// RandomDataSource ///////////
/*
  RandomDataSource(job, controller)
  Data source whose rows come from a RandomRowGenerator built on the
  table handler returned by job.destination.getTableHandler().
*/
function RandomDataSource(job, controller) {
  var tableHandler;
  this.options = job.options;
  this.controller = controller;
  tableHandler = job.destination.getTableHandler();
  this.generator = new RandomRowGenerator(tableHandler);
}

util.inherits(RandomDataSource, AbstractDataSource);

/* Record: wraps one generated row; error starts out null. */
RandomDataSource.prototype.Record = function(row) {
  this.row = row;
  this.error = null;
};

/*
  Record.logger(fd, callback)
  Writes the record to fd as a comment line holding this.error.message
  followed by a line holding the row as JSON.  callback is handed
  straight to fs.write().
*/
RandomDataSource.prototype.Record.prototype.logger = function(fd, callback) {
  var commentLine = "/* " + this.error.message + " */\n";
  var rowLine = JSON.stringify(this.row) + "\n";
  var buffer = Buffer.from(commentLine + rowLine);
  fs.write(fd, buffer, 0, buffer.length, null, callback);
};

/* runIfReady(): call run() only when both started and running are set. */
RandomDataSource.prototype.runIfReady = function() {
  if(! this.started || ! this.running) {
    return;
  }
  this.run();
};

/*
  run(): generate rows for as long as this.running === 1.
  Each row is either reported as discarded (when skipping) or wrapped in
  a Record and handed to the controller.  The loop only ends when
  something called from inside it changes this.running.  After the loop,
  dsFinished() is sent if a shutdown was requested.
*/
RandomDataSource.prototype.run = function() {
  var generated;
  udebug.log("run");
  while(this.running === 1) {
    generated = this.generator.newRow();
    // Note there is no onScanLine plugin for random data
    if(! this.skipping) {
      this.controller.dsNewItem(new this.Record(generated));
      continue;
    }
    this.controller.dsDiscardedItem();
  }
  if(this.shutdown) {
    this.controller.dsFinished();
  }
};
////////////////////////////// BufferDescriptor ///////////
/*
  BufferDescriptor(source)
  Cursor over a text buffer.  Only `source` is taken from the arguments;
  the line offsets start at 0 and both flags start false.  The scanners
  used by FileDataSource receive this object and are expected to update
  it (their code is outside this file).
*/
function BufferDescriptor(source) {
  this.source        = source;  // the buffer text
  this.lineStart     = 0;       // substring() start offset of the current line
  this.lineEnd       = 0;       // substring() end offset of the current line
  this.lineHasFields = false;
  this.atEnd         = false;
}

/* println(): send the current line (lineStart..lineEnd) to the debug log. */
BufferDescriptor.prototype.println = function() {
  var currentLine = this.source.substring(this.lineStart, this.lineEnd);
  udebug.log(currentLine);
};
////////////////////////////// FileDataSource ///////////
/*
  FileDataSource(job, controller)
  Data source that reads records from a file, from STDIN, or from the
  tail of a control file.

  job.dataSource  : reader options (kept as this.options)
  job.destination : kept as this.columns
  controller      : receives the ds*() callbacks

  Side effects: the new instance and its controller are stored in the
  module-level theDataSource / theController variables, which the
  asynchronous callbacks in this file use.  Input is opened (or reading
  begins) before the constructor returns.

  NOTE(review): JSONFieldScanner is not among the names required at the
  top of this file; confirm it is in scope before the isJSON path is used.
*/
function FileDataSource(job, controller) {
  var opts;
  udebug.log("FileDataSource()");
  opts = job.dataSource;
  this.options     = opts;
  this.columns     = job.destination;
  this.controller  = controller;
  this.fd          = null;
  this.bufferSize  = 16384;
  this.bufferDesc  = null;
  this.lineScanner = new LineScanner(opts);
  this.recopied    = 0;    // no. of characters left from previous read buffer
  this.physLineNo  = 0;    // actual lines (not records) read from data file
  theDataSource = this;
  theController = controller;

  this.fieldScanner = opts.isJSON ? new JSONFieldScanner(this.columns)
                                  : new TextFieldScanner(opts);

  if(opts.useControlFile) {
    /* Data follows inline in the control file */
    this.skipToBEGINDATA();
    return;
  }
  if(opts.file === "-") {
    this.fd = 0;   // STDIN.  (Make buffer size smaller?)
    this.read();
    return;
  }
  fs.open(opts.file, 'r', function(err, fd) {
    theDataSource.onFileOpen(err, fd);
  });
}

util.inherits(FileDataSource, AbstractDataSource);
// Record Constructor
/*
  Record(bufferDescriptor, fields)
  One scanned record: the field values plus enough of the buffer
  descriptor (source text and line offsets) to reproduce the original
  line later.  error starts out null.
*/
FileDataSource.prototype.Record = function(bufferDescriptor, fields) {
  this.row    = fields;
  this.error  = null;
  this.source = bufferDescriptor.source;
  this.start  = bufferDescriptor.lineStart;
  this.end    = bufferDescriptor.lineEnd;
};

/*
  Record.logger(fd, callback)
  Writes the record's original source text to fd.  When the data source
  options define commentStart, the text is preceded by a comment line
  carrying this.error.message and terminated with options.lineEndString.
  callback is handed straight to fs.write().
*/
FileDataSource.prototype.Record.prototype.logger = function(fd, callback) {
  var opts = theDataSource.options,
      commentLine = "",
      buffer;
  if(opts.commentStart) {
    commentLine = opts.commentStart + this.error.message + opts.lineEndString;
  }
  buffer = Buffer.from(commentLine + this.source.substring(this.start, this.end));
  fs.write(fd, buffer, 0, buffer.length, null, callback);
};
/*
  onFileOpen(err, fd): completion handler for fs.open().
  On error the controller is told the data source has finished (with the
  error); otherwise the descriptor is stored and the first read starts.
*/
FileDataSource.prototype.onFileOpen = function(err, fd) {
  udebug.log("onFileOpen fd:", fd);
  if(err) {
    theController.dsFinished(err);
    return;
  }
  this.fd = fd;
  this.read();
};
// TODO: Handle the case of a single record that is larger than the buffer
// OR document this limitation and let the user select the buffer size
/*
  read(): start an asynchronous read of the next buffer from this.fd.

  A fresh buffer of this.bufferSize bytes is allocated.  If the previous
  buffer descriptor still holds an unfinished line (its lineEnd is not
  before its lineStart), the text from lineStart onward is written at
  the front of the new buffer and this.recopied records how many bytes
  that took; otherwise this.recopied is 0.  The read then fills the rest
  of the buffer and hands its result to onRead().
*/
FileDataSource.prototype.read = function() {
  var chunk, previous;
  udebug.log("read");
  chunk = Buffer.alloc(this.bufferSize);
  previous = this.bufferDesc;

  /* Rewrite partial last record into new buffer */
  this.recopied = (previous && ! (previous.lineEnd < previous.lineStart))
    ? chunk.write(previous.source.substring(previous.lineStart))
    : 0;

  fs.read(this.fd, chunk, this.recopied, this.bufferSize - this.recopied, null,
          function(err, sz, buf) {
    theDataSource.onRead(err, sz, buf);
  });
};
/*
  onRead(err, size, buffer): completion handler for fs.read().

  size is the number of bytes just read; the bytes recopied from the
  previous buffer are added to it to get the amount of valid data.
    - err set        : report it to the controller via dsFinished(err)
    - no data at all : end of file; stop running and call end()
    - otherwise      : decode the valid part of the buffer as UTF-8 into
                       a new BufferDescriptor and run if ready
*/
FileDataSource.prototype.onRead = function(err, size, buffer) {
  var valid;
  udebug.log("onRead size:", size);
  valid = size + this.recopied;

  if(err) {
    theController.dsFinished(err);
    return;
  }
  if(valid === 0) {
    udebug.log("onRead: EOF");
    this.running = 0;
    this.end();
    return;
  }
  this.bufferDesc = new BufferDescriptor(buffer.toString('utf8', 0, valid));
  this.runIfReady();
};
/*
  skipToBEGINDATA(): start reading inline data from a control file.

  options.useControlFile supplies the control file text, its open file
  descriptor and the number of physical lines (inlineSkip) to skip.
  A descriptor over that text is advanced past those lines by the line
  scanner and becomes the current buffer; then the source runs if ready.
*/
FileDataSource.prototype.skipToBEGINDATA = function() {
  var controlFile = this.options.useControlFile,
      descriptor = new BufferDescriptor(controlFile.text, 0, controlFile.text.length);

  this.fd = controlFile.openFd;
  this.lineScanner.skipPhysicalLines(descriptor, controlFile.inlineSkip);
  this.physLineNo = controlFile.inlineSkip;
  udebug.log("skipToBEGINDATA skip to line:", this.physLineNo);

  descriptor.lineStart = descriptor.lineEnd;  // Advance to the newline after BEGINDATA
  this.bufferDesc = descriptor;
  this.options.useControlFile.buffer = null;  // discard reference
  this.runIfReady();
};
/*
  runIfReady(): call run() only when the source has been started, is
  currently running, and has a buffer descriptor to scan.
*/
FileDataSource.prototype.runIfReady = function() {
  if(! this.started)    { return; }
  if(! this.running)    { return; }
  if(! this.bufferDesc) { return; }
  this.run();
};
/*
  run(): scan records out of the current buffer and hand them to the
  controller.

  For each line found by the line scanner, while the source is running
  and the buffer is not exhausted:
    - if options.columnsInHeader is set, the first line that has fields
      is used to set the column names and is not delivered as data;
    - else if skipping, the line is reported with dsDiscardedItem();
    - else the plugin's onScanLine() hook is called and, if the line has
      fields, they are extracted, wrapped in a Record and delivered with
      dsNewItem().

  After the loop: dsFinished() is sent if a shutdown was requested or if
  there is no file descriptor; otherwise, if the buffer is exhausted,
  the next read is started.

  Fix: the Record is now built from the scanned `fields`.  Previously it
  was built from a local `row` that was never assigned, so every record
  reached the controller with row === undefined.
*/
FileDataSource.prototype.run = function() {
  udebug.log("run");
  var desc, extraLines, fields, record;
  desc = this.bufferDesc;
  extraLines = 0;

  while(this.running && ! desc.atEnd) {
    extraLines = this.lineScanner.scan(desc);   // Scan the line
    this.physLineNo += (extraLines + 1);

    if(this.options.columnsInHeader) {
      udebug.log("Setting columns from header");
      if(desc.lineHasFields) {
        this.options.columnsInHeader = null;      // reset from true to null
        fields = this.fieldScanner.scan(desc);    // Extract the data fields
        this.columns.setColumnsFromArray(fields); // Creates a new tableHandler on next write
      }
    } else if(this.skipping) {
      theController.dsDiscardedItem();
    } else {
      theController.plugin.onScanLine(this.physLineNo, desc.source,
                                      desc.lineStart, desc.lineEnd);
      if(desc.lineHasFields) {
        fields = this.fieldScanner.scan(desc);    // Extract the data fields
        record = new this.Record(desc, fields);   // Record stores these as record.row
        theController.dsNewItem(record);
      }
    }
    desc.lineStart = desc.lineEnd + 1;  // Advance to first char. of next line
  }

  if(this.shutdown) {
    /* Controller has shut us down, e.g. because of DO N ROWS */
    theController.dsFinished();
  } else if(this.fd === null) {
    /* We have been reading data from a string passed on the command line */
    theController.dsFinished();
  } else if(desc.atEnd) {
    /* Read to the end of a read buffer; read again. */
    this.read();
  } else {
    /* Reaching this point is probably a bug */
    /* NOTE(review): a pause() issued from inside a controller callback
       also exits the loop with running === 0 and lands here; confirm
       whether that case should be silent. */
    console.log("why are we here?", this);
  }
};
exports.RandomDataSource = RandomDataSource;
exports.FileDataSource = FileDataSource;