master.js


app/cluster/master.js
  

/**

* Multi-Threading High Availability Application Master

*

* @module greppy/app/cluster/master

* @author Hermann Mayer <hermann.mayer92@gmail.com>

*/

var cluster = require('cluster');

var winston = require('winston');

var moment = require('moment');

var colors = require('colors');

var extend = require('extend');

var getopt = require('node-getopt');

var MasterIPC = require('./master/ipc');

/**
 * Create the cluster master: parse the command line, serve the
 * informational switches (help/version) and remember the worker context.
 *
 * Side effects visible in this constructor:
 *  - registers the instance as `greppy.master` and sets `greppy.context`
 *  - calls `process.exit(0)` after printing the help or the version
 *  - calls `process.exit(1)` when the `--context` switch is missing
 *
 * @constructor
 * @param {Object} [options] - Options object; `options.cli` is merged over
 *                             the default CLI configuration
 */
var Master = function(options)
{
    greppy.master = this;

    // Fall back to an empty object so a missing argument does not make
    // the `this.options.cli` lookup below throw
    this.options = options || {};

    // Setup cli configuration
    var defaultCliConf = {
        strict: true,
        args: [
            ['h', 'help', 'Display this help'],
            ['v', 'version', 'Show version'],
            ['c', 'context=CONCRETE_WORKER', 'Start the cluster with a concrete worker implementation'],
            ['', 'debug[=DEBUG_PORT]', 'Start the cluster in debug mode, with a given start port'],
            ['d', '', 'Start the cluster in debug mode']
        ],
        help: "This is a Greppy framework cluster master implementation.\n\n" +
              "[[OPTIONS]]\n"
    };

    // Parse given arguments and prepare them for the master process
    var cliConf = extend(true, defaultCliConf, this.options.cli || {});

    this.cli = getopt.create(cliConf.args);
    this.cli.setHelp(cliConf.help);
    this.cliArgs = this.cli.parseSystem();

    var cliOptions = this.cliArgs.options;

    // In strict mode at least one switch is required; without any switch
    // (or with an explicit --help) we print the usage and stop
    if ((cliConf.strict && 0 === Object.keys(cliOptions).length)
        || cliOptions.hasOwnProperty('help')
    ) {
        this.cli.showHelp();
        process.exit(0);
    }

    // Print the application version
    if (cliOptions.hasOwnProperty('version')) {

        // Named `pkg` because `package` is a reserved word in strict mode
        // code and ES modules
        var pkg = require(process.cwd() + '/package');

        console.log(
            'Project '.white + pkg.name.green.bold +
            ' version: '.white + ('' + pkg.version).green.bold
        );

        process.exit(0);
    }

    // The master needs the concrete worker switch, so we check it
    if (!cliOptions.hasOwnProperty('context')) {

        console.log(
            'The '.red + '--context='.red.bold + ' switch was not given. ' +
            'You need a concrete worker implementation.'.red + '\n'
        );

        process.exit(1);
    }

    // The short -d switch is an alias for --debug without a port
    if (cliOptions.hasOwnProperty('d')) {
        cliOptions.debug = 0;
        delete cliOptions.d;
    }

    // Normalize the debug port to a number; 0 means "no explicit port"
    if (cliOptions.hasOwnProperty('debug')) {
        cliOptions.debug = Number(cliOptions.debug) || 0;
    }

    // Annotate the worker context
    this.contextName = cliOptions.context;
    greppy.context = this.contextName;

    // Master stats object
    this.stats = {
        crashs  : [],
        metrics : {}
    };
};

/**
 * Setup the master.
 *
 * Merges the given options into the master options, sets the process
 * title, builds the winston logger (published as `greppy.logger` and as
 * the global `logger`), creates the IPC pool, configures the cluster
 * module, forks the configured amount of workers and binds the
 * SIGINT/SIGTERM/SIGHUP handlers.
 *
 * @param {Object} options - Options object (`title`, `logger` and `worker`
 *                           keys are read here)
 * @param {Function} callback - Function to call once as many workers as
 *                              configured reported themselves online
 */
Master.prototype.configure = function(options, callback)
{
    var self = this;

    this.options = extend(true, this.options, options || {});

    // Setup process title
    process.title = this.options.title || 'greppy-master';

    // Setup winston logger
    var defaultLoggerConf = {
        colors: {
            debug : 'blue',
            info  : 'grey',
            warn  : 'yellow',
            error : 'red'
        },
        transports: [
            new (winston.transports.Console)({
                colorize  : true,
                timestamp : function() {
                    return moment().format(
                        'YYYY-MM-DDTHH:mm:ss.SSSZ'
                    ).yellow.bold + ' [Master]'.red.bold;
                },
                "level"   : 'debug',
                "silent"  : false
            }),
            new (winston.transports.File)({
                json: false,
                filename: process.cwd() + '/var/log/' + this.contextName + '.master.log'
            })
        ]
    };

    var loggerConf = extend(true, defaultLoggerConf, this.options.logger || {});

    this.logger = new winston.Logger(loggerConf);

    // From here on the bare `logger` identifier resolves to this global
    greppy.logger = global.logger = this.logger;

    logger.info('Current environment is ' + greppy.env.bold.green);

    var defaultWorkerConf = {
        amount         : 1,
        implementation : process.cwd() + '/app/worker.js',
        args           : [],
        execArgv       : []
    };

    // Setup the IPC pool
    this.ipc = new MasterIPC(this);

    // Setup worker configuration
    var workerConf = extend(true, defaultWorkerConf, this.options.worker || {});
    var debugRequested = this.cliArgs.options.hasOwnProperty('debug');

    workerConf.args.push('--context', this.contextName);

    if (debugRequested) {
        workerConf.args.push('--debug');
    }

    // Configure the cluster master
    cluster.setupMaster({
        exec     : workerConf.implementation,
        args     : workerConf.args,
        execArgv : workerConf.execArgv
    });

    logger.info(
        'Starting the cluster (' + (workerConf.amount + ' worker(s)').green.bold + ')'
    );

    var onlineCounter = 0;

    // Online Event, fires the post-configure method
    // when all workers are forked.
    cluster.on('online', function() {

        onlineCounter++;

        // Strict equality keeps this a one-shot: workers respawned later
        // push the counter past the configured amount
        if (onlineCounter === workerConf.amount &&
            'function' === typeof callback) {
            callback();
        }
    });

    // Fork Event
    cluster.on('fork', function(worker) {

        // Add worker to the IPC pool
        self.ipc.addProcess(worker);

        logger.info(
            'Started new worker process (' + ('' + worker.process.pid).green.bold + ')'
        );
    });

    // Exit Event
    cluster.on('exit', function(worker, code, signal) {

        var pid      = worker.process.pid;
        var exitCode = worker.process.exitCode;

        // Remove pid from the IPC pool
        self.ipc.removeProcess(pid);

        // An exit counts as a crash when no graceful shutdown is running,
        // the exit code is neither 0 nor 130 and no debug port was given.
        // NOTE(review): 130 presumably is the shell convention for a
        // process ended by SIGINT (128 + 2) - confirm.
        var crashed = (!self.gracefullShutdown) &&
                      (0 !== exitCode && 130 !== exitCode) &&
                      (!self.cliArgs.options.debug);

        // Crashed workers and workers stopped for a reboot get replaced
        if (crashed || self.gracefullReboot) {

            // Reset marker
            // NOTE(review): the reboot marker is cleared by the first
            // exiting worker, so further workers exiting cleanly during
            // the same reboot are not respawned - confirm this is intended.
            self.gracefullReboot   = false;
            self.gracefullShutdown = false;

            logger.warn(
                'Worker process (' +
                ('' + pid).green.bold +
                ') died with error code: ' +
                ('' + exitCode).green.bold
            );

            // Write the crash/exit to our stats
            self.stats.crashs.push({
                occurredAt : new Date(),
                pid        : pid,
                exitCode   : exitCode
            });

            // Spawn a new worker process
            return cluster.fork();
        }

        logger.info(
            'Worker process (' +
            ('' + pid).green.bold +
            ') exited successfully'
        );
    });

    // Just fork the workers
    for (var i = 0; i < workerConf.amount; i++) {

        // In debug mode every worker gets its own debug switch; with an
        // explicit start port each worker receives the next higher port
        if (debugRequested) {
            if (0 === this.cliArgs.options.debug) {
                cluster.settings.execArgv.push('--debug');
            } else {
                cluster.settings.execArgv.push('--debug=' + (++this.cliArgs.options.debug));
            }
        }

        // NOTE(review): the object passed to fork() is used as environment
        // for the worker, so the logger presumably arrives as a string -
        // confirm whether workers rely on it.
        cluster.fork({
            logger: logger
        });

        // Drop the per-worker debug switch again so the next fork
        // starts from the shared execArgv
        if (debugRequested) {
            cluster.settings.execArgv.pop();
        }
    }

    var emitGracefullShutdown = function()
    {
        logger.info('Catched SIGTERM/SIGINT - emit gracefull shutdown to all cluster workers\n');

        self.gracefullShutdown = true;
        self.ipc.broadcast('gracefull.shutdown');
    };

    var emitGracefullReboot = function()
    {
        if (self.cliArgs.options.debug) {
            (new (require('../console'))()).clear();
        }

        logger.info('Catched SIGHUP - emit gracefull shutdown and reboot to all cluster workers\n');

        self.gracefullReboot = true;
        self.ipc.broadcast('gracefull.shutdown');
    };

    // Bind signals. The event name has to match the signal name exactly,
    // a name with surrounding whitespace is never emitted.
    process.on('SIGINT', emitGracefullShutdown);
    process.on('SIGTERM', emitGracefullShutdown);
    process.on('SIGHUP', emitGracefullReboot);
};

/**
 * Accessor for the IPC object stored on this master.
 *
 * @return {Object} The current value of the `ipc` property
 */
Master.prototype.getIPC = function()
{
    var ipcPool = this.ipc;

    return ipcPool;
};

/**
 * Accessor for the cluster module this file works with.
 *
 * @return {Object} The object bound to the file-level `cluster` variable
 */
Master.prototype.getCluster = function()
{
    var clusterModule = cluster;

    return clusterModule;
};

/**
 * Accessor for the parsed commandline arguments of this master.
 *
 * @return {Object} The current value of the `cliArgs` property
 */
Master.prototype.getCliArgs = function()
{
    var parsedArgs = this.cliArgs;

    return parsedArgs;
};

// Expose the Master constructor as the export of this module
module.exports = Master;