From 35ea489ff627c1c5edbac31a06d4baa93396368b Mon Sep 17 00:00:00 2001 From: Guy Margalit Date: Thu, 23 Feb 2017 21:06:34 +0200 Subject: [PATCH] redirector: more fixes for publish errors - close connection on error (still throw to caller) - rpc ping pong keep track of sent pings - reduce scrubber/nodes_monitor intensity --- config.js | 10 +++-- src/rpc/rpc.js | 45 +++++++++++------------ src/rpc/rpc_conn_set.js | 8 ++-- src/server/bg_services/scrubber.js | 12 +++--- src/server/node_services/nodes_monitor.js | 8 ++-- src/server/system_services/redirector.js | 17 +++++++-- src/server/utils/clustering_utils.js | 22 ++++++----- 7 files changed, 70 insertions(+), 52 deletions(-) diff --git a/config.js b/config.js index bd276fc47a..9259d08740 100644 --- a/config.js +++ b/config.js @@ -75,14 +75,16 @@ config.IO_HTTP_TRUNCATE_PART_SIZE = false; // REBUILD CONFIG // //////////////////// -config.REBUILD_BATCH_SIZE = 20; -config.REBUILD_BATCH_DELAY = 75; -config.REBUILD_BATCH_ERROR_DELAY = 3000; - +config.REBUILD_NODE_ENABLED = true; config.REBUILD_NODE_CONCURRENCY = 3; config.REBUILD_NODE_OFFLINE_GRACE = 5 * 60000; +config.REBUILD_NODE_BATCH_SIZE = 10; +config.REBUILD_NODE_BATCH_DELAY = 50; config.SCRUBBER_ENABLED = true; +config.SCRUBBER_BATCH_SIZE = 10; +config.SCRUBBER_BATCH_DELAY = 50; +config.SCRUBBER_ERROR_DELAY = 3000; config.SCRUBBER_RESTART_DELAY = 30000; ////////////////////// diff --git a/src/rpc/rpc.js b/src/rpc/rpc.js index 39a4cd418e..16e828b37c 100644 --- a/src/rpc/rpc.js +++ b/src/rpc/rpc.js @@ -558,13 +558,13 @@ RPC.prototype._accept_new_connection = function(conn) { conn._sent_requests = new Map(); conn._received_requests = new Map(); if (self._disconnected_state) { - conn.close(); - throw new Error('RPC IN DISCONNECTED STATE - rejecting connection ' + conn.connid); + const err = new Error('RPC IN DISCONNECTED STATE - rejecting connection ' + conn.connid); + conn.emit('error', err); + throw err; } conn.on('message', msg => self._on_message(conn, msg)); conn.on('close', err => self._connection_closed(conn, err)); - // we prefer to let the connection handle it's own errors and decide if to close or not - // conn.on('error', self._connection_error.bind(self, conn)); + // we let the connection handle it's own errors and decide if to close or not if (!conn.transient) { // always replace previous connection in the address map, @@ -575,11 +575,21 @@ RPC.prototype._accept_new_connection = function(conn) { // send pings to keepalive conn._ping_interval = setInterval(function() { dbg.log4('RPC PING', conn.connid); - conn._ping_last_reqid = conn._alloc_reqid(); + const reqid = conn._alloc_reqid(); + conn._ping_reqid_set = conn._ping_reqid_set || new Set(); + conn._ping_reqid_set.add(reqid); + if (conn._ping_reqid_set.size > 3) { + const err = new Error(`RPC PINGPONG EXHAUSTED pings ${ + Array.from(conn._ping_reqid_set).join(',') + } connid ${conn.connid}`); + dbg.warn(err); + conn.emit('error', err); + return null; + } P.resolve() .then(() => conn.send(RpcRequest.encode_message({ op: 'ping', - reqid: conn._ping_last_reqid + reqid: reqid }))) .catch(_.noop); // already means the conn is closed return null; @@ -652,15 +662,6 @@ RPC.prototype.disconnect_all = function() { }; -/** - * - */ -RPC.prototype._connection_error = function(conn, err) { - dbg.error('RPC CONNECTION ERROR:', conn.connid, conn.url.href, err.stack || err); - conn.close(); -}; - - /** * */ @@ -746,16 +747,14 @@ RPC.prototype._on_message = function(conn, msg_buffer) { .catch(_.noop); // already means the conn is closed break; case 'pong': - if (conn._ping_last_reqid === msg.header.reqid) { + if (conn._ping_reqid_set && conn._ping_reqid_set.has(msg.header.reqid)) { dbg.log4('RPC PINGPONG', conn.connid); - conn._ping_mismatch_count = 0; + conn._ping_reqid_set.delete(msg.header.reqid); } else { - conn._ping_mismatch_count = (conn._ping_mismatch_count || 0) + 1; - if (conn._ping_mismatch_count < 3) { - dbg.warn('RPC PINGPONG MISMATCH #' + conn._ping_mismatch_count, conn.connid); - } else { - conn.close(new Error('RPC PINGPONG MISMATCH #' + conn._ping_mismatch_count + ' ' + conn.connid)); - } + dbg.warn(`RPC PINGPONG MISMATCH pings ${ + Array.from(conn._ping_reqid_set).join(',') + } pong ${msg.header.reqid + } connid ${conn.connid}`); } break; default: diff --git a/src/rpc/rpc_conn_set.js b/src/rpc/rpc_conn_set.js index 69c4e9fcc0..098c3b32ad 100644 --- a/src/rpc/rpc_conn_set.js +++ b/src/rpc/rpc_conn_set.js @@ -14,14 +14,14 @@ class RpcConnSet { add(conn) { if (conn.is_closed()) { - dbg.warn(this.name, 'ignore closed connection', conn.url.href); + dbg.warn(this.name, 'ignore closed connection', conn.connid); return; } if (this.set.has(conn)) { - dbg.log0(this.name, 'already registered', conn.url.href); + dbg.log0(this.name, 'already registered', conn.connid); return; } - dbg.log0(this.name, 'adding connection', conn.url.href); + dbg.log0(this.name, 'adding connection', conn.connid); this.set.add(conn); const close_listener = () => this.remove(conn); conn[CLOSE_LISTENER_SYMBOL] = close_listener; @@ -29,7 +29,7 @@ class RpcConnSet { } remove(conn) { - dbg.warn(this.name, 'removing connection', conn.url.href); + dbg.warn(this.name, 'removing connection', conn.connid); const close_listener = conn[CLOSE_LISTENER_SYMBOL]; delete conn[CLOSE_LISTENER_SYMBOL]; conn.removeListener('close', close_listener); diff --git a/src/server/bg_services/scrubber.js b/src/server/bg_services/scrubber.js index 644ad5de67..88f1492865 100644 --- a/src/server/bg_services/scrubber.js +++ b/src/server/bg_services/scrubber.js @@ -31,28 +31,28 @@ function background_worker() { dbg.log0('SCRUBBER:', 'BEGIN'); } - return MDStore.instance().iterate_all_chunks(this.marker, config.REBUILD_BATCH_SIZE) + return MDStore.instance().iterate_all_chunks(this.marker, config.SCRUBBER_BATCH_SIZE) .then(res => { // update the marker for next time this.marker = res.marker; if (!res.chunk_ids.length) return; dbg.log0('SCRUBBER:', 'WORKING ON', res.chunk_ids.length, 'CHUNKS'); const builder = new map_builder.MapBuilder(res.chunk_ids); - return builder.run() - .catch(err => dbg.error('SCRUBBER:', 'BUILD ERROR', err, err.stack)); + return builder.run(); }) .then(() => { // return the delay before next batch if (this.marker) { dbg.log0('SCRUBBER:', 'CONTINUE', this.marker); - return config.REBUILD_BATCH_DELAY; + return config.SCRUBBER_BATCH_DELAY; } dbg.log0('SCRUBBER:', 'END'); return config.SCRUBBER_RESTART_DELAY; - }, err => { + }) + .catch(err => { // return the delay before next batch dbg.error('SCRUBBER:', 'ERROR', err, err.stack); - return config.REBUILD_BATCH_ERROR_DELAY; + return config.SCRUBBER_ERROR_DELAY; }); } diff --git a/src/server/node_services/nodes_monitor.js b/src/server/node_services/nodes_monitor.js index e8447a1fa0..d5675a4233 100644 --- a/src/server/node_services/nodes_monitor.js +++ b/src/server/node_services/nodes_monitor.js @@ -1496,7 +1496,7 @@ class NodesMonitor extends EventEmitter { setTimeout(() => { this._set_need_rebuild.add(item); this._wakeup_rebuild(); - }, config.REBUILD_BATCH_DELAY).unref(); + }, config.REBUILD_NODE_BATCH_DELAY).unref(); } } @@ -1508,12 +1508,14 @@ class NodesMonitor extends EventEmitter { setTimeout(() => { this._set_need_rebuild.add(item); this._wakeup_rebuild(); - }, config.REBUILD_BATCH_DELAY).unref(); + }, config.REBUILD_NODE_BATCH_DELAY).unref(); } } } _wakeup_rebuild() { + if (!this._started) return; + if (!config.REBUILD_NODE_ENABLED) return; const count = Math.min( config.REBUILD_NODE_CONCURRENCY, this._set_need_rebuild.size - this._num_running_rebuilds); @@ -1556,7 +1558,7 @@ class NodesMonitor extends EventEmitter { .then(() => MDStore.instance().iterate_node_chunks({ node_id: item.node._id, marker: start_marker, - limit: config.REBUILD_BATCH_SIZE, + limit: config.REBUILD_NODE_BATCH_SIZE, })) .then(res => { // we update the stage marker even if failed to advance the scan diff --git a/src/server/system_services/redirector.js b/src/server/system_services/redirector.js index 192e4a2a73..85b6253c3a 100644 --- a/src/server/system_services/redirector.js +++ b/src/server/system_services/redirector.js @@ -1,6 +1,7 @@ /* Copyright (C) 2016 NooBaa */ 'use strict'; +const _ = require('lodash'); const P = require('../../util/promise'); const dbg = require('../../util/debug_module')(__filename); const server_rpc = require('../server_rpc'); @@ -17,11 +18,19 @@ function publish_to_cluster(req) { const api_name = req.rpc_params.method_api.slice(0, -4); // remove _api suffix const method = req.rpc_params.method_name; const connections = cluster_conn_set.list(); - dbg.log0('publish_to_cluster:', connections.length); + dbg.log0('publish_to_cluster:', + api_name, method, req.rpc_params.request_params, + _.map(connections, 'connid')); return P.map(connections, - conn => server_rpc.client[api_name][method](req.rpc_params.request_params, { + conn => P.resolve(server_rpc.client[api_name][method](req.rpc_params.request_params, { connection: conn, auth_token: req.auth_token, + })) + .catch(err => { + // close this connection, assuming this can help to recover + conn.emit('error', new Error(`publish_to_cluster: disconnect on error ${err.message} ${conn.connid}`)); + // throw the original error so that callers will receive the root cause reason + throw err; }) ) .then(res => ({ @@ -41,7 +50,9 @@ function unregister_from_alerts(req) { function publish_alerts(req) { const connections = alerts_conn_set.list(); - dbg.log3('publish_alerts:', req.rpc_params.request_params, connections.length); + dbg.log3('publish_alerts:', + req.rpc_params.request_params, + _.map(connections, 'connid')); return P.map(connections, conn => server_rpc.client.frontend_notifications.alert(req.rpc_params.request_params, { connection: conn, diff --git a/src/server/utils/clustering_utils.js b/src/server/utils/clustering_utils.js index db4ac0bcee..ddaa6f5633 100644 --- a/src/server/utils/clustering_utils.js +++ b/src/server/utils/clustering_utils.js @@ -255,15 +255,19 @@ function get_member_upgrade_status(ip) { function send_master_update(is_master, master_address) { let system = system_store.data.systems[0]; if (!system) return P.resolve(); - let hosted_agents_promise = is_master ? server_rpc.client.hosted_agents.start() : server_rpc.client.hosted_agents.stop(); - let update_master_promise = _.isUndefined(master_address) ? P.resolve() : server_rpc.client.redirector.publish_to_cluster({ - method_api: 'server_inter_process_api', - method_name: 'update_master_change', - target: '', // required but irrelevant - request_params: { - master_address: master_address - } - }); + let hosted_agents_promise = is_master ? + server_rpc.client.hosted_agents.start() : + server_rpc.client.hosted_agents.stop(); + let update_master_promise = _.isUndefined(master_address) ? + P.resolve() : + server_rpc.client.redirector.publish_to_cluster({ + method_api: 'server_inter_process_api', + method_name: 'update_master_change', + target: '', // required but irrelevant + request_params: { + master_address: master_address + } + }); return P.join( server_rpc.client.system.set_webserver_master_state({ is_master: is_master