Skip to content

Commit

Permalink
Addressing review
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed Sep 19, 2024
1 parent 1d20cad commit 670dde5
Show file tree
Hide file tree
Showing 32 changed files with 112 additions and 93 deletions.
2 changes: 1 addition & 1 deletion src/groups/bmq/bmqp/bmqp_schemaeventbuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ EncodingType::Enum SchemaEventBuilderUtil::bestEncodingSupported(
return EncodingType::e_BER; // RETURN
}

// If remote suppports BER, return BER
// If remote supports BER, return BER
if (bsl::find(encodingsSupported.cbegin(),
encodingsSupported.cend(),
bsl::string(EncodingFeature::k_ENCODING_BER)) !=
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqba/mqba_adminsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,11 +420,11 @@ void AdminSession::tearDown(const bsl::shared_ptr<void>& session,

void AdminSession::initiateShutdown(const ShutdownCb& callback,
const bsls::TimeInterval& timeout,
bool suppportShutdownV2)
bool supportShutdownV2)
{
// executed by the *ANY* thread
(void)timeout;
(void)suppportShutdownV2;
(void)supportShutdownV2;

dispatcher()->execute(
bdlf::BindUtil::bind(&AdminSession::initiateShutdownDispatched,
Expand Down
5 changes: 4 additions & 1 deletion src/groups/mqb/mqba/mqba_adminsession.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,13 @@ class AdminSession : public mqbnet::Session, public mqbi::DispatcherClient {
/// Initiate the shutdown of the session and invoke the specified
/// `callback` upon completion of (asynchronous) shutdown sequence or
/// if the specified `timeout` is expired.
/// The optional (temporary) specified 'supportShutdownV2' indicates
/// shutdown V2 logic which is not applicable to `AdminSession`
/// implementation.
void
initiateShutdown(const ShutdownCb& callback,
const bsls::TimeInterval& timeout,
bool suppportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE;
bool supportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE;

/// Make the session abandon any work it has.
void invalidate() BSLS_KEYWORD_OVERRIDE;
Expand Down
6 changes: 3 additions & 3 deletions src/groups/mqb/mqba/mqba_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,9 +463,9 @@ void Application::stop()
d_transportManager_mp->initiateShutdown();
BALL_LOG_INFO << "Stopped listening for new connections.";

bool suppportShutdownV2 = initiateShutdown();
bool supportShutdownV2 = initiateShutdown();

if (suppportShutdownV2) {
if (supportShutdownV2) {
BALL_LOG_INFO << ": Executing GRACEFUL_SHUTDOWN_V2";
}
else {
Expand All @@ -484,7 +484,7 @@ void Application::stop()
++clusterIt, --count) {
clusterIt.cluster()->initiateShutdown(
bdlf::BindUtil::bind(&bslmt::Latch::arrive, &latch),
suppportShutdownV2);
supportShutdownV2);
}
latch.wait();

Expand Down
8 changes: 6 additions & 2 deletions src/groups/mqb/mqba/mqba_application.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,12 @@ class Application {
/// Pendant operation of the `oneTimeInit` one.
void oneTimeShutdown();

/// Attempt to execute shutdown logic v2. Return 'true' if all nodes and
/// proxies support it.
/// Attempt to execute graceful shutdown logic v2.
///
/// If any node or proxy does not support the v2 graceful shutdown logic,
/// do not perform any shutdown actions and return `false`. Otherwise,
/// send v2 shutdown requests to all nodes, shutdown clients and proxies,
/// and return `true`.
bool initiateShutdown();

private:
Expand Down
8 changes: 4 additions & 4 deletions src/groups/mqb/mqba/mqba_clientsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ void ClientSession::onHandleConfiguredDispatched(
void ClientSession::initiateShutdownDispatched(
const ShutdownCb& callback,
const bsls::TimeInterval& timeout,
bool suppportShutdownV2)
bool supportShutdownV2)
{
// executed by the *CLIENT* dispatcher thread

Expand Down Expand Up @@ -889,7 +889,7 @@ void ClientSession::initiateShutdownDispatched(
return; // RETURN
}

if (suppportShutdownV2) {
if (supportShutdownV2) {
d_operationState = e_SHUTTING_DOWN_V2;
d_queueSessionManager.shutDown();

Expand Down Expand Up @@ -2868,7 +2868,7 @@ void ClientSession::tearDown(const bsl::shared_ptr<void>& session,

void ClientSession::initiateShutdown(const ShutdownCb& callback,
const bsls::TimeInterval& timeout,
bool suppportShutdownV2)
bool supportShutdownV2)
{
// executed by the *ANY* thread

Expand Down Expand Up @@ -2906,7 +2906,7 @@ void ClientSession::initiateShutdown(const ShutdownCb& callback,
d_self.acquire()),
callback,
timeout,
suppportShutdownV2),
supportShutdownV2),
this,
mqbi::DispatcherEventType::e_DISPATCHER);
// Use 'mqbi::DispatcherEventType::e_DISPATCHER' to avoid (re)enabling
Expand Down
9 changes: 5 additions & 4 deletions src/groups/mqb/mqba/mqba_clientsession.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ class ClientSession : public mqbnet::Session,
enum OperationState {
e_RUNNING // Running normally
,
// TEMPORARY, remove 'after switching to StopRequest V2
// TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest
// V2.
e_SHUTTING_DOWN // Shutting down due to 'initiateShutdown' request
,
e_SHUTTING_DOWN_V2 // Shutting down due to 'initiateShutdown' request
Expand Down Expand Up @@ -483,7 +484,7 @@ class ClientSession : public mqbnet::Session,
/// if the specified `timeout` is expired.
void initiateShutdownDispatched(const ShutdownCb& callback,
const bsls::TimeInterval& timeout,
bool suppportShutdownV2);
bool supportShutdownV2);

void invalidateDispatched();

Expand Down Expand Up @@ -687,14 +688,14 @@ class ClientSession : public mqbnet::Session,
/// Initiate the shutdown of the session and invoke the specified
/// `callback` upon completion of (asynchronous) shutdown sequence or
/// if the specified `timeout` is expired. If the optional (temporary)
/// specified 'suppportShutdownV2' is 'true' execute shutdown logic V2
/// specified 'supportShutdownV2' is 'true' execute shutdown logic V2
/// where upstream (not downstream) nodes deconfigure queues and the
/// shutting down node (not downstream) waits for CONFIRMS.
/// The shutdown is complete when 'tearDownAllQueuesDone'.
void
initiateShutdown(const ShutdownCb& callback,
const bsls::TimeInterval& timeout,
bool suppportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE;
bool supportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE;

/// Make the session abandon any work it has.
void invalidate() BSLS_KEYWORD_OVERRIDE;
Expand Down
18 changes: 8 additions & 10 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ void Cluster::processCommandDispatched(mqbcmd::ClusterResult* result,
}

void Cluster::initiateShutdownDispatched(const VoidFunctor& callback,
bool suppportShutdownV2)
bool supportShutdownV2)
{
// executed by the *DISPATCHER* thread

Expand All @@ -636,7 +636,7 @@ void Cluster::initiateShutdownDispatched(const VoidFunctor& callback,
d_clusterData.membership().setSelfNodeStatus(
bmqp_ctrlmsg::NodeStatus::E_STOPPING);

if (suppportShutdownV2) {
if (supportShutdownV2) {
d_clusterOrchestrator.queueHelper().requestToStopPushing();

bsls::TimeInterval whenToStop(
Expand All @@ -652,7 +652,8 @@ void Cluster::initiateShutdownDispatched(const VoidFunctor& callback,
bdlf::PlaceHolders::_1)); // completionCb
}
else {
// Temporary, remove after switching all to version 2
// TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest
// V2.
// Send StopRequest to all nodes and proxies. The peers are expected
// not to send any PUT msgs to this node after receiving StopRequest.
// For each queue for which this node is the primary, peers (replicas
Expand All @@ -671,7 +672,7 @@ void Cluster::initiateShutdownDispatched(const VoidFunctor& callback,

SessionSpVec sessions;
for (mqbnet::TransportManagerIterator sessIt(
&d_clusterData.transportManager());
&d_clusterData.transportManager());
sessIt;
++sessIt) {
bsl::shared_ptr<mqbnet::Session> sessionSp =
Expand All @@ -694,16 +695,13 @@ void Cluster::initiateShutdownDispatched(const VoidFunctor& callback,
}

if (mqbnet::ClusterUtil::isClient(negoMsg)) {
// if (!d_suppportShutdownV2) {
link.insert(bdlf::BindUtil::bind(
&mqbnet::Session::initiateShutdown,
sessionSp,
bdlf::PlaceHolders::_1, // completion callback
shutdownTimeout,
false));
// }
// else there is no need to de-confgiure queues and wait for
// unconfirmed since V2 upstreams do that on StopRequest V2

continue; // CONTINUE
}

Expand Down Expand Up @@ -2696,7 +2694,7 @@ int Cluster::start(bsl::ostream& errorDescription)
}

void Cluster::initiateShutdown(const VoidFunctor& callback,
bool suppportShutdownV2)
bool supportShutdownV2)
{
// executed by *ANY* thread

Expand All @@ -2709,7 +2707,7 @@ void Cluster::initiateShutdown(const VoidFunctor& callback,
bdlf::BindUtil::bind(&Cluster::initiateShutdownDispatched,
this,
callback,
suppportShutdownV2),
supportShutdownV2),
this);

// Wait for above event to complete. This is needed because
Expand Down
12 changes: 7 additions & 5 deletions src/groups/mqb/mqbblp/mqbblp_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,11 @@ class Cluster : public mqbi::Cluster,

/// Executed by dispatcher thread.
void initiateShutdownDispatched(const VoidFunctor& callback,
bool suppportShutdownV2);
bool supportShutdownV2);

// TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest
// V2.

// // Temporary, remove after switching all to version 2
/// Send stop request to proxies and nodes specified in `sessions` using
/// the specified `stopCb` as a callback to be called once all the
/// requests get responses.
Expand Down Expand Up @@ -569,13 +571,13 @@ class Cluster : public mqbi::Cluster,
/// Initiate the shutdown of the cluster. It is expected that `stop()`
/// will be called soon after this routine is invoked. Invoke the
/// specified `callback` upon completion of (asynchronous) shutdown
/// sequence. If the optional (temporary) specified 'suppportShutdownV2'
/// sequence. If the optional (temporary) specified 'supportShutdownV2'
/// is 'true' execute shutdown logic V2 where upstream (not downstream)
/// nodes deconfigure queues and he shutting down node (not downstream)
/// nodes deconfigure queues and the shutting down node (not downstream)
/// wait for CONFIRMS.
void
initiateShutdown(const VoidFunctor& callback,
bool suppportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE;
bool supportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE;

/// Stop the `Cluster`.
void stop() BSLS_KEYWORD_OVERRIDE;
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_clustercatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ class ClusterCatalog {

mqbnet::Session::AdminCommandEnqueueCb d_adminCb;
// Callback function to enqueue admin commands

RequestManagerType d_requestManager;
// Request manager to use

Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ void ClusterOrchestrator::processStopRequest(
<< ", current status: " << ns->nodeStatus()
<< ", new status: " << bmqp_ctrlmsg::NodeStatus::E_STOPPING;

// Temporary, remove after switching all to version 2
// TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest V2.
if (stopRequest.version() == 1 && stopRequest.clusterName() != name) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
BALL_LOG_ERROR << d_clusterData_p->identity().description()
Expand Down
13 changes: 7 additions & 6 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ void ClusterProxy::startDispatched()
}

void ClusterProxy::initiateShutdownDispatched(const VoidFunctor& callback,
bool suppportShutdownV2)
bool supportShutdownV2)
{
// executed by the *DISPATCHER* thread

Expand All @@ -171,7 +171,7 @@ void ClusterProxy::initiateShutdownDispatched(const VoidFunctor& callback,
// Mark self as stopping.
d_isStopping = true;

if (suppportShutdownV2) {
if (supportShutdownV2) {
d_queueHelper.requestToStopPushing();

bsls::TimeInterval whenToStop(
Expand All @@ -187,7 +187,8 @@ void ClusterProxy::initiateShutdownDispatched(const VoidFunctor& callback,
bdlf::PlaceHolders::_1)); // completionCb
}
else {
// Temporary, remove after switching all to version 2
// TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest
// V2.

// Fill the first link with client session shutdown operations
mwcu::OperationChainLink link(d_shutdownChain.allocator());
Expand All @@ -197,7 +198,7 @@ void ClusterProxy::initiateShutdownDispatched(const VoidFunctor& callback,
clusterProxyConfig()->queueOperations().shutdownTimeoutMs());

for (mqbnet::TransportManagerIterator sessIt(
&d_clusterData.transportManager());
&d_clusterData.transportManager());
sessIt;
++sessIt) {
bsl::shared_ptr<mqbnet::Session> sessionSp =
Expand Down Expand Up @@ -1150,7 +1151,7 @@ int ClusterProxy::start(BSLS_ANNOTATION_UNUSED bsl::ostream& errorDescription)
}

void ClusterProxy::initiateShutdown(const VoidFunctor& callback,
bool suppportShutdownV2)
bool supportShutdownV2)
{
// executed by *ANY* thread

Expand All @@ -1163,7 +1164,7 @@ void ClusterProxy::initiateShutdown(const VoidFunctor& callback,
bdlf::BindUtil::bind(&ClusterProxy::initiateShutdownDispatched,
this,
callback,
suppportShutdownV2),
supportShutdownV2),
this);

dispatcher()->synchronize(this);
Expand Down
12 changes: 6 additions & 6 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
/// be called when the shutdown is completed. This routine is invoked
/// in the cluster-dispatcher thread.
void initiateShutdownDispatched(const VoidFunctor& callback,
bool suppportShutdownV2 = false);
bool supportShutdownV2 = false);

/// Stop the `Cluster`.
void stopDispatched();
Expand Down Expand Up @@ -401,7 +401,7 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
void
processResponseDispatched(const bmqp_ctrlmsg::ControlMessage& response);

// Temporary, remove after switching all to version 2
// TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest V2.
/// Send stop request to proxies specified in `sessions` using the
/// specified `stopCb` as a callback to be called once all the requests
/// get responses.
Expand Down Expand Up @@ -459,13 +459,13 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
/// Initiate the shutdown of the cluster and invoke the specified
/// `callback` upon completion of (asynchronous) shutdown sequence. It
/// is expected that `stop()` will be called soon after this routine is
/// invoked. If the optional (temporary) specified 'suppportShutdownV2' is
/// invoked. If the optional (temporary) specified 'supportShutdownV2' is
/// 'true' execute shutdown logic V2 where upstream (not downstream) nodes
/// deconfigure queues and he shutting down node (not downstream) wait for
/// CONFIRMS.
/// deconfigure queues and the shutting down node (not downstream) wait
/// for CONFIRMS.
void
initiateShutdown(const VoidFunctor& callback,
bool suppportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE;
bool supportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE;

/// Stop the `Cluster`.
void stop() BSLS_KEYWORD_OVERRIDE;
Expand Down
Loading

0 comments on commit 670dde5

Please sign in to comment.