jones-ndb/impl/ndb/NdbSession.js (189 lines of code) (raw):
/*
Copyright (c) 2013, 2016, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.
This program is also distributed with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation. The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have included with MySQL.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License, version 2.0, for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
"use strict";
/* Counters maintained by this file:
     created                  - incremented once per NdbSession constructed
     seizeTransactionContext  - how often a context was handed out at once
                                ("immediate") versus how often the caller
                                had to wait in the queue ("queued")
     oneTableProjections      - projection reads that were run as plain reads
*/
var stats = {
  created                 : 0,
  seizeTransactionContext : { immediate : 0, queued : 0 },
  oneTableProjections     : 0
};
var conf = require("./path_config"),
adapter = require(conf.binary),
ndboperation = require("./NdbOperation.js"),
dbtxhandler = require("./NdbTransactionHandler.js"),
ndbconnpool = require("./NdbConnectionPool.js"),
util = require("util"),
assert = require("assert"),
jones = require("database-jones"),
unified_debug = require("unified_debug"),
udebug = unified_debug.getLogger("NdbSession.js"),
QueuedAsyncCall = require(jones.common.QueuedAsyncCall).QueuedAsyncCall,
NdbSession;
/* Hand the counters in "stats" to the shared stats module.
   NOTE(review): the trailing string arguments presumably form the name path
   under which these counters are published -- confirm against the stats module.
*/
require(jones.api.stats).register(stats, "spi","ndb","DBSession");
/**
A session has a single transaction visible to the user at any time:
NdbSession.tx, which is created in NdbSession.getTransactionHandler()
and persists until the user performs an execute commit or rollback,
at which point NdbSession.tx is "retired" and reset to null. The previous
TransactionHandler is still alive at this point, but it no longer
represents the session's current transaction.
QUEUES
------
1. An Ndb object is "single-threaded". All execute calls on the session's
SessionImpl are serialized in NdbSession.execQueue. This is seen in
run() in NdbTransactionHandler.
2. seizeTransactionContext() calls must wait on NdbSession.seizeTxQueue
for some transaction context to be released, if more than
ndb_session_concurrency contexts are open.
*/
/* DBSession Constructor.  Undocumented - private to NdbConnectionPool.

   pool: the connection pool creating this session; its
         properties.ndb_session_concurrency becomes the limit on
         simultaneously open transaction contexts.

   The native SessionImpl is not created here; impl stays null until
   fetchImpl() completes.
*/
NdbSession = function(pool) {
  /* Sessions are numbered in creation order using the stats counter */
  this.serial           = stats.created;
  stats.created        += 1;

  this.parentPool       = pool;
  this.impl             = null;      // set by fetchImpl()
  this.tx               = null;      // current transaction handler, if any
  this.execQueue        = [];
  this.seizeTxQueue     = null;      // allocated on first queued seize
  this.maxTxContexts    = pool.properties.ndb_session_concurrency;
  this.openTxContexts   = 0;         // currently opened
  this.isOpenNdbSession = false;
  this.lockMode         = "SHARED";  // lock mode passed to read operations
};
/* fetchImpl(callback).  Undocumented - private to NdbConnectionPool.
   ASYNC.
   Asks the native adapter to create the SessionImpl for this session,
   using the parent pool's impl, async context, database name and
   session concurrency.
   callback(err, null) on failure;
   callback(null, session) once session.impl has been set.
*/
NdbSession.prototype.fetchImpl = function(callback) {
  var session = this;
  var parent  = this.parentPool;

  function onSessionImpl(err, impl) {
    if(err) {
      callback(err, null);
      return;
    }
    session.impl = impl;
    callback(null, session);
  }

  adapter.ndb.impl.DBSession.create(parent.impl,
                                    parent.asyncNdbContext,
                                    parent.properties.database,
                                    parent.properties.ndb_session_concurrency,
                                    onSessionImpl);
};
/* retireTransactionHandler().
   Undocumented - private to NdbTransactionHandler.  IMMEDIATE.
   Clears the session's current transaction.
   NdbTransactionHandler calls this immediately at execute(COMMIT | ROLLBACK).
   The retired NdbTransactionHandler is still alive and running; this only
   detaches it from the session so that the session can open a new one.
*/
NdbSession.prototype.retireTransactionHandler = function() {
  var session = this;
  session.tx = null;
};
/* seizeTransactionContext(callback).
   Undocumented - private to NdbTransactionHandler.
   callback(txContext) runs immediately if fewer than maxTxContexts
   contexts are open; otherwise the callback is queued in seizeTxQueue
   until releaseTransactionContext() frees a context.
*/
NdbSession.prototype.seizeTransactionContext = function(callback) {
  var seized;

  if(! (this.openTxContexts < this.maxTxContexts)) {
    /* No free context: park the caller.  The queue is created lazily. */
    if(this.seizeTxQueue === null) {
      this.seizeTxQueue = [];
    }
    this.seizeTxQueue.push(callback);
    stats.seizeTransactionContext.queued++;
    udebug.log("seizeTransactionContext: queued; queue length:", this.seizeTxQueue.length);
    return;
  }

  /* A context is available: account for it, then seize and deliver it. */
  this.openTxContexts++;
  stats.seizeTransactionContext.immediate++;
  udebug.log_detail("seizeTransactionContext: immediate");
  seized = this.impl.seizeTransaction();
  assert(seized);
  callback(seized);
};
/* releaseTransactionContext(txContext).
   Undocumented - private to NdbTransactionHandler.
   IMMEDIATE.
   Returns txContext to the SessionImpl.  If a seizeTransactionContext()
   caller is waiting in seizeTxQueue, a fresh context is seized and handed
   to the oldest waiter before this function returns.
   Throws (assertion) if the SessionImpl refuses the release or cannot
   supply a context for a waiter.
*/
NdbSession.prototype.releaseTransactionContext = function(txContext) {
  var nextTxCallback, nextTxContext, didRelease;

  didRelease = this.impl.releaseTransaction(txContext);
  /* Check the result before adjusting the counter, so that a failed release
     does not leave openTxContexts lower than the number actually open. */
  assert(didRelease);   // false would mean that NdbTransaction was not closed.
  this.openTxContexts--;
  assert(this.openTxContexts >= 0);

  if(this.seizeTxQueue && this.seizeTxQueue.length) {
    /* Seize and verify the context first; the waiter stays queued if the
       seize fails.  Same check as the immediate path of
       seizeTransactionContext(). */
    nextTxContext = this.impl.seizeTransaction();
    assert(nextTxContext);
    this.openTxContexts++;
    nextTxCallback = this.seizeTxQueue.shift();
    nextTxCallback(nextTxContext);
  }
};
/* getConnectionPool()
   IMMEDIATE
   RETURNS the DBConnectionPool that was passed to the constructor,
   i.e. the pool from which this DBSession was created.
*/
NdbSession.prototype.getConnectionPool = function() {
  var owningPool = this.parentPool;
  return owningPool;
};
/* close(callback)
   ASYNC.  Optional callback.
   Delegates to NdbConnectionPool's closeNdbSession(), passing this session
   and the callback through unchanged.
*/
NdbSession.prototype.close = function(callback) {
  var session = this;
  udebug.log("Close.");
  ndbconnpool.closeNdbSession(session, callback);
};
/* buildReadOperation(DBIndexHandler dbIndexHandler,
                      Object keys,
                      DBTransactionHandler transaction,
                      Bool loadResultIntoKeys,
                      function(error, DBOperation) userCallback)
   IMMEDIATE
   Define an operation which when executed will fetch a row.
   The session's current lockMode is passed along to the operation.
   RETURNS a DBOperation with userCallback attached
*/
NdbSession.prototype.buildReadOperation = function(dbIndexHandler, keys,
                                                   tx, isLoad, callback) {
  var readOp;

  if(udebug.is_debug()) {
    udebug.log("Read", dbIndexHandler.tableHandler.dbTable.name,
               "using", dbIndexHandler.dbIndex.name);
  }

  readOp = ndboperation.newReadOperation(tx, dbIndexHandler, keys,
                                         this.lockMode, isLoad);
  readOp.userCallback = callback;
  return readOp;
};
/* buildInsertOperation(DBTableHandler tableHandler,
                        Object row,
                        DBTransactionHandler transaction,
                        function(error, DBOperation) userCallback)
   IMMEDIATE
   Define an operation which when executed will insert a row.
   Asserts that typeof row is "object".
   RETURNS a DBOperation with userCallback attached
*/
NdbSession.prototype.buildInsertOperation = function(tableHandler, row,
                                                     tx, callback) {
  var insertOp;

  assert.equal(typeof row, "object");
  if(udebug.is_debug()) {
    udebug.log("Insert into", tableHandler.dbTable.name);
  }

  insertOp = ndboperation.newInsertOperation(tx, tableHandler, row);
  insertOp.userCallback = callback;
  return insertOp;
};
/* buildWriteOperation(DBIndexHandler dbIndexHandler,
                       Object row,
                       DBTransactionHandler transaction,
                       function(error, DBOperation) userCallback)
   IMMEDIATE
   Define an operation which when executed will update or insert
   RETURNS a DBOperation with userCallback attached
*/
NdbSession.prototype.buildWriteOperation = function(dbIndexHandler, row,
                                                    tx, callback) {
  var writeOp;

  if(udebug.is_debug()) {
    udebug.log("Write to", dbIndexHandler.tableHandler.dbTable.name,
               "using", dbIndexHandler.dbIndex.name);
  }

  writeOp = ndboperation.newWriteOperation(tx, dbIndexHandler, row);
  writeOp.userCallback = callback;
  return writeOp;
};
/* buildUpdateOperation(DBIndexHandler dbIndexHandler,
                        Object keys,
                        Object values,
                        DBTransactionHandler transaction,
                        function(error, DBOperation) userCallback)
   IMMEDIATE
   Define an operation which when executed will access a row using the keys
   object and update the values provided in the values object.
   The values object is the "row" parameter, and the user callback is the
   "userData" parameter; it is stored as the operation's userCallback.
   RETURNS a DBOperation
*/
NdbSession.prototype.buildUpdateOperation = function(dbIndexHandler,
                                                     keys, row, tx, userData) {
  var updateOp;

  if(udebug.is_debug()) {
    udebug.log("Update", dbIndexHandler.tableHandler.dbTable.name,
               "using", dbIndexHandler.dbIndex.name);
  }

  updateOp = ndboperation.newUpdateOperation(tx, dbIndexHandler, keys, row);
  updateOp.userCallback = userData;
  return updateOp;
};
/* buildDeleteOperation(DBIndexHandler dbIndexHandler,
                        Object keys,
                        DBTransactionHandler transaction,
                        function(error, DBOperation) userCallback)
   IMMEDIATE
   Define an operation which when executed will delete a row
   RETURNS a DBOperation with userCallback attached
*/
NdbSession.prototype.buildDeleteOperation = function(dbIndexHandler, keys,
                                                     tx, callback) {
  var deleteOp;

  if(udebug.is_debug()) {
    udebug.log("Delete from", dbIndexHandler.tableHandler.dbTable.name,
               "using", dbIndexHandler.dbIndex.name);
  }

  deleteOp = ndboperation.newDeleteOperation(tx, dbIndexHandler, keys);
  deleteOp.userCallback = callback;
  return deleteOp;
};
/* buildScanOperation(QueryHandler queryHandler,
                      Object properties,
                      DBTransactionHandler transaction,
                      function(error, result) userCallback)
   IMMEDIATE
   Define a scan operation for the given query handler and properties.
   RETURNS the operation with userCallback attached
*/
NdbSession.prototype.buildScanOperation = function(queryHandler, properties,
                                                   tx, callback) {
  var scanOp;

  udebug.log("buildScanOperation");
  scanOp = ndboperation.newScanOperation(tx, queryHandler, properties);
  scanOp.userCallback = callback;
  return scanOp;
};
/* buildReadProjectionOperation(DBIndexHandler dbIndexHandler,
                                Object keys,
                                Object projection,
                                DBTransactionHandler transaction,
                                function userCallback)
   IMMEDIATE
   Define an operation which when executed will read a projection.
   A projection with exactly one sector is delegated to
   buildReadOperation() (with isLoad = false) and counted in
   stats.oneTableProjections.
   RETURNS the operation with userCallback attached
*/
NdbSession.prototype.buildReadProjectionOperation = function(dbIndexHandler,
                                            keys, projection, tx, callback) {
  var op;

  /* If the "join" involves only one table, it is more efficient to run
     it as a ReadOperation */
  if(projection.sectors.length === 1) {
    stats.oneTableProjections++;
    return this.buildReadOperation(dbIndexHandler, keys, tx, false, callback);
  }

  if(udebug.is_debug()) {
    udebug.log("Projection Read from",
               dbIndexHandler.tableHandler.dbTable.name,
               "using", dbIndexHandler.dbIndex.name);
  }

  /* Unlike the other builders, the projection operation also takes the
     session's SessionImpl. */
  op = ndboperation.newProjectionOperation(this.impl, tx, dbIndexHandler,
                                           keys, projection);
  op.userCallback = callback;
  return op;
};
/* getTransactionHandler()
   IMMEDIATE
   RETURNS the session's current transaction handler.  If there is none,
   a new DBTransactionHandler is created for this session and stored
   as the current one.
*/
NdbSession.prototype.getTransactionHandler = function() {
  if(this.tx) {
    return this.tx;
  }
  this.tx = new dbtxhandler.DBTransactionHandler(this);
  return this.tx;
};
/* begin()
   IMMEDIATE
   Begin a user transaction context; exit autocommit mode.
   Asserts that the current transaction handler has not yet executed
   any operations.
*/
NdbSession.prototype.begin = function() {
  var handler = this.getTransactionHandler();
  var executedCount = handler.executedOperations.length;
  assert(executedCount === 0);
  handler.autocommit = false;
};
/* commit(callback)
   ASYNC
   Commit a user transaction by delegating to the session's current
   transaction handler.
   Callback is optional; if supplied, will receive (err).
*/
NdbSession.prototype.commit = function(userCallback) {
  var handler = this.tx;
  handler.commit(userCallback);
};
/* rollback(callback)
   ASYNC
   Roll back a user transaction by delegating to the session's current
   transaction handler.
   Callback is optional; if supplied, will receive (err).
*/
NdbSession.prototype.rollback = function (userCallback) {
  var handler = this.tx;
  handler.rollback(userCallback);
};
/* setLockMode(lockMode)
   IMMEDIATE
   Set Lock Mode for subsequent read operations.
   The work is done by NdbOperation's setLockMode(), which is given this
   session and the requested mode.
   RETURNS whatever NdbOperation's setLockMode() returns
*/
NdbSession.prototype.setLockMode = function (lockMode) {
  var session = this;
  return ndboperation.setLockMode(session, lockMode);
};
/* The NdbSession constructor is exported under the name DBSession. */
exports.DBSession = NdbSession;