jones-mysql/impl/MySQLConnectionPool.js (383 lines of code) (raw):

/* Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ "use strict"; /* Requires version 2.0 of Felix Geisendoerfer's MySQL client */ var stats = { "created" : 0, "list_tables" : 0, "get_table_metadata" : 0, "connections" : { "successful" : 0, "failed" : 0 } }; var mysql = require("mysql"); var jones = require("database-jones"); var mysqlConnection = require("./MySQLConnection.js"); var mysqlDictionary = require("./MySQLDictionary.js"); var udebug = unified_debug.getLogger("MySQLConnectionPool.js"); var util = require('util'); var stats_module = require(jones.api.stats); var MySQLTime = require(jones.common.MySQLTime); var SQLBuilder = require(jones.common.SQLBuilder); var DBTableHandler = require(jones.common.DBTableHandler).DBTableHandler; stats_module.register(stats, "spi","mysql","DBConnectionPool"); var sqlBuilder = new SQLBuilder(); /* Translate our properties to the driver's */ function getDriverProperties(props) { var driver = {}; if(props.mysql_socket) { driver.SocketPath = props.mysql_socket; } else { driver.host = props.mysql_host; driver.port = props.mysql_port; } if(props.mysql_user) { driver.user = props.mysql_user; } if(props.mysql_password) { driver.password = props.mysql_password; } driver.database = props.database; driver.debug = props.mysql_debug; driver.trace = props.mysql_trace; if (props.mysql_charset) { driver.charset = props.mysql_charset; } else { // by default, use utf-8 multibyte for character encoding driver.charset = 'UTF8MB4'; } if (props.mysql_sql_mode !== undefined) { driver.sql_mode = props.mysql_sql_mode; } else { // default to STRICT_ALL_TABLES driver.sql_mode = 'STRICT_ALL_TABLES'; } // connection pool maximum size if (props.mysql_pool_size !== undefined) { driver.connectionLimit = props.mysql_pool_size; if (props.mysql_pool_queue_size !== 'undefined') { driver.queueLimit = props.mysql_pool_queue_size; } } // allow multiple statements in one query (used to set character set) driver.multipleStatements = true; return driver; } /** Default domain type converter for timestamp and datetime objects. The domain type is Date * and the intermediate type is MySQLTime. MySQLTime provides a lossless conversion from * database DATETIME and TIMESTAMP with fractional microseconds. The default domain type converter * to javascript Date is lossy: javascript Date does not support microseconds. Users might supply * their own domain type with a converter that supports microseconds. */ var DomainTypeConverterDateTime = function() { // just a bit of documentation for debugging this.converter = 'DomainTypeConverterDateTime'; }; DomainTypeConverterDateTime.prototype.toDB = function toDB(userDate) { if (userDate === null || userDate === undefined) { return userDate; } // convert to the string form of the mySQLTime object var mysqlTime = new MySQLTime(); mysqlTime.fsp = 6; mysqlTime.initializeFromJsDateLocal(userDate); return mysqlTime; }; DomainTypeConverterDateTime.prototype.fromDB = function fromDB(mysqlTime) { if (mysqlTime === null || mysqlTime === undefined) { return mysqlTime; } var jsDate = mysqlTime.toJsDateLocal(); return jsDate; }; /** Default database type converter for timestamp and datetime objects. The database type is string * and the intermediate type is MySQLTime. MySQLTime provides a lossless conversion from * database DATETIME and TIMESTAMP with fractional microseconds. */ var DatabaseTypeConverterDateTime = function() { // just a bit of documentation for debugging this.converter = 'DatabaseTypeConverterDateTime'; }; DatabaseTypeConverterDateTime.prototype.toDB = function toDB(mysqlTime) { if (typeof mysqlTime !== 'object' || mysqlTime === null) { return mysqlTime; } // convert to the string form of the mySQLTime object var dbDateTime = mysqlTime.toDateTimeString(); return dbDateTime; }; DatabaseTypeConverterDateTime.prototype.fromDB = function fromDB(dbDateTime) { if (dbDateTime === null || dbDateTime === undefined) { return dbDateTime; } var mysqlTime = new MySQLTime(); mysqlTime.initializeFromDateTimeString(dbDateTime); return mysqlTime; }; /* Constructor saves properties but doesn't actually do anything with them until connect is called. */ exports.DBConnectionPool = function(props) { this.props = props; this.driverproperties = getDriverProperties(props); udebug.log('MySQLConnectionPool constructor with driverproperties: ' + util.inspect(this.driverproperties)); // connections that are being used (wrapped by DBSession) this.openConnections = []; this.is_connected = false; // create database type converter map this.databaseTypeConverterMap = {}; this.databaseTypeConverterMap.TIMESTAMP = new DatabaseTypeConverterDateTime(); this.databaseTypeConverterMap.DATETIME = new DatabaseTypeConverterDateTime(); this.databaseTypeConverterMap.JSON = jones.converters.JSONConverter; // create domain type converter map this.domainTypeConverterMap = {}; this.domainTypeConverterMap.TIMESTAMP = new DomainTypeConverterDateTime(); this.domainTypeConverterMap.DATETIME = new DomainTypeConverterDateTime(); this.pooling = props.mysql_pool_size ? true:false ; stats.created++; }; /* Capabilities provided by this connection. */ exports.DBConnectionPool.prototype.getCapabilities = function() { return { "UniqueIndexes" : true, // Tables can have secondary unique keys "TableScans" : true, // Query can scan a table "OrderedIndexScans" : true, // Query can scan an index "ForeignKeys" : true // Named foreign key relationships }; }; /** Register a user-specified domain type converter for this connection pool. * Called by SessionFactory.registerTypeConverter. */ exports.DBConnectionPool.prototype.registerTypeConverter = function(typeName, converterObject) { if (converterObject) { this.domainTypeConverterMap[typeName] = converterObject; } else { this.domainTypeConverterMap[typeName] = undefined; } }; /** Get the database type converter for the parameter type name. * Called when creating the DBTableHandler for a constructor. */ exports.DBConnectionPool.prototype.getDatabaseTypeConverter = function(typeName) { return this.databaseTypeConverterMap[typeName]; }; /** Get the domain type converter for the parameter type name. * Called when creating the DBTableHandler for a constructor. */ exports.DBConnectionPool.prototype.getDomainTypeConverter = function(typeName) { return this.domainTypeConverterMap[typeName]; }; /** Get a connection. If pooling via felix, get a connection from the pool. * If not, create a connection. This api does not manage the list of open connections. * * @param callback (err, connection) */ exports.DBConnectionPool.prototype.getConnection = function(callback) { var connectionPool = this; var connection, error; function getConnectionOnConnection(err, c) { udebug.log('getConnectionOnConnection'); if (err) { stats.connections.failed++; // create a new Error with a message and this stack error = new Error('Connection failed.'); // add cause to the error error.cause = err; // add sqlstate to error error.sqlstate = '08000'; callback(error); } else { stats.connections.successful++; if (connectionPool.pooling) { // some older versions of node-mysql do not have release() if(typeof c.release !== 'function') { c.release = c.end; } callback(null, c); } else { callback(null, connection); } } } // getConnection starts here if (connectionPool.pooling) { // get a connection from the felix pool udebug.log('getConnection using connection pooling: true'); connectionPool.pool.getConnection(getConnectionOnConnection); } else if (connectionPool.is_connected || connectionPool.is_connecting) { // create a new connection udebug.log('getConnection using connection pooling: false'); connection = mysql.createConnection(connectionPool.driverproperties); connection.connect(getConnectionOnConnection); } else { // error callback(new Error('getConnection called before connect.')); } }; /** Release a connection (synchronous). If pooling via felix, return the connection to the pool. * If not, end the connection. No errors are reported to the user. */ exports.DBConnectionPool.prototype.releaseConnection = function(connection) { var connectionPool = this; if (connectionPool.pooling) { udebug.log('releaseConnection using connection pooling: true'); connection.release(); } else { udebug.log('releaseConnection using connection pooling: false'); connection.end(); } }; /** Connect to the database. Verify connection properties. * @param user_callback (err, connectionPool) */ exports.DBConnectionPool.prototype.connect = function(callback) { var connectionPool = this; var error; function connectOnConnection(err, connection) { connectionPool.is_connecting = false; if (err) { stats.connections.failed++; // create a new Error with a message and this stack error = new Error('Connection to MySQL server failed.\n' + 'Your connection properties have been translated into node-mysql connection options:\n' + 'https://github.com/felixge/node-mysql/#connection-options\n' + util.inspect(connectionPool.driverproperties)); // add cause to the error error.cause = err; // add sqlstate to error error.sqlstate = '08000'; callback(error); } else { stats.connections.successful++; connectionPool.is_connected = true; connectionPool.releaseConnection(connection); callback(null, connectionPool); } } // connect begins here if (connectionPool.is_connected) { udebug.log('MySQLConnectionPool.connect is already connected'); callback(null, connectionPool); } else { connectionPool.is_connecting = true; if (connectionPool.pooling) { connectionPool.pool = mysql.createPool(connectionPool.driverproperties); } // verify that the connection properties work by getting a connection connectionPool.getConnection(connectOnConnection); } }; exports.DBConnectionPool.prototype.close = function(user_callback) { var connectionPool = this; udebug.log('close'); var i, openConnection; for (i = 0; i < this.openConnections.length; ++i) { openConnection = this.openConnections[i]; udebug.log('close ending open connection', i); if (openConnection && openConnection._connectCalled) { connectionPool.releaseConnection(openConnection); } } this.openConnections = []; this.is_connected = false; // end the underlying connection pool (close all connections gracefully) connectionPool.pool.end(function(err) { user_callback(err); }); }; exports.DBConnectionPool.prototype.destroy = function() { }; exports.DBConnectionPool.prototype.isConnected = function() { return this.is_connected; }; var countOpenConnections = function(connectionPool) { var i, count = 0; for (i = 0; i < connectionPool.openConnections.length; ++i) { if (connectionPool.openConnections[i] !== null) { count++; } } return count; }; exports.DBConnectionPool.prototype.getDBSession = function(index, callback) { var connectionPool = this; var newDBSession = null; var charset = connectionPool.driverproperties.charset; var charsetQuery = 'SET character_set_client=\'' + charset + '\';SET character_set_connection=\'' + charset + '\';SET character_set_results=\'' + charset + '\';'; var sqlModeQuery = ''; // set SQL_MODE if specified in driverproperties if (connectionPool.driverproperties.sql_mode !== undefined) { sqlModeQuery = 'SET SQL_MODE = \'' + connectionPool.driverproperties.sql_mode + '\';'; } udebug.log(sqlModeQuery); function charsetComplete(err) { callback(err, newDBSession); } var connected_callback = function(err, pooledConnection) { if (err) { callback(err); return; } newDBSession = new mysqlConnection.DBSession(pooledConnection, connectionPool, index); connectionPool.openConnections[index] = pooledConnection; udebug.log_detail('MySQLConnectionPool.getDBSession created a new pooledConnection for index ' + index + ' ; ', ' openConnections: ', countOpenConnections(connectionPool)); // set character set server variables pooledConnection.query(charsetQuery + sqlModeQuery, charsetComplete); }; // create a new connection connectionPool.getConnection(connected_callback); }; /** Close the connection being used by the dbSession. * @param dbSession contains index the index into the openConnections array * pooledConnection the connection being used * @param callback when the connection is closed call the user */ exports.DBConnectionPool.prototype.closeConnection = function(dbSession, callback) { var connectionPool = this; if (dbSession.pooledConnection) { connectionPool.releaseConnection(dbSession.pooledConnection); dbSession.pooledConnection = null; } connectionPool.openConnections[dbSession.index] = null; if (typeof callback === 'function') { callback(null); } }; exports.DBConnectionPool.prototype.getTableMetadata = function(databaseName, tableName, dbSession, user_callback) { var connectionPool = this; var connection, dictionary; stats.get_table_metadata++; function getTableMetadataOnMetadata(err, metadata) { if (!dbSession) { connectionPool.releaseConnection(connection); } user_callback(err, metadata); } function getTableMetadataOnConnection(err, c) { if (err) { user_callback(err); } else { connection = c; dictionary = new mysqlDictionary.DataDictionary(connection, connectionPool); udebug.log_detail('MySQLConnectionPool.getTableMetadata calling dictionary.getTableMetadata for', databaseName, tableName); dictionary.getTableMetadata(databaseName, tableName, getTableMetadataOnMetadata); } } // getTableMetadata starts here if (dbSession) { // dbSession exists; use the connection in the db session getTableMetadataOnConnection(null, dbSession.pooledConnection); } else { // dbSession does not exist; get a connection for the call connectionPool.getConnection(getTableMetadataOnConnection); } }; exports.DBConnectionPool.prototype.listTables = function(databaseName, dbSession, user_callback) { var connectionPool = this; var connection, dictionary; stats.list_tables++; function listTablesOnTableList(err, list) { if (!dbSession) { // return the connection we got just for this call connectionPool.releaseConnection(connection); } // return the list to the user user_callback(err, list); } function listTablesOnConnection(err, c) { if (err) { user_callback(err); } else { connection = c; dictionary = new mysqlDictionary.DataDictionary(connection); dictionary.listTables(databaseName, listTablesOnTableList); } } // listTables starts here if (dbSession) { listTablesOnConnection(null, dbSession.pooledConnection); } else { // dbSession does not exist; get a connection for the call connectionPool.getConnection(listTablesOnConnection); } }; /** Create the table in the database for the mapping. * @param tableMapping the mapping for this table * @param session the session to use for database operations * @param user_callback the user callback(err) * @return err if any errors * * Side effect: if TableMapping.database is not set, createTable() sets it * to the default database */ exports.DBConnectionPool.prototype.createTable = function(tableMapping, session, user_callback) { var engine = this.props.mysql_storage_engine; var connectionPool = this; var connection; function createTableOnQuery(err) { if (!session || !session.dbSession) { // return the connection we got just for this call connectionPool.releaseConnection(connection); } user_callback(err); } function createTableOnConnection(err, c) { var createTableSQL; if (err) { user_callback(err); } else { connection = c; createTableSQL = sqlBuilder.getSqlForTableCreation(tableMapping, engine); connection.query(createTableSQL, createTableOnQuery); } } // createTable starts here udebug.log('createTable for tableMapping:', tableMapping); if(! tableMapping.database) { tableMapping.database = this.driverproperties.database; } if (session && session.dbSession) { // dbSession exists; use the connection in the db session createTableOnConnection(null, session.dbSession.pooledConnection); } else { // dbSession does not exist; get a connection for the call this.getConnection(createTableOnConnection); } }; exports.DBConnectionPool.prototype.dropTable = function(dbName, tableName, session, user_callback) { var connectionPool, connection, qualifiedName; connectionPool = this; function dropTableOnQuery(err) { if (!session || !session.dbSession) { // return the connection we got just for this call connectionPool.releaseConnection(connection); } user_callback(err); } function dropTableOnConnection(err, c) { if(err) { user_callback(err); } else { connection = c; connection.query("DROP TABLE IF EXISTS " + qualifiedName, dropTableOnQuery); } } // dropTable starts here qualifiedName = dbName + "." + tableName; udebug.log('dropTable for ', qualifiedName); if(session && session.dbSession) { dropTableOnConnection(null, session.dbSession.pooledConnection); } else { this.getConnection(dropTableOnConnection); } };