Skip to content

Commit

Permalink
redirector: more fixes for publish errors
Browse files Browse the repository at this point in the history
- close connection on error (still throw to caller)
- rpc ping pong keep track of sent pings
- reduce scrubber/nodes_monitor intensity
  • Loading branch information
guymguym committed Feb 26, 2017
1 parent 8ad250a commit 35ea489
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 52 deletions.
10 changes: 6 additions & 4 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,16 @@ config.IO_HTTP_TRUNCATE_PART_SIZE = false;
// REBUILD CONFIG //
////////////////////

config.REBUILD_BATCH_SIZE = 20;
config.REBUILD_BATCH_DELAY = 75;
config.REBUILD_BATCH_ERROR_DELAY = 3000;

config.REBUILD_NODE_ENABLED = true;
config.REBUILD_NODE_CONCURRENCY = 3;
config.REBUILD_NODE_OFFLINE_GRACE = 5 * 60000;
config.REBUILD_NODE_BATCH_SIZE = 10;
config.REBUILD_NODE_BATCH_DELAY = 50;

config.SCRUBBER_ENABLED = true;
config.SCRUBBER_BATCH_SIZE = 10;
config.SCRUBBER_BATCH_DELAY = 50;
config.SCRUBBER_ERROR_DELAY = 3000;
config.SCRUBBER_RESTART_DELAY = 30000;

//////////////////////
Expand Down
45 changes: 22 additions & 23 deletions src/rpc/rpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -558,13 +558,13 @@ RPC.prototype._accept_new_connection = function(conn) {
conn._sent_requests = new Map();
conn._received_requests = new Map();
if (self._disconnected_state) {
conn.close();
throw new Error('RPC IN DISCONNECTED STATE - rejecting connection ' + conn.connid);
const err = new Error('RPC IN DISCONNECTED STATE - rejecting connection ' + conn.connid);
conn.emit('error', err);
throw err;
}
conn.on('message', msg => self._on_message(conn, msg));
conn.on('close', err => self._connection_closed(conn, err));
// we prefer to let the connection handle its own errors and decide whether to close or not
// conn.on('error', self._connection_error.bind(self, conn));
// we let the connection handle its own errors and decide whether to close or not

if (!conn.transient) {
// always replace previous connection in the address map,
Expand All @@ -575,11 +575,21 @@ RPC.prototype._accept_new_connection = function(conn) {
// send pings to keepalive
conn._ping_interval = setInterval(function() {
dbg.log4('RPC PING', conn.connid);
conn._ping_last_reqid = conn._alloc_reqid();
const reqid = conn._alloc_reqid();
conn._ping_reqid_set = conn._ping_reqid_set || new Set();
conn._ping_reqid_set.add(reqid);
if (conn._ping_reqid_set.size > 3) {
const err = new Error(`RPC PINGPONG EXHAUSTED pings ${
Array.from(conn._ping_reqid_set).join(',')
} connid ${conn.connid}`);
dbg.warn(err);
conn.emit('error', err);
return null;
}
P.resolve()
.then(() => conn.send(RpcRequest.encode_message({
op: 'ping',
reqid: conn._ping_last_reqid
reqid: reqid
})))
.catch(_.noop); // already means the conn is closed
return null;
Expand Down Expand Up @@ -652,15 +662,6 @@ RPC.prototype.disconnect_all = function() {
};


/**
* Handle an error reported for a connection: log it, then close the connection.
*
* @param conn - the connection the error belongs to; its connid and url.href
*      are included in the log line, and its close() method is called.
* @param err - the error being reported; err.stack is logged when it is set,
*      otherwise err itself is logged.
*/
RPC.prototype._connection_error = function(conn, err) {
dbg.error('RPC CONNECTION ERROR:', conn.connid, conn.url.href, err.stack || err);
// always close after logging - no recovery is attempted here
conn.close();
};


/**
*
*/
Expand Down Expand Up @@ -746,16 +747,14 @@ RPC.prototype._on_message = function(conn, msg_buffer) {
.catch(_.noop); // already means the conn is closed
break;
case 'pong':
if (conn._ping_last_reqid === msg.header.reqid) {
if (conn._ping_reqid_set && conn._ping_reqid_set.has(msg.header.reqid)) {
dbg.log4('RPC PINGPONG', conn.connid);
conn._ping_mismatch_count = 0;
conn._ping_reqid_set.delete(msg.header.reqid);
} else {
conn._ping_mismatch_count = (conn._ping_mismatch_count || 0) + 1;
if (conn._ping_mismatch_count < 3) {
dbg.warn('RPC PINGPONG MISMATCH #' + conn._ping_mismatch_count, conn.connid);
} else {
conn.close(new Error('RPC PINGPONG MISMATCH #' + conn._ping_mismatch_count + ' ' + conn.connid));
}
dbg.warn(`RPC PINGPONG MISMATCH pings ${
Array.from(conn._ping_reqid_set).join(',')
} pong ${msg.header.reqid
} connid ${conn.connid}`);
}
break;
default:
Expand Down
8 changes: 4 additions & 4 deletions src/rpc/rpc_conn_set.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,22 @@ class RpcConnSet {

add(conn) {
if (conn.is_closed()) {
dbg.warn(this.name, 'ignore closed connection', conn.url.href);
dbg.warn(this.name, 'ignore closed connection', conn.connid);
return;
}
if (this.set.has(conn)) {
dbg.log0(this.name, 'already registered', conn.url.href);
dbg.log0(this.name, 'already registered', conn.connid);
return;
}
dbg.log0(this.name, 'adding connection', conn.url.href);
dbg.log0(this.name, 'adding connection', conn.connid);
this.set.add(conn);
const close_listener = () => this.remove(conn);
conn[CLOSE_LISTENER_SYMBOL] = close_listener;
conn.once('close', close_listener);
}

remove(conn) {
dbg.warn(this.name, 'removing connection', conn.url.href);
dbg.warn(this.name, 'removing connection', conn.connid);
const close_listener = conn[CLOSE_LISTENER_SYMBOL];
delete conn[CLOSE_LISTENER_SYMBOL];
conn.removeListener('close', close_listener);
Expand Down
12 changes: 6 additions & 6 deletions src/server/bg_services/scrubber.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,28 @@ function background_worker() {
dbg.log0('SCRUBBER:', 'BEGIN');
}

return MDStore.instance().iterate_all_chunks(this.marker, config.REBUILD_BATCH_SIZE)
return MDStore.instance().iterate_all_chunks(this.marker, config.SCRUBBER_BATCH_SIZE)
.then(res => {
// update the marker for next time
this.marker = res.marker;
if (!res.chunk_ids.length) return;
dbg.log0('SCRUBBER:', 'WORKING ON', res.chunk_ids.length, 'CHUNKS');
const builder = new map_builder.MapBuilder(res.chunk_ids);
return builder.run()
.catch(err => dbg.error('SCRUBBER:', 'BUILD ERROR', err, err.stack));
return builder.run();
})
.then(() => {
// return the delay before next batch
if (this.marker) {
dbg.log0('SCRUBBER:', 'CONTINUE', this.marker);
return config.REBUILD_BATCH_DELAY;
return config.SCRUBBER_BATCH_DELAY;
}
dbg.log0('SCRUBBER:', 'END');
return config.SCRUBBER_RESTART_DELAY;
}, err => {
})
.catch(err => {
// return the delay before next batch
dbg.error('SCRUBBER:', 'ERROR', err, err.stack);
return config.REBUILD_BATCH_ERROR_DELAY;
return config.SCRUBBER_ERROR_DELAY;
});
}

Expand Down
8 changes: 5 additions & 3 deletions src/server/node_services/nodes_monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -1496,7 +1496,7 @@ class NodesMonitor extends EventEmitter {
setTimeout(() => {
this._set_need_rebuild.add(item);
this._wakeup_rebuild();
}, config.REBUILD_BATCH_DELAY).unref();
}, config.REBUILD_NODE_BATCH_DELAY).unref();
}
}

Expand All @@ -1508,12 +1508,14 @@ class NodesMonitor extends EventEmitter {
setTimeout(() => {
this._set_need_rebuild.add(item);
this._wakeup_rebuild();
}, config.REBUILD_BATCH_DELAY).unref();
}, config.REBUILD_NODE_BATCH_DELAY).unref();
}
}
}

_wakeup_rebuild() {
if (!this._started) return;
if (!config.REBUILD_NODE_ENABLED) return;
const count = Math.min(
config.REBUILD_NODE_CONCURRENCY,
this._set_need_rebuild.size - this._num_running_rebuilds);
Expand Down Expand Up @@ -1556,7 +1558,7 @@ class NodesMonitor extends EventEmitter {
.then(() => MDStore.instance().iterate_node_chunks({
node_id: item.node._id,
marker: start_marker,
limit: config.REBUILD_BATCH_SIZE,
limit: config.REBUILD_NODE_BATCH_SIZE,
}))
.then(res => {
// we update the stage marker even if failed to advance the scan
Expand Down
17 changes: 14 additions & 3 deletions src/server/system_services/redirector.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* Copyright (C) 2016 NooBaa */
'use strict';

const _ = require('lodash');
const P = require('../../util/promise');
const dbg = require('../../util/debug_module')(__filename);
const server_rpc = require('../server_rpc');
Expand All @@ -17,11 +18,19 @@ function publish_to_cluster(req) {
const api_name = req.rpc_params.method_api.slice(0, -4); // remove _api suffix
const method = req.rpc_params.method_name;
const connections = cluster_conn_set.list();
dbg.log0('publish_to_cluster:', connections.length);
dbg.log0('publish_to_cluster:',
api_name, method, req.rpc_params.request_params,
_.map(connections, 'connid'));
return P.map(connections,
conn => server_rpc.client[api_name][method](req.rpc_params.request_params, {
conn => P.resolve(server_rpc.client[api_name][method](req.rpc_params.request_params, {
connection: conn,
auth_token: req.auth_token,
}))
.catch(err => {
// close this connection, assuming this can help to recover
conn.emit('error', new Error(`publish_to_cluster: disconnect on error ${err.message} ${conn.connid}`));
// throw the original error so that callers will receive the root cause reason
throw err;
})
)
.then(res => ({
Expand All @@ -41,7 +50,9 @@ function unregister_from_alerts(req) {

function publish_alerts(req) {
const connections = alerts_conn_set.list();
dbg.log3('publish_alerts:', req.rpc_params.request_params, connections.length);
dbg.log3('publish_alerts:',
req.rpc_params.request_params,
_.map(connections, 'connid'));
return P.map(connections, conn =>
server_rpc.client.frontend_notifications.alert(req.rpc_params.request_params, {
connection: conn,
Expand Down
22 changes: 13 additions & 9 deletions src/server/utils/clustering_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -255,15 +255,19 @@ function get_member_upgrade_status(ip) {
function send_master_update(is_master, master_address) {
let system = system_store.data.systems[0];
if (!system) return P.resolve();
let hosted_agents_promise = is_master ? server_rpc.client.hosted_agents.start() : server_rpc.client.hosted_agents.stop();
let update_master_promise = _.isUndefined(master_address) ? P.resolve() : server_rpc.client.redirector.publish_to_cluster({
method_api: 'server_inter_process_api',
method_name: 'update_master_change',
target: '', // required but irrelevant
request_params: {
master_address: master_address
}
});
let hosted_agents_promise = is_master ?
server_rpc.client.hosted_agents.start() :
server_rpc.client.hosted_agents.stop();
let update_master_promise = _.isUndefined(master_address) ?
P.resolve() :
server_rpc.client.redirector.publish_to_cluster({
method_api: 'server_inter_process_api',
method_name: 'update_master_change',
target: '', // required but irrelevant
request_params: {
master_address: master_address
}
});
return P.join(
server_rpc.client.system.set_webserver_master_state({
is_master: is_master
Expand Down

0 comments on commit 35ea489

Please sign in to comment.