"use strict";
const genericPool = require("generic-pool");
const retry = require("retry-as-promised");
const pick = require("lodash.pick");
const url = require("url");
const pg = require("pg");
const debug = require("debug")("nodePgConnectionPool");
/**
 * Builds the logger object used by the pool from an optional custom logger.
 *
 * Every level the pool may call (log, debug, info, warn, error) is guaranteed
 * to exist on the result; levels the custom logger does not provide become
 * no-ops. Own enumerable properties of the custom logger are copied onto the
 * result. Level methods that the logger only inherits (e.g. methods declared
 * on a class prototype) are not visible to Object.assign, so they are picked
 * up explicitly and bound to the original logger.
 *
 * @param {object} [logger] - custom logger; plain object or class instance
 * @returns {object} a new object exposing log, debug, info, warn and error
 */
function injectLogger (logger) {
  const noop = function () {};
  const levels = ["log", "debug", "info", "warn", "error"];
  const source = logger || {};

  const merged = {};
  levels.forEach(level => {
    merged[level] = noop;
  });
  Object.assign(merged, source);

  levels.forEach(level => {
    // Still the placeholder => Object.assign found no own enumerable property
    // for this level; fall back to an inherited method if there is one.
    if (merged[level] === noop && typeof source[level] === "function") {
      merged[level] = source[level].bind(source);
    }
  });

  return merged;
}
/**
 * Opens a single Postgres connection.
 *
 * @param {object|string} pgOptions - handed unchanged to the pg Client constructor
 * @param {boolean} [pgNative] - only the exact value `true` selects the pg-native backed client
 * @returns {Promise} resolves with the connected client; rejects when native
 *   bindings were requested but are unavailable, when connecting fails, or when
 *   the client emits "error" before the promise has settled
 */
const create = function (pgOptions, pgNative) {
  return new Promise((resolve, reject) => {
    // The value of `pg.native` is tested instead of hasOwnProperty("native"):
    // node-postgres defines `native` lazily and it evaluates to null when
    // pg-native cannot be loaded, so the property exists either way.
    const driver = pgNative === true ? pg.native : pg;
    if (!driver) {
      // Thrown inside the executor, so callers observe it as a rejection.
      throw new Error("Native bindings not available. Make sure pg-native is installed.");
    }

    const client = new driver.Client(pgOptions);

    // This listener stays attached for the client's lifetime; once the promise
    // has settled the reject call is a no-op.
    client.on("error", function (err) {
      reject(err);
    });

    client.connect(function (err) {
      if (err) {
        reject(err);
        return;
      }
      resolve(client);
    });
  });
};
/**
 * Postgres connection pool built on generic-pool.
 *
 * @constructor
 * @param {object} options - Accepts properties ["name", "pgOptions", "pgNative", "poolOptions", "logger"]
 * @param {string} options.name - Name your pool; a random "pgPool-…" name is generated when omitted
 * @param {object|string} options.pgOptions - opts from [node-postgres/wiki/Client#parameters]{@link https://github.com/brianc/node-postgres/wiki/Client#parameters}, or a connection URL
 * @param {boolean} options.pgNative - Use native bindings
 * @param {object} options.poolOptions - opts from [node-pool#createpool]{@link https://github.com/coopernurse/node-pool#createpool}
 * @param {object} options.logger - Inject your custom logger
 */
const PgPool = module.exports = function (options) {
  options = pick(options, ["name", "pgOptions", "pgNative", "poolOptions", "logger"]);

  this.name = options.name || `pgPool-${Math.random().toString(36).slice(2, 12)}`;

  // this.pgOptions is the descriptive copy exposed through getPgOptions() and
  // log messages; connections themselves are created from options.pgOptions.
  if (typeof options.pgOptions === "string") {
    const params = url.parse(options.pgOptions, true);
    // auth and pathname are null when the URL carries no credentials / path.
    const auth = params.auth || "";
    // Split on the first ":" only so passwords containing ":" stay intact.
    const separator = auth.indexOf(":");
    const hasPassword = separator !== -1;

    this.pgOptions = {
      user: hasPassword ? auth.slice(0, separator) : (auth || undefined),
      password: hasPassword ? auth.slice(separator + 1) : undefined,
      host: params.hostname,
      port: params.port,
      database: (params.pathname || "").split("/")[1],
      ssl: params.query.ssl || true
    };
  } else if (options.pgOptions) {
    // Shallow copy so later changes to the caller's object don't leak in.
    this.pgOptions = Object.assign({}, options.pgOptions);
  } else {
    this.pgOptions = {};
  }

  this.pgNative = options.pgNative;
  this.poolOptions = options.poolOptions;
  this.logger = injectLogger(options.logger);

  const factory = {
    create: () => {
      // Counts attempts made by the retry wrapper for this one pool slot.
      let createAttempts = 0;

      // this is due to the limitation of node-pool ATM
      // https://github.com/coopernurse/node-pool/issues/161#issuecomment-261109882
      // After three failed attempts the factory RESOLVES with an Error object;
      // PgPool.prototype.acquire detects that sentinel and rejects.
      return retry(() => {
        createAttempts++;
        if (createAttempts > 3) {
          const err = new Error("CONN_FAILED");
          debug("Max conn createAttempts reached: %s, resolving to error:", createAttempts, err);
          return Promise.resolve(err);
        }
        return create(options.pgOptions, options.pgNative);
      }, {
        max: 10,
        name: "factory.create",
        report: debug
      });
    },
    destroy: (client) => {
      return new Promise((resolve) => {
        // The "CONN_FAILED" sentinel produced by create() is an Error, not a
        // connection, so there is nothing to close.
        if (!client || typeof client.end !== "function") {
          resolve();
          return;
        }
        try {
          // pg's Client#end takes the completion callback as its only argument.
          client.end(() => resolve());
          debug("Client conn closed. Available count : %s. Pool size: %s", this.availableCount(), this.getPoolSize());
          this.logger.log("Client conn closed. Available count : %s. Pool size: %s", this.availableCount(), this.getPoolSize());
        } catch (err) {
          debug("Failed to destroy connection", err);
          this.logger.error("Failed to destroy connection", err);
          // throw error cause infinite event loop; limitation of node-pool.
          // Resolve instead, so the pool's destroy operation does not stay
          // pending forever.
          resolve();
        }
      });
    }
  };

  // Now that the pool settings are ready create a pool instance.
  debug("Creating pool", this.poolOptions);
  this.pool = genericPool.createPool(factory, this.poolOptions);

  this.pool.on("factoryCreateError", e => {
    debug("Errored while connecting Postgres", e);
    this.logger.error("Errored while connecting Postgres", e);
  });

  this.pool.on("factoryDestroyError", e => {
    debug("Errored while destroying Postgres conn", e);
    this.logger.error("Errored while destroying Postgres conn", e);
  });
};
/**
 * Run Postgres query [Client#querytext-arrayvalues]{@link https://github.com/brianc/node-postgres/wiki/Client#querystring-querytext-array-values-optional-function-callback-query}
 *
 * A connection is acquired for the duration of the query and released again
 * whether the query succeeds or fails.
 *
 * @param {string} queryText - sql query
 * @param {array} arrayValues - array values
 * @returns {promise} Promise resolve with the result or Error
 */
PgPool.prototype.query = function (queryText, arrayValues) {
  // Go through this.acquire() rather than the raw pool so that the Error
  // object the factory resolves with after exhausting its connection attempts
  // is turned into a rejection instead of being used as a client.
  return this.acquire()
    .then(client => {
      // Starting the chain from a resolved promise turns a synchronous throw
      // from client.query into a rejection, so the release below still runs.
      return Promise.resolve()
        .then(() => client.query(queryText, arrayValues))
        .then(result => {
          this.pool.release(client);
          return result;
        }, err => {
          this.pool.release(client);
          this.logger.error("Errored query", err);
          debug("Errored query", err);
          throw err;
        });
    });
};
/**
 * Acquire a Postgres connection and use an optional priority.
 *
 * The pool's factory resolves with an Error object (instead of a connection)
 * once its connection attempts are exhausted. When that happens the Error is
 * removed from the pool, so it does not keep occupying a slot, and the
 * returned promise rejects with it.
 *
 * @param {number} priority - priority list number
 * @returns {promise} Promise resolve with the connection or Error
 */
PgPool.prototype.acquire = function (priority) {
  return this.pool.acquire(priority)
    .then(client => {
      if (!(client instanceof Error)) {
        return client;
      }

      debug("Couldn't acquire connection to %j", this.pgOptions);
      this.logger.error("Couldn't acquire connection to %j", this.pgOptions);

      // The Error is checked out of the pool like any other resource; hand it
      // back for destruction. A failure here is only logged so the caller
      // still receives the original connection error.
      const reportDiscardFailure = err => {
        debug("Failed to discard errored pool entry", err);
        this.logger.error("Failed to discard errored pool entry", err);
      };
      try {
        Promise.resolve(this.pool.destroy(client)).catch(reportDiscardFailure);
      } catch (err) {
        reportDiscardFailure(err);
      }

      throw client;
    });
};
/**
 * Hand a Postgres connection back to the pool.
 *
 * @param {object} client - Postgres connection to give back
 * @returns {promise} the value returned by the underlying pool's release()
 */
PgPool.prototype.release = function (client) {
  const pool = this.pool;
  return pool.release(client);
};
/**
 * Ask the pool to destroy a Postgres connection.
 *
 * @param {object} client - Postgres connection to destroy
 * @returns {promise} the value returned by the underlying pool's destroy()
 */
PgPool.prototype.destroy = function (client) {
  const pool = this.pool;
  return pool.destroy(client);
};
/**
 * Drains the connection pool and then clears it.
 *
 * generic-pool's promise-based drain() takes no callback, so clear() is
 * chained on the promise it returns.
 *
 * @returns {promise} Promise settled once the pool has been drained and cleared
 */
PgPool.prototype.drain = function () {
  return this.pool.drain().then(() => this.pool.clear());
};
/**
 * Name given to (or generated for) this pool.
 *
 * @returns {string} Name of the pool
 */
PgPool.prototype.getName = function () {
  const { name } = this;
  return name;
};
/**
 * Postgres options stored on this pool as this.pgOptions.
 *
 * @returns {object} Postgres options held by the pool
 */
PgPool.prototype.getPgOptions = function () {
  const { pgOptions } = this;
  return pgOptions;
};
/**
 * Pool options stored on this pool as this.poolOptions.
 *
 * @returns {object} pool options given
 */
PgPool.prototype.getPoolOptions = function () {
  const { poolOptions } = this;
  return poolOptions;
};
/**
 * Current size of the pool, read from the underlying pool's `size`.
 *
 * @returns {number} size of the pool
 */
PgPool.prototype.getPoolSize = function () {
  const { size } = this.pool;
  return size;
};
/**
 * Number of available connections, read from the underlying pool's `available`.
 *
 * @returns {number} available connections count of the pool
 */
PgPool.prototype.availableCount = function () {
  const { available } = this.pool;
  return available;
};
/**
 * Number of pending connections, read from the underlying pool's `pending`.
 *
 * @returns {number} pending connections count of the pool
 */
PgPool.prototype.pendingCount = function () {
  const { pending } = this.pool;
  return pending;
};
/**
 * Snapshot of the pool: its name plus the size, available and pending
 * counters of the underlying pool.
 *
 * @returns {object} pool status and stats
 */
PgPool.prototype.status = function () {
  const snapshot = { name: this.name };
  const pool = this.pool;
  snapshot.size = pool.size;
  snapshot.available = pool.available;
  snapshot.pending = pool.pending;
  return snapshot;
};