loader/lib/control_file.js (235 lines of code) (raw):
/*
Copyright (c) 2012, 2014, Oracle and/or its affiliates. All rights
reserved.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
"use strict";
var scanner = require("./Scanner.js"),
Parser = require("./Parser.js").Parser,
LoaderJob = require("./LoaderJob.js").LoaderJob,
util = require("util"),
assert = require("assert"),
P = new Parser();
/* Nonterminals of the control-file grammar.
   Each one is created with the name of the visitor method associated
   with it ("visit" + the nonterminal's own name).
   They are created in the same order as before; only the form of the
   declarations has changed (one statement per nonterminal).
*/

/* Top level */
var LoadDataStatement = P.Nonterminal("visitLoadDataStatement");
var StringOrName = P.Nonterminal("visitStringOrName");

/* Data source and format */
var DataSource = P.Nonterminal("visitDataSource");
var RandomData = P.Nonterminal("visitRandomData");
var SpecificData = P.Nonterminal("visitSpecificData");
var FileData = P.Nonterminal("visitFileData");
var Charset = P.Nonterminal("visitCharset");
var CommonDataFormat = P.Nonterminal("visitCommonDataFormat");
var DataJSON = P.Nonterminal("visitDataJSON");
var DataCSV = P.Nonterminal("visitDataCSV");

/* Logging, insert mode, destination */
var LogSpec = P.Nonterminal("visitLogSpec");
var InsertMode = P.Nonterminal("visitInsertMode");
var Destination = P.Nonterminal("visitDestination");
var SQLObject = P.Nonterminal("visitSQLObject");

/* Fields */
var FieldSpec = P.Nonterminal("visitFieldSpec");
var FieldOption = P.Nonterminal("visitFieldOption");
var FieldsSeparated = P.Nonterminal("visitFieldsSeparated");
var FieldSepOpt = P.Nonterminal("visitFieldSepOpt");
var FieldQuoteOpt = P.Nonterminal("visitFieldQuoteOpt");
var FieldEscapeOpt = P.Nonterminal("visitFieldEscapeOpt");

/* Lines */
var Lines = P.Nonterminal("visitLines");
var LineSpec = P.Nonterminal("visitLineSpec");
var LineStartSpec = P.Nonterminal("visitLineStartSpec");
var LineEndSpec = P.Nonterminal("visitLineEndSpec");

/* Miscellaneous options */
var Options = P.Nonterminal("visitOptions");
var OptComments = P.Nonterminal("visitOptComments");
var OptAtomic = P.Nonterminal("visitOptAtomic");
var OptIgnore = P.Nonterminal("visitOptIgnore");
var OptDolines = P.Nonterminal("visitOptDolines");
var OptWorker = P.Nonterminal("visitOptWorker");
var OptSpeed = P.Nonterminal("visitOptSpeed");
var OptSpeedMeasure = P.Nonterminal("visitOptSpeedMeasure");

/* Column descriptions */
var ColumnListSpec = P.Nonterminal("visitColumnListSpec");
var ColumnsInHeader = P.Nonterminal("visitColumnsInHeader");
var ColumnList = P.Nonterminal("visitColumnList");
var ColumnDefn = P.Nonterminal("visitColumnDefn");
var ColumnPosition = P.Nonterminal("visitColumnPosition");

/* Remaining pieces */
var IgnoredOptions = P.Nonterminal("visitIgnoredOptions");
var BeginData = P.Nonterminal("visitBeginData");
/* Productions */
P.defineProductions(
LoadDataStatement , P.Series( "LOAD",
P.Option(CommonDataFormat),
DataSource,
P.Option(LogSpec),
P.Option(InsertMode),
Destination,
P.Option(Charset),
P.Option(FieldSpec),
P.Option(LineSpec),
P.Several(Options),
P.Option(ColumnListSpec),
P.Option(";"),
P.Option(BeginData)
),
StringOrName , P.Alts("{string}", "{name}"),
/* Common Data Format */
CommonDataFormat , P.Alts(DataJSON, DataCSV),
DataJSON , P.Series("JSON"),
DataCSV , P.Series("CSV"),
/* Data Source */
DataSource , P.Alts(RandomData, SpecificData),
RandomData , P.Series("RANDOM", "DATA"),
SpecificData , P.Series("DATA", P.Option(IgnoredOptions),
P.Option(FileData)),
IgnoredOptions , P.Alts("LOCAL", "CONCURRENT", "LOW_PRIORITY"),
FileData , P.Series("INFILE", "{string}"),
/* Log Spec */
LogSpec , P.Series("BADFILE", "{string}"),
/* Insert Mode */
InsertMode , P.Alts("INSERT", "REPLACE", "APPEND", "TRUNCATE", "IGNORE"),
/* Destination Table */
Destination , P.Series("INTO", "TABLE", SQLObject),
SQLObject , P.Series(StringOrName, P.Option(".", StringOrName)),
/* Data Encoding */
Charset , P.Series("CHARACTER", "SET", StringOrName),
/* Field options. */
FieldSpec , P.Series(P.Alts("FIELDS","COLUMNS"), FieldOption,
P.Several(FieldOption)),
FieldOption , P.Alts(FieldsSeparated, FieldQuoteOpt, FieldEscapeOpt),
FieldsSeparated , P.Series(P.Alts("TERMINATED","SEPARATED"), "BY", FieldSepOpt),
FieldSepOpt , P.Alts("WHITESPACE","{string}"),
FieldQuoteOpt , P.Series(P.Option("OPTIONALLY"), "ENCLOSED", "BY",
"{string}", P.Option("AND", "{string}")),
FieldEscapeOpt , P.Series("ESCAPED", "BY", "{string}"),
/* Line Options */
LineSpec , P.Series("LINES", P.Option(LineStartSpec),
P.Option(LineEndSpec)),
LineStartSpec , P.Series("STARTING", "BY", "{string}"),
LineEndSpec , P.Series("TERMINATED", "BY", "{string}"),
/* Misc. Options */
Options , P.Alts(OptComments, OptAtomic, OptIgnore, OptDolines,
OptWorker, OptSpeed),
OptComments , P.Series("COMMENTS", "STARTING", "BY", "{string}"),
OptAtomic , P.Series("IN", "ONE", "TRANSACTION"),
Lines , P.Alts("LINE", "LINES", "ROW", "ROWS"),
OptIgnore , P.Series(P.Alts("IGNORE","SKIP"), "{number}", Lines),
OptDolines , P.Series("DO", "{number}", Lines),
OptWorker , P.Series("WORKER", "{number}", "OF", "{number}"),
OptSpeed , P.Series("SPEED", P.Alts("FAST", "SLOW", OptSpeedMeasure)),
OptSpeedMeasure , P.Series("{number}",
P.Alts("KB", "MB", "GB", "ROWS"), "PER",
P.Alts("HOUR", "MINUTE", "SECOND", "SEC")),
/* Column Description */
ColumnListSpec , P.Alts(ColumnList, ColumnsInHeader),
ColumnsInHeader , P.Series("COLUMNS","FROM","HEADER"),
ColumnList , P.Series("(" , ColumnDefn, P.Several("," , ColumnDefn),
")" ),
ColumnDefn , P.Series(StringOrName, P.Option(ColumnPosition)),
ColumnPosition , P.Series("POSITION","(","{number}",":","{number}",")"),
BeginData , P.Series("BEGINDATA")
);
/* SqlVisitor walks the parse tree and records what it finds on a
   Loader Job Spec.
*/
function SqlVisitor() {
}

// A node whose nonterminal has no method here gets the generic treatment:
// all of its children are simply visited in turn.
// Each method below overrides that default for one nonterminal.  Unless
// noted otherwise, the second argument is the loader job being filled in.

// WORKER {number} OF {number}      e.g.  WORKER 1 OF 3
SqlVisitor.prototype.visitOptWorker = function(node, job) {
  var workerNo = node.getNumber(0),
      ofWorkers = node.getNumber(1);
  job.setWorkerId(workerNo, ofWorkers);
};

// RANDOM DATA
SqlVisitor.prototype.visitRandomData = function(node, job) {
  job.generateRandomData();
};

// INFILE {string}
SqlVisitor.prototype.visitFileData = function(node, job) {
  var fileName = node.getString(0);
  job.setDataFile(fileName);
};

// BEGINDATA
SqlVisitor.prototype.visitBeginData = function(node, job) {
  // The control file doubles as the data file; every line up to BEGINDATA
  // is to be skipped, so hand the job the parser's final line.
  var lastControlLine = node.nonTerminal.parser.final_line;
  job.BeginDataAtControlFileLine(lastControlLine);
};

// JSON
SqlVisitor.prototype.visitDataJSON = function(node, job) {
  job.dataSourceIsJSON();
};

// CSV
SqlVisitor.prototype.visitDataCSV = function(node, job) {
  job.dataSourceIsCSV();
};

// SPEED FAST | SPEED SLOW | SPEED {number} unit PER time
SqlVisitor.prototype.visitOptSpeed = function(node, job) {
  var keyword = node.getName(1);

  if(keyword !== null) {
    // FAST or SLOW was given by name
    job.controller.speedFast = (keyword.toUpperCase() === 'FAST');
    return;
  }

  // No second name: collect the tokens of the speed measure instead
  job.controller.speedFast = false;
  job.controller.speedMeasure = [];
  node.visitChildNodes(this, job.controller.speedMeasure);
};

// INTO TABLE SQLObject
SqlVisitor.prototype.visitDestination = function(node, job) {
  var words = [];
  node.visitChildNodes(this, words);

  if(words.length === 3) {          // INTO TABLE a
    job.destination.table = words[2];
  } else if(words.length === 5) {   // INTO TABLE a . b
    job.destination.database = words[2];
    job.destination.table = words[4];
  }
};

// BADFILE {string}
SqlVisitor.prototype.visitLogSpec = function(node, job) {
  var badFileName = node.getString(0);
  job.setBadFile(badFileName);
};

// IN ONE TRANSACTION
SqlVisitor.prototype.visitOptAtomic = function(node, job) {
  job.inOneTransaction();
};

// IGNORE {number} LINES
SqlVisitor.prototype.visitOptIgnore = function(node, job) {
  var rowsToSkip = node.getNumber(0);
  job.setSkipRows(rowsToSkip);
};

// DO {number} LINES
SqlVisitor.prototype.visitOptDolines = function(node, job) {
  var rowLimit = node.getNumber(0);
  job.setMaxRows(rowLimit);
};

// COMMENTS STARTING BY {string}
SqlVisitor.prototype.visitOptComments = function(node, job) {
  var commentPrefix = node.getString(0);
  job.setCommentStart(commentPrefix);
};

// LINES STARTING BY {string}
SqlVisitor.prototype.visitLineStartSpec = function(node, job) {
  var linePrefix = node.getString(0);
  job.setLineStart(linePrefix);
};

// LINES TERMINATED BY {string}
SqlVisitor.prototype.visitLineEndSpec = function(node, job) {
  var lineTerminator = node.getString(0);
  job.setLineEnd(lineTerminator);
};

/* Field Options */

// [OPTIONALLY] ENCLOSED BY {string} [ AND {string} ]
SqlVisitor.prototype.visitFieldQuoteOpt = function(node, job) {
  var words = [],
      openQuote, closeQuote;
  node.visitChildNodes(this, words);

  // Drop a leading OPTIONALLY so the remaining words sit at fixed
  // positions:  ENCLOSED BY open [ AND close ]
  if(words[0].toUpperCase() === "OPTIONALLY") {
    words.shift();
    job.setFieldQuoteOptional();
  }

  openQuote = words[2];
  closeQuote = words[4] ? words[4] : openQuote;   // same quote both ends
  job.setFieldQuoteStartAndEnd(openQuote, closeQuote);
};

// ESCAPED BY {string}
SqlVisitor.prototype.visitFieldEscapeOpt = function(node, job) {
  var escape = node.getString(0);
  job.setFieldQuoteEsc(escape);
};

// TERMINATED BY WHITESPACE | {string}
SqlVisitor.prototype.visitFieldSepOpt = function(node, job) {
  var separator = node.getString(0);

  if(! separator) {
    // There is no string here, so the token must be the name "whitespace"
    job.setFieldSeparatorToWhitespace();
    return;
  }
  job.setFieldSeparator(separator);
};

/* Column Definitions: */

// COLUMNS FROM HEADER
SqlVisitor.prototype.visitColumnsInHeader = function(node, job) {
  job.setColumnsInHeader();
};

// P.Series(StringOrName, P.Option(ColumnPosition))
/* The children are visited twice.  The first pass gathers the column name
   into an array; the second pass hands the new column definition down to
   a ColumnPosition node.
*/
SqlVisitor.prototype.visitColumnDefn = function(node, job) {
  var gathered = [],
      columnDefn;

  node.visitChildNodes(this, gathered);
  columnDefn = job.destination.addColumnDefinition(gathered[0]);
  node.visitChildNodes(this, columnDefn);
};

// POSITION ( {number} : {number} )
// Here the second argument is a column definition rather than the job.
SqlVisitor.prototype.visitColumnPosition = function(node, defn) {
  defn.startPos = node.getNumber(0);
  defn.endPos = node.getNumber(1);
};

// INSERT | REPLACE | APPEND | TRUNCATE | IGNORE
SqlVisitor.prototype.visitInsertMode = function(node, job) {
  var mode = node.getToken(0);
  job.setInsertMode(mode);
};
////////////////// EXPORTED FUNCTIONS
exports.scan = function scanSourceFile(str) {
P.setText(str);
return scanner.tokenize(str);
};
exports.parse = function ParseLoaderString(tokens) {
var tree = {};
if(tokens.length) {
P.begin(tokens);
tree = P.evaluate(LoadDataStatement);
P.done();
}
return tree;
};
exports.analyze = function(tree, loaderJob) {
var sqlVisitor = new SqlVisitor();
if(tree) {
tree.visit(sqlVisitor, loaderJob);
}
};