loader/lib/LoaderJob.js (255 lines of code) (raw):
/*
Copyright (c) 2014, Oracle and/or its affiliates. All rights
reserved.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
"use strict";
var assert = require("assert"),
fs = require("fs"),
jones = require("database-jones"),
udebug = require("unified_debug").getLogger("LoaderJob.js"),
machine = require("./control_file.js"),
Controller = require("./Controller.js").Controller;
/* Specification for a Loader Job */
// Define a single column of a destination table
function ColumnDefinition(columnName) {
this.name = columnName;
this.startPos = null; // For fixed-width input
this.endPos = null;
}
// Define a destination: Database, Table, columns, and mapped class
function LoaderJobDestination() {
this.database = null;
this.table = "";
this.columnDefinitions = [];
this.rowConstructor = null;
}
LoaderJobDestination.prototype.addColumnDefinition = function(name) {
assert(typeof name === 'string');
var defn = new ColumnDefinition(name);
this.columnDefinitions.push(defn);
return defn;
};
LoaderJobDestination.prototype.createTableMapping = function() {
if(this.table.length === 0) {
throw new Error("No table specified in loader job.");
}
var literalMapping, mapping;
literalMapping = {
table : this.table,
database : this.database,
mapAllColumns : (this.columnDefinitions.length === 0)
};
mapping = new jones.TableMapping(literalMapping);
/* A ``function() {}'' for constructing mapped objects */
this.rowConstructor = function() {};
this.columnDefinitions.forEach(function(column) {
mapping.mapField(column.name);
});
mapping.applyToClass(this.rowConstructor);
return this.rowConstructor;
};
LoaderJobDestination.prototype.setColumnsFromArray = function(columnArray) {
udebug.log("setColumnsFromArray length:", columnArray.length);
var i;
for(i = 0 ; i < columnArray.length ; i++) {
this.columnDefinitions.push(new ColumnDefinition(columnArray[i]));
}
this.createTableMapping(); // Causes the default TableHandler to be discarded
};
LoaderJobDestination.prototype.getTableHandler = function() {
// FIXME this uses undocumented path to access the dbTableHandler:
return this.rowConstructor.prototype.jones.tableHandler;
};
// LoaderJob
function LoaderJob() {
this.plugin = null;
this.destination = new LoaderJobDestination();
this.ctlFileDescriptor = null;
this.controller = {
randomData : false,
badfile : "",
maxRows : null,
speedMeasure : null,
speedFast : true,
workerId : 0,
nWorkers : 1,
skipRows : 0,
inOneTransaction : false
};
this.dataSource = {
file : "",
useControlFile : null,
isJSON : false,
commentStart : null,
fieldSep : "\t",
fieldSepOnWhitespace : false,
fieldQuoteStart : "\"",
fieldQuoteEnd : "\"",
fieldQuoteEsc : "\\",
fieldQuoteOptional : false,
lineStartString : "",
lineEndString : "\n",
columnsInHeader : false
};
this.dataLoader = {
replaceMode : false,
requireEmpty : false,
doTruncate : false
};
this.setInsertMode("APPEND");
}
LoaderJob.prototype.setPlugin = function(plugin) {
this.plugin = plugin;
};
LoaderJob.prototype.initializeFromFile = function(filename) {
// Get the SQL text. The control file may also contain data.
var ctlMaxReadLen, ctlReadBuffer, size, sqlText;
ctlMaxReadLen = 16 * 1024;
ctlReadBuffer = Buffer.alloc(ctlMaxReadLen);
this.ctlFileDescriptor = fs.openSync(filename, 'r');
size = fs.readSync(this.ctlFileDescriptor, ctlReadBuffer, 0, ctlMaxReadLen);
sqlText = ctlReadBuffer.toString('utf8', 0, size);
this.initializeFromSQL(sqlText);
if(this.dataSource.useControlFile) {
this.dataSource.useControlFile.text = sqlText;
} else {
fs.closeSync(this.ctlFileDescriptor);
}
};
LoaderJob.prototype.BeginDataAtControlFileLine = function(lineNo) {
this.dataSource.useControlFile =
{
openFd : this.ctlFileDescriptor,
text : null,
inlineSkip : lineNo
};
};
LoaderJob.prototype.initializeFromSQL = function(text) {
var tokens, tree, error;
if(text) {
error = null;
try {
tokens = machine.scan(text);
} catch(e) {
error = e;
}
this.plugin.onSqlScan(error, tokens);
error = null;
// try {
tree = machine.parse(tokens);
// } catch(e) {
// error = e;
// }
this.plugin.onSqlParse(error, tree);
}
error = null;
try {
machine.analyze(tree, this);
} catch(e) {
error = e;
}
this.plugin.onLoaderJob(error, this);
};
LoaderJob.prototype.run = function(session, finalCallback) {
var controller = new Controller(this, session, finalCallback);
controller.run();
};
LoaderJob.prototype.setWorkerId = function(id, nWorkers) {
assert(typeof id === 'number' &&
typeof nWorkers === 'number' &&
id > 0 &&
id <= nWorkers);
this.controller.workerId = id - 1;
this.controller.nWorkers = nWorkers;
};
LoaderJob.prototype.setDataFile = function(fileName) {
assert(typeof fileName === 'string');
this.dataSource.file = fileName;
};
LoaderJob.prototype.generateRandomData = function() {
this.controller.randomData = true;
};
LoaderJob.prototype.dataSourceIsJSON = function() {
this.dataSource.isJSON = true;
};
LoaderJob.prototype.dataSourceIsCSV = function() {
this.setFieldSeparator(",");
this.setColumnsInHeader();
};
/* [ INSERT | REPLACE | APPEND | TRUNCATE | IGNORE ]
This is the union of keywords supported by Oracle SQL*Loader and by MySQL.
APPEND allows the table to have existing data. This is the default behavior.
INSERT requires the table to be empty before loading.
TRUNCATE instructs the loader to delete all rows before loading data.
REPLACE has the meaning, as in MySQL, that existing rows will be updated
with values from the data file. (This is quite different from the semantics
of REPLACE in SQL*Loader). With REPLACE, all rows are written with the
semantics of save() ("update or insert") rather than persist().
IGNORE is present for compatibility with MySQL, but is ignored.
*/
LoaderJob.prototype.setInsertMode = function(mode) {
mode = mode.toUpperCase();
switch(mode) {
case "INSERT":
this.dataLoader.requireEmpty = true;
this.dataLoader.replaceMode = false;
this.dataLoader.doTruncate = false;
throw new Error("INSERT mode is not yet implemented");
break;
case "REPLACE":
this.dataLoader.requireEmpty = false;
this.dataLoader.replaceMode = true;
this.dataLoader.doTruncate = false;
break;
case "APPEND":
this.dataLoader.requireEmpty = false;
this.dataLoader.replaceMode = false;
this.dataLoader.doTruncate = false;
break;
case "TRUNCATE":
this.dataLoader.requireEmpty = false;
this.dataLoader.replaceMode = false;
this.dataLoader.doTruncate = true;
throw new Error("TRUNCATE mode is not yet implemented");
break;
case "IGNORE":
// log a warning that IGNORE is ignored?
break;
default:
throw new Error("Illegal insert mode:" + mode);
}
};
LoaderJob.prototype.setFieldQuoteOptional = function() {
this.dataSource.fieldQuoteOptional = true;
};
LoaderJob.prototype.setFieldQuoteStartAndEnd = function(start, end) {
assert(typeof start === 'string' &&
typeof end === 'string' &&
start.length === 1 &&
end.length === 1);
this.dataSource.fieldQuoteStart = start;
this.dataSource.fieldQuoteEnd = end;
};
LoaderJob.prototype.setFieldSeparator = function(sep) {
assert(typeof sep === 'string' &&
sep.length === 1);
this.dataSource.fieldSep = sep;
};
LoaderJob.prototype.setFieldSeparatorToWhitespace = function() {
this.dataSource.fieldSepOnWhitespace = true;
};
LoaderJob.prototype.setFieldQuoteEsc = function(escChar) {
assert(typeof escChar === 'string' &&
escChar.length === 1);
this.dataSource.fieldQuoteEsc = escChar;
};
LoaderJob.prototype.setTable = function(name) {
this.destination.table = name;
};
LoaderJob.prototype.setDatabase = function(db) {
this.destination.database = db;
};
LoaderJob.prototype.setBadFile = function(file) {
assert(typeof file === 'string');
this.controller.badfile = file;
};
LoaderJob.prototype.inOneTransaction = function() {
this.controller.inOneTransaction = true;
};
LoaderJob.prototype.setSkipRows = function(n) {
assert(typeof n === 'number' && n >= 0);
this.controller.skipRows = n;
};
LoaderJob.prototype.setMaxRows = function(n) {
assert(typeof n === 'number' && n >= 0);
this.controller.maxRows = n;
};
LoaderJob.prototype.setCommentStart = function(str) {
assert(typeof str === 'string' && str.length > 0);
this.dataSource.commentStart = str;
};
LoaderJob.prototype.setLineStart = function(str) {
assert(typeof str === 'string' && str.length > 0);
this.dataSource.lineStartString = str;
};
LoaderJob.prototype.setLineEnd = function(str) {
assert(typeof str === 'string' && str.length > 0);
this.dataSource.lineEndString = str;
};
LoaderJob.prototype.setColumnsInHeader = function() {
this.dataSource.columnsInHeader = true;
};
exports.LoaderJob = LoaderJob;