jones-ndb/impl/ndb/NdbOperation.js (808 lines of code) (raw):
/*
Copyright (c) 2013, 2016, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.
This program is also distributed with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation. The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have included with MySQL.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License, version 2.0, for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
"use strict";
/* Counters of DBOperations created by this module, one per operation name.
   This corresponds to OperationCodes: the DBOperation constructor increments
   op_stats[opcodes[opcode]], so every name the OperationCodes table can
   produce is expected to have an entry here.
*/
var op_stats = {
"read" : 0,
"insert" : 0,
"update" : 0,
"write" : 0,
"delete" : 0,
"scan" : 0,
"scan_read" : 0,
"scan_count" : 0,
"scan_delete" : 0,
"projection_read" : 0
};
/* Per-table index usage counters, keyed by table name and then by index
   name; entries are created lazily by keepIndexStatistics(). */
var index_stats = {};
var path = require("path"),
assert = require("assert"),
conf = require("./path_config"),
adapter = require(conf.binary).ndb,
jones = require("database-jones"),
doc = require(jones.spi_doc.DBOperation),
stats_module = require(jones.api.stats),
QueuedAsyncCall = require(jones.common.QueuedAsyncCall).QueuedAsyncCall,
prepareFilterSpec = require("./NdbScanFilter.js").prepareFilterSpec,
getIndexBounds = require(jones.common.IndexBounds).getIndexBounds,
markQuery = require(jones.common.IndexBounds).markQuery,
bufferForText = adapter.impl.bufferForText,
textFromBuffer = adapter.impl.textFromBuffer,
COMMIT = adapter.ndbapi.Commit,
NOCOMMIT = adapter.ndbapi.NoCommit,
ROLLBACK = adapter.ndbapi.Rollback,
constants = adapter.impl,
OpHelper = constants.OpHelper,
ScanHelper = constants.Scan.helper,
BoundHelper = constants.IndexBound.helper,
opcodes = doc.OperationCodes,
NdbProjection = require("./NdbProjection"),
udebug = unified_debug.getLogger("NdbOperation.js");
/* Hand the counter objects to the jones statistics module.
   op_stats and index_stats are updated in place later in this file. */
stats_module.register(op_stats, "spi","ndb","DBOperation","created");
stats_module.register(index_stats, "spi","ndb","key_access");
stats_module.register(adapter.impl.encoder_stats, "spi","ndb","encoder");
/* Declared here and assigned further down the file. */
var storeNativeConstructorInMapping;
/* DBResult
   Holds the outcome of one DBOperation.  All four fields start out null.
*/
var DBResult = function() {
  var self = this;
  ["success", "error", "value", "insert_id"].forEach(function(field) {
    self[field] = null;
  });
};
// DBOperationError lookup tables

/* NDB error classification => SQLSTATE */
var errorClassificationMap = {
  "ConstraintViolation" : "23000",
  "NoDataFound"         : "02000",
  "UnknownResultError"  : "08000"
};

/* SQLSTATE (or internal error code) => message text */
var sqlStateMessages = {
  "22000" : "Data error",
  "22001" : "String too long",
  "22003" : "Numeric value out of range",
  "22007" : "Invalid datetime",
  "23000" : "Column cannot be null",
  "HY000" : "Incorrect numeric value",
  "0F001" : "BLOB or BINARY value is not a Buffer",
  "WCTOR" : [
    "A Domain Object Constructor has overwritten persistent properties ",
    "that were read from the database.  The Domain Object Constructor ",
    "is called with no arguments and its ``this'' parameter set to the ",
    "newly read object."
  ].join("").replace("database.  The", "database. The")
};
/* DBOperationError(message)
   Error object attached to DBOperation results.
     message   : text description (defaults to "")
     sqlstate  : SQLSTATE code; "NDB00" unless one of the from...() methods
                 or cascading() replaces it
     ndb_error : the native NDB error, when built by fromNdbError()
     cause     : the underlying error, when built by cascading()
   The from...() methods and cascading() fill in the object and return it,
   so they can be chained onto "new DBOperationError()".
*/
function DBOperationError(message) {
  this.message = message || "";
  this.ndb_error = null;
}
DBOperationError.prototype = {
  sqlstate : "NDB00",
  cause    : null
};

/* Populate this error from a native NDB error object.
   A classification that has no entry in errorClassificationMap keeps the
   default SQLSTATE rather than leaving sqlstate undefined.
*/
DBOperationError.prototype.fromNdbError = function(ndb_error) {
  var classification = ndb_error.classification;
  this.message = ndb_error.message + " [" + ndb_error.code + "]";
  if(Object.prototype.hasOwnProperty.call(errorClassificationMap, classification)) {
    this.sqlstate = errorClassificationMap[classification];
  } else {
    this.sqlstate = DBOperationError.prototype.sqlstate;
  }
  this.ndb_error = ndb_error;
  return this;
};

/* Populate this error from a SQLSTATE (or internal code) by looking up its
   message in sqlStateMessages.
*/
DBOperationError.prototype.fromSqlState = function(sqlstate) {
  this.message = sqlStateMessages[sqlstate];
  this.sqlstate = sqlstate;
  /* Some exceptional behavior: "0F001" supplies the message text only;
     the error is reported with the generic data-error SQLSTATE. */
  if(sqlstate === "0F001") {
    this.sqlstate = "22000";
  }
  return this;
};

/* Populate this error as an indirect consequence of another error,
   inheriting the SQLSTATE of the cause.
*/
DBOperationError.prototype.cascading = function(cause) {
  udebug.log("Adding indirect error from", cause);
  this.message = "Cascading Error";
  this.sqlstate = cause.sqlstate;
  this.cause = cause;
  return this;
};
/* Count one use of an index.
   The first time a table is seen, its counters are created: one named
   "PrimaryKey" plus one per entry of dbTable.indexes from position 1 onward.
*/
function keepIndexStatistics(dbTable, index) {
  var tableName = dbTable.name;
  var counters = index_stats[tableName];
  var n;

  if(counters === undefined) {
    counters = { "PrimaryKey" : 0 };
    for(n = 1 ; n < dbTable.indexes.length ; n++) {
      counters[dbTable.indexes[n].name] = 0;
    }
    index_stats[tableName] = counters;
  }

  if(index.isPrimaryKey) {
    counters.PrimaryKey++;
  } else {
    counters[index.name]++;
  }
}
/* Return the result record of a table handler, creating and caching it
   in dbTableHandler.resultRecord on first use.
*/
function storeResultRecord(dbTableHandler) {
  var dictionary;
  if(dbTableHandler.resultRecord) {
    return dbTableHandler.resultRecord;
  }
  dictionary = adapter.impl.DBDictionary;
  // getRecordForMapping(table, ndb, nColumns, columnsArray)
  dbTableHandler.resultRecord = dictionary.getRecordForMapping(
    dbTableHandler.dbTable,
    dbTableHandler.dbTable.per_table_ndb,
    dbTableHandler.getNumberOfColumns(),
    dbTableHandler.getAllColumnMetadata());
  return dbTableHandler.resultRecord;
}
/* DBOperation(opcode, tx, indexHandler, tableHandler)
   A single database operation belonging to transaction tx.
   When an indexHandler is supplied, the table handler and index are taken
   from it (and the index use is counted); otherwise the supplied
   tableHandler is used and the operation has no index.
*/
var DBOperation = function(opcode, tx, indexHandler, tableHandler) {
  var hasIndex = Boolean(indexHandler);
  assert(tx);

  this.opcode       = opcode;
  this.userCallback = null;
  this.transaction  = tx;
  this.keys         = {};
  this.values       = {};
  this.lockMode     = "";
  this.result       = new DBResult();
  this.indexHandler = indexHandler;
  this.tableHandler = hasIndex ? indexHandler.tableHandler : tableHandler;
  this.index        = hasIndex ? indexHandler.dbIndex : null;
  if(hasIndex) {
    keepIndexStatistics(this.tableHandler.dbTable, this.index);
  }
  storeResultRecord(this.tableHandler);

  /* NDB Impl-specific properties */
  this.encoderError   = null;
  this.query          = null;
  this.scanOp         = null;
  this.needAutoInc    = false;
  this.buffers        = { 'row' : null, 'key' : null };
  this.columnMask     = [];
  this.scan           = {};
  this.blobs          = null;
  this.connProperties = tx.dbSession.parentPool.properties;

  /* Count the operation under its name */
  op_stats[opcodes[opcode]]++;
};
/* Allocate a zero-filled key buffer sized for the operation's index record.
   Asserts that no key buffer has been allocated yet.
*/
function allocateKeyBuffer(op) {
  var keySize;
  assert(op.buffers.key === null);
  keySize = op.index.record.getBufferSize();
  op.buffers.key = Buffer.alloc(keySize);
}
/* Drop the reference to the operation's key buffer.
   Opcode 2 (insert) is left alone: all but insert use a key.
*/
function releaseKeyBuffer(op) {
  var isInsert = (op.opcode === 2);
  if(isInsert) {
    return;
  }
  op.buffers.key = null;
}
/* encodeColumnsInBuffer()
   Encode the first ncolumns entries of fields into buffer using record.
   An undefined field is skipped; every other field has its column number
   appended to definedColumnList.  A null field is stored as NULL if the
   column is nullable, and is an error ("23000") if it is not.

   Returns null on success.  If an error occurs while encoding, returns a
   DBOperationError.  When more than one column fails, the generic code
   "22000" is used and each additional column name is appended to the message.
*/
function encodeColumnsInBuffer(fields, ncolumns, metadata,
                               record, buffer, definedColumnList) {
  var firstError = null;
  var colNo;

  /* Record one failed column in the error being built */
  function noteFailure(sqlstate, columnName, badValue) {
    udebug.log("encoderWrite error:", sqlstate, "for", badValue);
    if(firstError) {
      firstError.sqlstate = "22000";
      firstError.message += "; [" + columnName + "]";
    } else {
      firstError = new DBOperationError().fromSqlState(sqlstate);
      firstError.message += " [" + columnName + "]";
    }
  }

  /* Encode a single column */
  function encodeOne(n) {
    var meta = metadata[n];
    var val = fields[n];
    var status;

    if(val === undefined) {
      return;
    }
    definedColumnList.push(meta.columnNumber);
    if(val === null) {
      if(meta.isNullable) {
        record.setNull(n, buffer);
      } else {
        noteFailure("23000", meta.name, undefined);
      }
      return;
    }
    status = record.encoderWrite(n, buffer, val);
    if(status) {
      noteFailure(status, meta.name, val);
    }
  }

  for(colNo = 0 ; colNo < ncolumns ; colNo++) {
    encodeOne(colNo);
  }
  return firstError;
}
/* Encode op.keys into the operation's key buffer.
   Fast path: a single-column index whose key is neither null nor undefined
   is written directly, and the result of encoderWrite() is returned.
   Otherwise the general column encoder is used and its result is returned.
*/
function encodeKeyBuffer(op) {
  var handler = op.indexHandler;
  var useFastPath = false;

  if(handler.singleColumn) {   // single-column index
    useFastPath = (op.keys[0] !== null && op.keys[0] !== undefined);
  }
  if(useFastPath) {
    return op.index.record.encoderWrite(0, op.buffers.key, op.keys[0]);
  }
  return encodeColumnsInBuffer(op.keys,
                               handler.getNumberOfColumns(),
                               handler.getAllColumnMetadata(),
                               op.index.record,
                               op.buffers.key,
                               []);
}
/* Build the array of LOB values for a row.
   The result is indexed by column position and has entries only for LOB
   columns: binary LOBs hold the value itself, text LOBs hold the result of
   bufferForText().
*/
function defineBlobs(ncolumns, metadata, values) {
  var lobValues = [];
  var n = 0;
  var meta;

  while(n < ncolumns) {
    meta = metadata[n];
    if(meta.isLob) {
      if(meta.isBinary) {
        lobValues[n] = values[n];
      } else {
        lobValues[n] = bufferForText(meta, values[n]);
      }
    }
    n++;
  }
  return lobValues;
}
/* Allocate a zero-filled row buffer sized for the table's result record.
   Asserts that no row buffer has been allocated yet.
*/
function allocateRowBuffer(op) {
  var rowSize;
  assert(op.buffers.row === null);
  rowSize = op.tableHandler.resultRecord.getBufferSize();
  op.buffers.row = Buffer.alloc(rowSize);
}
/* Drop the reference to the operation's row buffer. */
function releaseRowBuffer(op) {
  var held = op.buffers;
  held.row = null;
}
/* Encode op.values into the operation's row buffer.
   If the table has LOB columns, op.blobs is set to their values first.
   The columns actually written are recorded in op.columnMask.
   Returns whatever encodeColumnsInBuffer() returns.
*/
function encodeRowBuffer(op) {
  var handler, columnValues, columnCount, metadata;
  udebug.log("encodeRowBuffer");

  handler      = op.tableHandler;
  columnValues = handler.getColumns(op.values);
  columnCount  = handler.getNumberOfColumns();
  metadata     = handler.getAllColumnMetadata();

  if(handler.numberOfLobColumns) {
    op.blobs = defineBlobs(columnCount, metadata, columnValues);
  }
  return encodeColumnsInBuffer(columnValues, columnCount, metadata,
                               handler.resultRecord, op.buffers.row,
                               op.columnMask);
}
/* HelperSpec
   Array-like object with slots 0 through 10:
     0 row_buffer     1 key_buffer    2 row_record    3 key_record
     4 lock_mode      5 column_mask   6 value_obj     7 opcode
     8 is_value_obj   9 blobs        10 is_valid
*/
function HelperSpec() {
  this.clear();
}

/* Reset every slot to null */
HelperSpec.prototype.clear = function() {
  var slot;
  for(slot = 0 ; slot <= 10 ; slot++) {
    this[slot] = null;
  }
};

/* One shared instance; prepareOperations() reuses it */
var helperSpec = new HelperSpec();
/* ScanHelperSpec
   Specification of a scan, with slots addressed through the ScanHelper
   constants.  All slots are null when cleared, except flags, which is 0.
*/
function ScanHelperSpec() {
  this.clear();
}

/* Reset every slot to its cleared value */
ScanHelperSpec.prototype.clear = function() {
  var spec = this;
  function setNull(slotName) {
    spec[ScanHelper[slotName]] = null;
  }
  ["table_record", "index_record", "lock_mode", "bounds"].forEach(setNull);
  spec[ScanHelper.flags] = 0;
  ["batch_size", "parallel", "filter_code"].forEach(setNull);
};

/* One shared instance */
var scanSpec = new ScanHelperSpec();
/* BoundHelperSpec
   Specification of one index bound, with slots addressed through the
   BoundHelper constants.  A new spec has no low or high key, both ends
   inclusive, and range number 0.
*/
function BoundHelperSpec() {
  var spec = this;
  spec[BoundHelper.low_key]        = null;
  spec[BoundHelper.low_key_count]  = 0;
  spec[BoundHelper.low_inclusive]  = true;
  spec[BoundHelper.high_key]       = null;
  spec[BoundHelper.high_key_count] = 0;
  spec[BoundHelper.high_inclusive] = true;
  spec[BoundHelper.range_no]       = 0;
}

/* Create part of a bound spec: either the low end or the high end.
   base is the slot number of the key buffer; the key part count is stored
   at base + 1 and the inclusive flag at base + 2.
   Returns the result of encodeColumnsInBuffer(), or null if no key parts
   were encoded.
*/
BoundHelperSpec.prototype.buildPartialSpec = function(base, bound,
                                                      dbIndexHandler, buffer) {
  var columnMetadata = dbIndexHandler.getAllColumnMetadata();
  var keyParts = bound.key;
  var encodeError = null;
  var usable = 0;
  var part;

  /* Count finite key parts.
     IndexBounds has assumed all columns are nullable, so we may have to
     transform a NULL bound to a -Infinity.
  */
  while(usable < keyParts.length) {
    part = keyParts[usable];
    if((part == Infinity) || (part == -Infinity)) {
      break;
    }
    if(part === null && ! columnMetadata[usable].isNullable) {
      break;
    }
    usable++;
  }

  if(usable > 0) {
    encodeError = encodeColumnsInBuffer(keyParts, usable, columnMetadata,
                                        dbIndexHandler.dbIndex.record,
                                        buffer, []);
  }
  udebug.log("Encoded", usable, "parts for", (base ? "high" : "low"), "bound");

  this[base]     = (usable > 0) ? buffer : null;
  this[base + 1] = usable;
  this[base + 2] = bound.inclusive;
  return encodeError;
};

/* Fill in the low end of the bound from bound.low */
BoundHelperSpec.prototype.setLow = function(bound, dbIndexHandler, buffer) {
  var lowEnd = bound.low;
  return this.buildPartialSpec(BoundHelper.low_key, lowEnd, dbIndexHandler, buffer);
};

/* Fill in the high end of the bound from bound.high */
BoundHelperSpec.prototype.setHigh = function(bound, dbIndexHandler, buffer) {
  var highEnd = bound.high;
  return this.buildPartialSpec(BoundHelper.high_key, highEnd, dbIndexHandler, buffer);
};
/* Takes an array of IndexBounds;
   Returns an array of BoundHelpers which will be used to build NdbIndexBounds.
   Builds a single buffer of encoded parameters used in index bounds (two
   record-sized slices per bound: low, then high) and stores a reference
   to it in op.scan.
   If the index record size or the number of bounds is zero, nothing is
   built and the return value is undefined.
*/
DBOperation.prototype.buildBoundHelpers = function(indexBounds) {
  var indexHandler = this.indexHandler;
  var recordSize = indexHandler.dbIndex.record.getBufferSize();
  var nBounds = indexBounds.length;
  var helpers, arena, cursor, rangeNo, spec;

  /* Hand out the next record-sized slice of the parameter buffer */
  function nextSlice() {
    var piece = arena.slice(cursor, cursor + recordSize);
    cursor += recordSize;
    return piece;
  }

  if(recordSize && nBounds) {
    helpers = [];
    arena = Buffer.alloc(recordSize * nBounds * 2);
    cursor = 0;
    this.scan.bound_param_buffer = arena;  // maintain a reference!

    for(rangeNo = 0 ; rangeNo < nBounds ; rangeNo++) {
      spec = new BoundHelperSpec();
      spec.setLow(indexBounds[rangeNo], indexHandler, nextSlice());
      spec.setHigh(indexBounds[rangeNo], indexHandler, nextSlice());
      spec[BoundHelper.range_no] = rangeNo;
      helpers.push(spec);
    }
  }
  this.scan.index_bound_helpers = helpers;  // maintain a reference
  return helpers;
};
/* Fill in a HelperSpec describing this operation.
   Allocates the key and row buffers the operation needs and encodes keys
   and values into them.  An encoding error for the row (or a
   prepareForUpdate error for a Value Object) is stored in this.encoderError
   and marks the helper as not valid.
   NOTE(review): the return value of encodeKeyBuffer() is not examined here,
   so key encoding errors do not reach this.encoderError -- confirm intended.
*/
DBOperation.prototype.buildOpHelper = function(helper) {
  var opcode = this.opcode;
  var writesValueObject = (this.values && adapter.impl.isValueObject(this.values));
  var prepareError = null;

  /* All operations but insert (2) use a key. */
  if(opcode !== 2) {
    allocateKeyBuffer(this);
    encodeKeyBuffer(this);
    helper[OpHelper.key_record] = this.index.record;
    helper[OpHelper.key_buffer] = this.buffers.key;
  }

  if(writesValueObject) {
    /* If this is an update-after-read operation on a Value Object,
       DBOperationHelper only needs the VO. */
    prepareError = adapter.impl.prepareForUpdate(this.values);
    if(prepareError) {
      this.encoderError = new DBOperationError().fromSqlState(prepareError);
    } else {
      helper[OpHelper.value_obj] = this.values;
    }
  } else {
    /* All non-VO operations get a row record */
    helper[OpHelper.row_record] = this.tableHandler.resultRecord;

    /* All but delete (16) get an allocated row buffer, and column mask */
    if(opcode !== 16) {
      allocateRowBuffer(this);
      helper[OpHelper.row_buffer]  = this.buffers.row;
      helper[OpHelper.column_mask] = this.columnMask;

      if(opcode !== 1) {
        /* Writes get the data encoded into the row buffer */
        this.encoderError = encodeRowBuffer(this);
      } else {
        /* Read (1) gets a lock mode, and possibly a blobs array */
        helper[OpHelper.lock_mode] = constants.LockModes[this.lockMode];
        if(this.tableHandler.numberOfLobColumns) {
          this.blobs = [];
        }
      }
    }
  }

  helper[OpHelper.opcode]       = opcode;
  helper[OpHelper.is_value_obj] = writesValueObject;
  helper[OpHelper.blobs]        = this.blobs;
  helper[OpHelper.is_valid]     = ! this.encoderError;
};
/* Build a helper spec for every operation in dbOperationList and pass them
   all to the native DBOperationHelper, whose result is returned.
   A list holding exactly one operation reuses the global helperSpec;
   otherwise a new HelperSpec is created per operation.
*/
function prepareOperations(dbTransactionContext, dbOperationList, recycleWrapper) {
  var count, specs, k;
  assert(dbTransactionContext);

  count = dbOperationList.length;
  if(count == 1) {
    helperSpec.clear();
    dbOperationList[0].buildOpHelper(helperSpec);
    specs = [ helperSpec ];   /* Reuse the global helperSpec */
  } else {
    specs = new Array(count);
    for(k = 0 ; k < dbOperationList.length ; k++) {
      specs[k] = new HelperSpec();
      dbOperationList[k].buildOpHelper(specs[k]);
    }
  }
  return adapter.impl.DBOperationHelper(count, specs, dbTransactionContext,
                                        recycleWrapper);
}
/* Prepare a scan operation.
   Fills in the one global ScanHelperSpec with the table record, the index
   record and bounds (for an index scan), lock mode, ordering flags and
   scan filter, then creates a ScanOperation from it.  The ScanOperation is
   stored in this.scanOp and returned back to the caller for execution.
*/
DBOperation.prototype.prepareScan = function(dbTransactionContext) {
  var query = this.query;
  var bounds = null;
  var skipFilterForTesting = false;
  var helpers, index, ndbBounds, filterCode;

  /* There is one global ScanHelperSpec */
  scanSpec.clear();
  scanSpec[ScanHelper.table_record] = query.dbTableHandler.resultRecord;

  if(query.queryType == 2) {  /* Index Scan */
    index = query.dbIndexHandler.dbIndex;
    scanSpec[ScanHelper.index_record] = index.record;
    bounds = getIndexBounds(query, index, this.params);
    udebug.log("index bounds:", bounds.length);
    if(bounds.length) {
      helpers = this.buildBoundHelpers(bounds);
      ndbBounds = [];
      scanSpec[ScanHelper.bounds] = ndbBounds;
      if(bounds.length > 1) {
        scanSpec[ScanHelper.flags] |= constants.Scan.flags.SF_MultiRange;
      }
      helpers.forEach(function(boundHelper) {
        ndbBounds.push(adapter.impl.IndexBound.create(boundHelper));
      });
    }
  }

  scanSpec[ScanHelper.lock_mode] = constants.LockModes[this.lockMode];

  if(this.params.order !== undefined) {
    scanSpec[ScanHelper.flags] |= constants.Scan.flags.SF_OrderBy;
    if(this.params.order.toLocaleLowerCase() == 'desc') {
      scanSpec[ScanHelper.flags] |= constants.Scan.flags.SF_Descending;
    }
  }

  if(query.ndbFilterSpec && ! skipFilterForTesting) {
    filterCode = query.ndbFilterSpec.getScanFilterCode(this.params);
    scanSpec[ScanHelper.filter_code] = filterCode;
    this.scan.filter = filterCode;
    udebug.log("Using Scan Filter");
  }

  udebug.log("Flags", scanSpec[ScanHelper.flags]);
  this.scanOp = adapter.impl.Scan.create(scanSpec, 33, dbTransactionContext);
  return this.scanOp;
};
/* True if this operation's opcode is 97 */
DBOperation.prototype.isQueryOperation = function() {
  var code = this.opcode;
  return code == 97;
};
/* True if this operation's opcode is 32 or greater */
DBOperation.prototype.isScanOperation = function() {
  var code = this.opcode;
  return code >= 32;
};
/* Build a result row without using a Value Object.
   Each column is read from the buffer (or from blobs, for LOB columns) and
   stored into the row with dbt.set().  The row is op.result.value if that
   is already set, otherwise a new result object from the table handler.
*/
function buildResultRow_nonVO(op, dbt, buffer, blobs) {
  udebug.log("buildResultRow");
  var rec = dbt.resultRecord;
  var total = dbt.getNumberOfColumns();
  var columns = dbt.getAllColumnMetadata();
  var row = op.result.value || dbt.newResultObject();
  var n, fieldValue;

  /* Decode the value of column n */
  function readColumn(n) {
    var meta = columns[n];
    if(meta.isLob) {
      return meta.isBinary ? blobs[n] : textFromBuffer(meta, blobs[n]);
    }
    if(rec.isNull(n, buffer)) {
      return null;
    }
    return rec.encoderRead(n, buffer);
  }

  for(n = 0 ; n < total ; n++) {
    fieldValue = readColumn(n);
    dbt.set(row, n, fieldValue);
  }
  return row;
}
/* Build a result as an NDB Value Object.
   Returns undefined if the table handler has no Value Object constructor.
   Otherwise the buffer is wrapped in a Value Object, converters are applied
   for the columns that have them, and the user's Domain Object constructor
   (if any) is called on the new object.  If that constructor writes to the
   object, the operation result is marked as failed with error "WCTOR".
*/
function buildValueObject(op, tableHandler, buffer, blobs) {
  udebug.log("buildValueObject");
  var ValueCtor = tableHandler.ValueObject;            // NDB Value Object Constructor
  var DomainCtor = tableHandler.newObjectConstructor;  // User's Domain Object Ctor
  var vo, colNo, writesBefore, writesAfter;

  if(! ValueCtor) {
    return undefined;
  }

  /* Turn the buffer into a Value Object */
  vo = new ValueCtor(buffer, blobs);

  /* Allow DBT to apply converters if it has them */
  for(colNo = 0 ; colNo < tableHandler.getNumberOfColumns() ; colNo++) {
    if(tableHandler.columnHasConverter(colNo)) {
      tableHandler.set(vo, colNo,
                       adapter.impl.getValueObjectFieldByNumber(vo, colNo));
    }
  }

  /* Finally the user's constructor is called on the new value */
  if(! DomainCtor) {
    return vo;
  }
  writesBefore = adapter.impl.getValueObjectWriteCount(vo);
  DomainCtor.call(vo);
  writesAfter = adapter.impl.getValueObjectWriteCount(vo);
  if(writesAfter > writesBefore) {
    op.result.error = new DBOperationError().fromSqlState("WCTOR");
    op.result.success = false;
  }
  return vo;
}
/* Produce the result value for one row.
   A Value Object is built only when the connection property
   use_mapped_ndb_record is set, the table mapping is 1-to-1, and the
   operation has no result value yet; otherwise a plain result row is built.
   (workaround: currently NdbRecordObject will not correctly hide
   the sparse field container from the user)
*/
function getResultValue(op, tableHandler, buffer, blobs) {
  if(op.connProperties.use_mapped_ndb_record &&
     tableHandler.is1to1 &&
     op.result.value === null) {
    return buildValueObject(op, tableHandler, buffer, blobs);
  }
  return buildResultRow_nonVO(op, tableHandler, buffer, blobs);
}
/* getScanResults(scanop, userCallback)
   Fetch the rows of an executed scan and deliver them to the user.

   scanop       : the scan DBOperation; scanop.scanOp is the native scan.
                  scanop.params is optional and may hold "skip" and "limit".
   userCallback : receives (error, results).

   Rows are gathered until the scan ends or skip + limit rows have been
   read; the first "skip" rows are then discarded.  On success the array of
   rows is also stored in scanop.result.value.
*/
function getScanResults(scanop, userCallback) {
  var buffer, results, dbSession, postScanCallback, params;
  var nSkip, maxRow, nFetches, recordSize, gather;

  dbSession = scanop.transaction.dbSession;
  params = scanop.params;
  postScanCallback = {
    fn  : userCallback,
    arg0: null,
    arg1: null
  };
  nFetches = 0;
  nSkip = 0;
  maxRow = 100000000000;   // effectively "no limit"
  if(params) {
    if(params.skip > 0) { nSkip = params.skip; }
    if(params.limit >= 0) { maxRow = nSkip + params.limit; }
  }
  if(udebug.is_debug()) {
    /* params is optional, so it must not be dereferenced blindly here */
    udebug.log("skip", nSkip, "+ limit", (params ? params.limit : undefined),
               "=", maxRow);
  }
  recordSize = scanop.tableHandler.resultRecord.getBufferSize();

  /* Queue one native fetchResults() call; gather() runs as its preCallback */
  function fetchResults(dbSession, ndb_scan_op, buffer) {
    var apiCall = new QueuedAsyncCall(dbSession.execQueue, null);
    var force_send = true;
    apiCall.preCallback = gather;
    apiCall.ndb_scan_op = ndb_scan_op;
    apiCall.description = "fetchResults" + scanop.transaction.moniker + nFetches;
    apiCall.buffer = buffer;
    apiCall.run = function runFetchResults() {
      this.ndb_scan_op.fetchResults(this.buffer, force_send, this.callback);
    };
    apiCall.enqueue();
    nFetches++;
  }

  /* Convert the row currently held in buffer and append it to results */
  function pushNewResult() {
    var blobs, result;
    blobs = scanop.scanOp.readBlobResults();
    udebug.log("pushNewResult", nFetches, blobs);
    result = getResultValue(scanop, scanop.tableHandler, buffer, blobs);
    results.push(result);
  }

  /* Fetch into a fresh buffer, since the previous one may back a result */
  function fetch() {
    buffer = Buffer.alloc(recordSize);
    fetchResults(dbSession, scanop.scanOp, buffer);  // gather() is the callback
  }

  /* <0: ERROR, 0: RESULTS_READY, 1: SCAN_FINISHED, 2: CACHE_EMPTY */
  /* gather runs as a preCallback.  It returns postScanCallback when the
     scan is over (with an error or with results), and returns nothing
     after starting another fetch. */
  gather = function(error, status) {
    udebug.log("gather() status", status);
    if(status < 0) { // error
      if(udebug.is_debug()) { udebug.log("gather() error", error); }
      postScanCallback.arg0 = error;
      return postScanCallback;
    }

    /* Gather more results. */
    while(status === 0 && results.length < maxRow) {
      pushNewResult();
      buffer = Buffer.alloc(recordSize);
      status = scanop.scanOp.nextResult(buffer);
    }

    if(status == 2 && results.length < maxRow) {  // Cache empty
      fetch();
    }
    else {  // end of scan.
      /* Remove the rows that should have been skipped, in one step.
         Math.ceil keeps the row count of the previous one-at-a-time loop
         for a fractional skip. */
      results.splice(0, Math.ceil(nSkip));
      udebug.log("gather() 1 End_Of_Scan.  Final length:", results.length);
      scanop.result.success = true;
      scanop.result.value = results;
      postScanCallback.arg1 = results;
      return postScanCallback;
    }
  };

  /* start here */
  results = [];
  fetch();
}
/* getQueryResults(op, userCallback)
   Fetch all rows of an executed projection (join) query and assemble them
   into a single result object, which is stored in op.result.value.

   op.query is the first of a linked list of projection sectors (one per
   table, linked by "next").  Each result row carries the level (sector
   number) it belongs to; a row at level > 0 is attached to the current
   row of its parent sector under relatedField.fieldName.

   userCallback receives (err, op.result.value), where err is the error
   reported by fetchAllResults().
*/
function getQueryResults(op, userCallback) {
  var i = 0;
  var sectors = [];
  var ndbProjection = op.query;

  while(ndbProjection) {
    sectors[i++] = ndbProjection;
    ndbProjection = ndbProjection.next;
  }

  op.scanOp.fetchAllResults(function(err, nresults) {
    var wrapper, level, current, parentLevel, resultObject;
    current = [];          // current values for each sector
    current[0] = null;
    wrapper = {};          // the wrapper is reused in each call to getResult()

    /* Store resultValue in the current parent row: appended to an array
       for a to-many relationship, assigned directly for to-one. */
    function setValueInRelatedTable(relatedField, resultValue) {
      if(relatedField.toMany) {
        if(current[parentLevel][relatedField.fieldName] === undefined) {
          current[parentLevel][relatedField.fieldName] = [];
        }
        if(resultValue !== null) {
          current[parentLevel][relatedField.fieldName].push(resultValue);
        }
      } else {   // toOne
        current[parentLevel][relatedField.fieldName] = resultValue;
      }
    }

    /* Make resultObject the current row of its level and link it to its parent */
    function assemble() {
      current[level] = resultObject;
      if(level > 0) {
        setValueInRelatedTable(sectors[level].relatedField, resultObject);
      }
    }

    /* Handle a row that carries a tag instead of ordinary data */
    function assembleSpecial(tag) {
      udebug.log_detail("assembleSpecial table", level, "tag", tag);
      if(tag & 2) {  /* This row came from a many-to-many join table but
                        is not itself part of the user's result object. */
        current[level] = current[parentLevel];
      }
      if(tag & 1) {  /* Row is null */
        current[level] = null;
        if(level > 0) {
          setValueInRelatedTable(sectors[level].relatedField, null);
        }
      }
      if(tag & 8) {
        udebug.log_detail("Filtered - row is duplicate");
      }
    }

    udebug.log("fetchAllResults returns", err, nresults);
    if(err) {
      op.result.success = false;
      op.result.error = new DBOperationError().fromNdbError(err);
    } else if (nresults == 0) {
      op.result.success = false;
      op.result.error = new DBOperationError().fromSqlState("02000");
    } else {
      for(i = 0 ; i < nresults ; i++) {
        op.scanOp.getResult(i, wrapper);
        level = wrapper.level;
        if(level > 0) { parentLevel = sectors[level].parent.serial; }
        /* is_detail is a function: it has to be called, or the test is
           always true and the per-row log is produced at every log level */
        if(udebug.is_detail()) {
          udebug.log("TABLE", level, sectors[level].tableHandler.dbTable.name,
                     "PARENT TABLE", parentLevel);
        }
        if(wrapper.tag) {
          assembleSpecial(wrapper.tag);
        } else {
          resultObject = getResultValue(op, sectors[level].tableHandler,
                                        wrapper.data, null);
          assemble();
        }
      }
      op.result.success = true;
      op.result.value = current[0];
    }
    udebug.log("Join result:", current[0]);
    userCallback(err, op.result.value);
  });
}
/* buildOperationResult(transactionHandler, op, op_ndb_error, execMode)
   Fill in op.result after the operation has been executed.

   Step 1 summarizes the operation's own error: an encoder error wins,
   then a null op_ndb_error means success, and anything else is a failure
   (op_ndb_error === true means NdbOperation is null: no error object is built).

   Step 2 (skipped when execMode is ROLLBACK) reconciles the operation with
   the transaction: a successful operation in a failed transaction gets a
   cascading error; a failed operation in an otherwise good transaction
   marks the transaction as failed, unless the operation is a read or the
   execMode is NOCOMMIT.  A successful read then gets its result value.
*/
function buildOperationResult(transactionHandler, op, op_ndb_error, execMode) {
  var result = op.result;
  var isRead = (op.opcode === opcodes.OP_READ);
  var txError;
  udebug.log("buildOperationResult");

  /* Summarize Operation Error */
  if(op.encoderError) {
    udebug.log("Operation has encoder error");
    result.success = false;
    result.error = op.encoderError;
  } else {
    result.success = (op_ndb_error === null);
    if(! result.success && op_ndb_error !== true) {
      result.error = new DBOperationError().fromNdbError(op_ndb_error);
    }
  }

  /* Handle Transaction Error */
  if(execMode !== ROLLBACK) {
    txError = transactionHandler.error;
    if(result.success) {
      if(txError) {
        /* This operation has no error, but the transaction failed. */
        udebug.log("Case txErr + opOK", transactionHandler.moniker);
        result.success = false;
        result.error = new DBOperationError().cascading(txError);
      }
    } else if(txError) {
      /* Both the operation and the transaction have errors. */
      udebug.log("Case txErr + OpErr", transactionHandler.moniker);
    } else if(isRead || execMode === NOCOMMIT) {
      udebug.log("Case txOK + OpErr [READ | NOCOMMIT]", transactionHandler.moniker);
    } else {
      /* The failed operation fails the transaction. */
      udebug.log("Case txOK + OpErr", transactionHandler.moniker);
      transactionHandler.error = new DBOperationError().cascading(result.error);
    }

    if(result.success && isRead) {
      result.value = getResultValue(op, op.tableHandler, op.buffers.row, op.blobs);
    }
  }

  if(udebug.is_detail()) { udebug.log("buildOperationResult finished:", op.result); }
}
/* completeExecutedOps(dbTxHandler, execMode, operations)
   Finish every operation of an executed batch: build its result, record it
   in dbTxHandler.executedOperations, and run its user callback.

   operations is an object:
     {
       "operationList"       : operationList,
       "pendingOperationSet" : pendingOpsSet
     };
   Scan operations are only recorded and called back; their results are
   not built here.
*/
function completeExecutedOps(dbTxHandler, execMode, operations) {
  var position, current;

  /* Build the result of a non-scan operation and release its buffers */
  function finishKeyOperation(op, n) {
    var opError = operations.pendingOperationSet.getOperationError(n);
    releaseKeyBuffer(op);
    op.blobs = operations.pendingOperationSet.readBlobResults(n);
    buildOperationResult(dbTxHandler, op, opError, execMode);
    releaseRowBuffer(op);
  }

  if(udebug.is_debug()) {
    udebug.log("completeExecutedOps mode:", execMode,
               "operations: ", operations.operationList.length);
  }

  for(position = 0 ; position < operations.operationList.length ; position++) {
    current = operations.operationList[position];
    if(! current.isScanOperation()) {
      finishKeyOperation(current, position);
    }
    dbTxHandler.executedOperations.push(current);
    if(typeof current.userCallback === 'function') {
      current.userCallback(current.result.error, current);
    }
  }
  udebug.log("completeExecutedOps done");
}
/* Create the NDB Value Object Constructor for a table handler and store it
   in dbTableHandler.ValueObject.
   The result record is created first if the handler does not have one.
   Nothing more is done if the handler already has a Value Object
   constructor, or if its mapping is not 1-to-1.
*/
storeNativeConstructorInMapping = function(dbTableHandler) {
  var record = dbTableHandler.resultRecord || storeResultRecord(dbTableHandler);
  var fieldNames = {};
  var userCtor, userProto, total, colNo;

  if(dbTableHandler.ValueObject || ! dbTableHandler.is1to1) {
    return;
  }

  /* First field name of each column, keyed by column position */
  total = dbTableHandler.getNumberOfColumns();
  for(colNo = 0 ; colNo < total ; colNo++) {
    fieldNames[colNo] = dbTableHandler.getColumnMapping(colNo).fieldNames[0];
  }

  /* The user's constructor (Domain Object Constructor) and its prototype */
  userCtor = dbTableHandler.newObjectConstructor;
  userProto = null;
  if(userCtor && userCtor.prototype) {
    userProto = userCtor.prototype;
  }

  /* Get the Value Object Constructor
       getValueObjectConstructor(record, fieldNames, prototype)
     Store it in the TableHandler
  */
  dbTableHandler.ValueObject =
    adapter.impl.getValueObjectConstructor(record, fieldNames, userProto);
};
/* Verify that dbIndexHandler is usable: it must exist and must refer to a
   table handler.
   Throws the string "Invalid dbIndexHandler" otherwise.  (A string is
   thrown, rather than an Error, to keep what callers of this module
   already receive.)  A missing handler is reported the same way instead
   of failing with an unrelated TypeError.
*/
function verifyIndexHandler(dbIndexHandler) {
  if(! dbIndexHandler || ! dbIndexHandler.tableHandler) {
    throw ("Invalid dbIndexHandler");
  }
}
/* newReadOperation(tx, dbIndexHandler, keys, lockMode, isLoad)
   Create a read operation on the given index.
   keys may be an array of key column values or an object from which the
   index handler extracts them.  When isLoad is true and keys is an object,
   keys is reused as the result object (for session.load()).
   lockMode must be one of doc.LockModes.  It is honored for primary key
   reads, and for "EXCLUSIVE" on any index; other reads use "SHARED".
*/
function newReadOperation(tx, dbIndexHandler, keys, lockMode, isLoad) {
  var readOp, reuseKeysAsResult, honorLockMode;
  verifyIndexHandler(dbIndexHandler);

  readOp = new DBOperation(opcodes.OP_READ, tx, dbIndexHandler, null);
  readOp.keys = Array.isArray(keys) ? keys : dbIndexHandler.getColumns(keys);

  reuseKeysAsResult = (isLoad === true && typeof keys === 'object');
  if(reuseKeysAsResult) {
    readOp.result.value = keys;   // Reuse keys as result for session.load()
  } else if(! dbIndexHandler.tableHandler.ValueObject) {
    storeNativeConstructorInMapping(dbIndexHandler.tableHandler);
  }

  assert(doc.LockModes.indexOf(lockMode) !== -1);
  honorLockMode = (readOp.index.isPrimaryKey || lockMode === "EXCLUSIVE");
  readOp.lockMode = honorLockMode ? lockMode : "SHARED";
  return readOp;
}
/* newProjectionOperation(sessionImpl, tx, indexHandler, keys, projection)
   Create a projection read: a query over all the tables (sectors) of a
   projection, starting from the row identified by keys on indexHandler.
   If the NdbProjection cannot be built, the operation is returned with
   result.error set and no QueryOperation.
*/
function newProjectionOperation(sessionImpl, tx, indexHandler, keys, projection) {
  var projOp = new DBOperation(opcodes.OP_PROJ_READ, tx, indexHandler, null);
  var sectorNo;

  /* Encode keys for operation */
  projOp.keys = Array.isArray(keys) ? keys : indexHandler.getColumns(keys);
  allocateKeyBuffer(projOp);
  encodeKeyBuffer(projOp);

  /* Create Value Object Constructors for all tables */
  projection.sectors.forEach(function(sector) {
    storeNativeConstructorInMapping(sector.tableHandler);
  });

  /* Create NdbProjections from sectors, then create a QueryOperation */
  projOp.query = NdbProjection.initialize(projection.sectors, indexHandler);
  if(! projOp.query.error) {
    projOp.scanOp = adapter.impl.QueryOperation.create(projOp.query,
                                                       projOp.buffers.key,
                                                       projOp.query.size,
                                                       sessionImpl);
    return projOp;
  }

  /* TODO: Report this error back to the user
     rather than attempting to execute the operation */
  projOp.result.error = new DBOperationError(projOp.query.error);
  projOp.result.success = false;
  return projOp;
}
/* newInsertOperation(tx, tableHandler, row)
   Create an insert of row into the table.
   If the table has an autoincrement field and the row does not define it,
   the operation is flagged as needing an autoincrement value.
*/
function newInsertOperation(tx, tableHandler, row) {
  var insertOp = new DBOperation(opcodes.OP_INSERT, tx, null, tableHandler);
  var autoIncField;

  // Test row for VO?
  insertOp.values = row;

  autoIncField = insertOp.tableHandler.autoIncFieldName;
  if(autoIncField && row[autoIncField] === undefined) {
    // we need autoincrement services because the user did not supply
    // the value for the autoincrement column
    insertOp.needAutoInc = true;
  }
  return insertOp;
}
/* newDeleteOperation(tx, dbIndexHandler, keys)
   Create a delete of the row identified by keys on the given index.
*/
function newDeleteOperation(tx, dbIndexHandler, keys) {
  var deleteOp;
  verifyIndexHandler(dbIndexHandler);
  deleteOp = new DBOperation(opcodes.OP_DELETE, tx, dbIndexHandler, null);
  deleteOp.keys = dbIndexHandler.getColumns(keys);
  return deleteOp;
}
/* newWriteOperation(tx, dbIndexHandler, row)
   Create a write of row; the key values are extracted from the row itself.
*/
function newWriteOperation(tx, dbIndexHandler, row) {
  var writeOp;
  verifyIndexHandler(dbIndexHandler);
  writeOp = new DBOperation(opcodes.OP_WRITE, tx, dbIndexHandler, null);
  // Test row for VO
  writeOp.keys = dbIndexHandler.getColumns(row);
  writeOp.values = row;
  return writeOp;
}
/* newUpdateOperation(tx, dbIndexHandler, keys, row)
   Create an update of the row identified by keys, using the values in row.
*/
function newUpdateOperation(tx, dbIndexHandler, keys, row) {
  var updateOp;
  verifyIndexHandler(dbIndexHandler);
  updateOp = new DBOperation(opcodes.OP_UPDATE, tx, dbIndexHandler, null);
  updateOp.keys = dbIndexHandler.getColumns(keys);
  updateOp.values = row;
  return updateOp;
}
/* newScanOperation(tx, QueryTree, properties)
   Create a scan for a query.  The query handler is taken from the query
   tree, its scan filter spec is prepared, and properties become the
   operation's params.
*/
function newScanOperation(tx, QueryTree, properties) {
  var handler = QueryTree.jones_query_domain_type.queryHandler;
  var scanOp = new DBOperation(opcodes.OP_SCAN, tx,
                               handler.dbIndexHandler,
                               handler.dbTableHandler);

  prepareFilterSpec(handler);  // sets query.ndbFilterSpec
  scanOp.query = handler;
  scanOp.params = properties;

  if(! handler.dbTableHandler.ValueObject) {
    storeNativeConstructorInMapping(handler.dbTableHandler);
  }
  return scanOp;
}
/* setLockMode(ndbSession, lockMode)
   Set the lock mode of a session.
   lockMode must be one of doc.LockModes.
   Returns null on success; returns a DBOperationError, leaving the session
   unchanged, if lockMode is not a valid lock mode.
*/
function setLockMode(ndbSession, lockMode) {
  /* indexOf() is -1 when the mode is NOT in the list */
  if(doc.LockModes.indexOf(lockMode) === -1) {
    return new DBOperationError("Invalid Lock Mode");
  }
  ndbSession.lockMode = lockMode;
  return null;
}
/* Module exports: the DBOperation and DBOperationError constructors, the
   factory functions that create each kind of operation, and the functions
   that prepare operations and collect their results. */
exports.DBOperation = DBOperation;
exports.DBOperationError = DBOperationError;
exports.newReadOperation = newReadOperation;
exports.newInsertOperation = newInsertOperation;
exports.newDeleteOperation = newDeleteOperation;
exports.newUpdateOperation = newUpdateOperation;
exports.newWriteOperation = newWriteOperation;
exports.newScanOperation = newScanOperation;
exports.newProjectionOperation = newProjectionOperation;
exports.completeExecutedOps = completeExecutedOps;
exports.getScanResults = getScanResults;
exports.prepareOperations = prepareOperations;
exports.getQueryResults = getQueryResults;
exports.setLockMode = setLockMode;