database-jones/Adapter/common/SQLTransactionHandler.js (141 lines of code) (raw):
/*
Copyright (c) 2014, Oracle and/or its affiliates. All rights
reserved.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
02110-1301 USA
*/
"use strict";
var stats = {
"execute" : { "commit": 0, "no_commit" : 0},
"closed" : 0,
"commit" : 0,
"rollback" : 0
};
var jones = require("database-jones"),
unified_debug = require("unified_debug"),
udebug = unified_debug.getLogger("SQLTransactionHandler.js"),
stats_module = require(jones.api.stats);
stats_module.register(stats, "spi", "SQLTransactionHandler");
/**
* TransactionHandler is responsible for executing operations that were defined
* via DBSession.buildXXXOperation. UserContext is responsible for creating the
* operations and for calling TransactionHandler.execute when they are ready for execution.
*
* A batch of operations is executed in sequence. Each batch is defined by a closure
* which contains an operationsList and a callback. The callback is the function
* that is to be called once all operations in the operationsList have been completely
* executed including the user callback for each operation.
*
* The list of closures is contained in the pendingBatches list. If the pendingBatches list
* is non-empty at the time execute is called, a batch closure is created with the parameters
* to the execute function (operationsList and executionCompleteCallback) and the closure is
* pushed onto the pendingBatches list. In the fullness of time, the operations will be executed
* and the callback will be called.
*
* Within the execution of a single batch as defined by execute, each operation is executed
* in sequence. With AbortOnError set to true, an error returned by any operation aborts the
* transaction. This implies that a failure to insert a row due to duplicate key exception,
* or a failure to delete a row due to row not found will fail the transaction. This is the only
* implementable strategy for dealing with the mysql server due to the error handling at the
* server. The server will decide to roll back a transaction on certain errors, but will not
* notify the client that it has done so. The client will behave as if operations that succeeded
* will be effective upon commit, but in fact, some operations that succeeded will be rolled back
* if a subsequent operation fails. Therefore, AbortOnError is the only strategy that will detect
* errors and report them to the user.
*
* The implementation strategy involves keeping track for each transaction if there has been an error
* reported, and returning an error on all subsequent operations. This is accomplished by setting
* RollbackOnly on failed transactions, and keeping track of the error that caused the RollbackOnly
* status to be set. Since users can also call setRollbackOnly, a different Error object is created
* that indicates UserError. For errors reported by the mysql adapter, the original Error is
* reported to the operation that caused it, and a different TransactionRolledBackError error
* that includes the original error is created and reported to subsequent operations as well as
* to the transaction.execute callback.
*
* Errors reported in the transaction callback contain the cause of the transaction error. A member
* property of error, cause, is introduced to contain the underlying cause. A transaction error
* caused by a duplicate key error on insert will contain the DBOperationError as the cause.
*/
function TransactionRolledBackError(err) {
this.cause = err;
this.sqlstate = 'HY000';
this.message = 'Transaction was aborted due to operation failure. See this.cause for underlying error.';
}
function SQLTransactionHandler(dbSession, sqlSocket, autocommit) {
udebug.log('new TransactionHandler');
this.dbSession = dbSession;
this.sqlSocket = sqlSocket;
this.autocommit = autocommit;
this.firstTime = ! autocommit;
this.numberOfOperations = 0;
this.currentOperation = 0;
this.operationsList = null;
this.executedOperations = [];
this.pendingBatches = [];
this.isCommitting = false;
this.transactionExecuteCallback = null;
}
SQLTransactionHandler.prototype.executeOperations = function() {
var transactionHandler = this;
this.isCommitting = false;
this.numberOfOperations = this.operationsList.length;
udebug.log('executeOperations numberOfOperations: ', this.numberOfOperations);
// execute the first operation; the operationCompleteCallback will execute each successive operation
this.currentOperation = 0;
this.operationsList[0].execute(this.sqlSocket, function(op) {
transactionHandler.operationCompleteCallback(op);
});
};
SQLTransactionHandler.prototype.execute = function(operationsList, transactionExecuteCallback) {
var transactionHandler = this;
function executeOnBegin(err) {
if (err) {
transactionHandler.transactionExecuteCallback(err);
}
transactionHandler.firstTime = false;
transactionHandler.executeOperations();
}
// execute begin operation the first time for non-autocommit
if (this.firstTime) {
stats.execute.no_commit++;
this.operationsList = operationsList;
this.transactionExecuteCallback = transactionExecuteCallback;
this.begin(executeOnBegin);
} else {
stats.execute.commit++;
if (this.numberOfOperations > 0) {
// there are pending batches, so just put this request on the list
this.pendingBatches.push(
{list: operationsList,
callback: transactionExecuteCallback
});
} else {
// this is the first (only) so execute it now
this.operationsList = operationsList;
this.transactionExecuteCallback = transactionExecuteCallback;
this.executeOperations();
}
}
};
SQLTransactionHandler.prototype.close = function() {
stats.closed++;
};
SQLTransactionHandler.prototype.batchComplete = function() {
var nextBatch;
if (typeof(this.transactionExecuteCallback) === 'function') {
this.transactionExecuteCallback(this.error, this);
}
// reset executedOperations if the transaction execute callback did not pop them
this.executedOperations = [];
// reset number of operations (after callbacks are done)
this.numberOfOperations = 0;
// if we committed the transaction, tell dbSession we are gone
if (this.isCommitting) {
//HELP ME!
this.dbSession.transactionHandler = null;
//HELP ME!
}
// see if there are any pending batches to execute
// each pending batch consists of an operation list and a callback
if (this.pendingBatches.length !== 0) {
// remove the first pending batch from the list (FIFO)
nextBatch = this.pendingBatches.shift();
this.operationsList = nextBatch.list;
this.transactionExecuteCallback = nextBatch.callback;
delete this.error;
this.executeOperations();
}
};
SQLTransactionHandler.prototype.operationCompleteCallback = function(completedOperation) {
var transactionHandler = this;
var complete, operation;
udebug.log("operationCompleteCallback", completedOperation.type);
// analyze the completed operation to see if it had an error
if (completedOperation.result.error) {
// this is AbortOnError behavior
// propagate the error to the transaction object
this.error = new TransactionRolledBackError(completedOperation.result.error);
}
this.executedOperations.push(completedOperation);
complete = this.executedOperations.length;
udebug.log_detail("Completed", complete, "of", this.numberOfOperations);
if (complete === this.numberOfOperations) {
this.batchComplete();
} else {
// there are more operations to execute in this batch
if (this.error) {
// do not execute the remaining operations, but call their callbacks with the propagated error
// transactionHandler.currentOperation refers to the current (error) operation
this.currentOperation++;
for (this.currentOperation;
this.currentOperation < this.numberOfOperations;
this.currentOperation++) {
udebug.log_detail('error aborting operation ' + this.currentOperation);
operation = this.operationsList[this.currentOperation];
operation.result.error = this.error;
if (typeof(operation.callback) === 'function') {
operation.callback(this.error, operation);
}
this.executedOperations.push(operation);
}
// finally, execute the batch complete function
this.batchComplete();
} else {
// execute the next operation in the current batch
this.currentOperation++;
this.operationsList[this.currentOperation].execute(this.sqlSocket, function(op) {
transactionHandler.operationCompleteCallback(op);
});
}
}
};
SQLTransactionHandler.prototype.begin = function(callback) {
this.sqlSocket.query("begin", callback);
};
SQLTransactionHandler.prototype.commit = function(callback) {
udebug.log('SQLTransactionHandler.commit.');
stats.commit++;
this.autocommit = true;
this.sqlSocket.query("commit", callback);
};
SQLTransactionHandler.prototype.rollback = function(callback) {
udebug.log('SQLTransactionHandler.rollback.');
stats.rollback++;
this.autocommit = true;
this.sqlSocket.query("rollback", callback);
};
module.exports = SQLTransactionHandler;