Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat[MQB, MWC]: report the number of tcp connections #384

Merged
merged 12 commits into from
Sep 19, 2024
113 changes: 96 additions & 17 deletions src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ namespace BloombergLP {
namespace mqbnet {

const char* TCPSessionFactory::k_CHANNEL_PROPERTY_PEER_IP = "tcp.peer.ip";
const char* TCPSessionFactory::k_CHANNEL_PROPERTY_LOCAL_PORT =
"tcp.local.port";
const char* TCPSessionFactory::k_CHANNEL_PROPERTY_CHANNEL_ID =
"channelpool.channel.id";
const char* TCPSessionFactory::k_CHANNEL_STATUS_CLOSE_REASON =
Expand Down Expand Up @@ -280,36 +282,35 @@ TCPSessionFactory::channelStatContextCreator(
const bsl::shared_ptr<mwcio::Channel>& channel,
const bsl::shared_ptr<mwcio::StatChannelFactoryHandle>& handle)
{
mwcst::StatContext* parent = 0;

int peerAddress;
channel->properties().load(&peerAddress, k_CHANNEL_PROPERTY_PEER_IP);

ntsa::Ipv4Address ipv4Address(static_cast<bsl::uint32_t>(peerAddress));
ntsa::IpAddress ipAddress(ipv4Address);
if (!mwcio::ChannelUtil::isLocalHost(ipAddress)) {
parent = d_statController_p->channelsStatContext(
mqbstat::StatController::ChannelSelector::e_LOCAL);
}
else {
parent = d_statController_p->channelsStatContext(
mqbstat::StatController::ChannelSelector::e_REMOTE);
}
mqbstat::StatController::ChannelSelector::Enum selector =
mwcio::ChannelUtil::isLocalHost(ipAddress)
? mqbstat::StatController::ChannelSelector::e_REMOTE
: mqbstat::StatController::ChannelSelector::e_LOCAL;

mwcst::StatContext* parent = d_statController_p->channelsStatContext(
selector);
waldgange marked this conversation as resolved.
Show resolved Hide resolved
BSLS_ASSERT_SAFE(parent);

bsl::string name;
bsl::string endpoint(d_allocator_p), port(d_allocator_p);
if (handle->options().is<mwcio::ConnectOptions>()) {
name = handle->options().the<mwcio::ConnectOptions>().endpoint();
endpoint = handle->options().the<mwcio::ConnectOptions>().endpoint();
port = d_ports.extract(channel->peerUri());
}
else {
name = channel->peerUri();
endpoint = channel->peerUri();
port = d_ports.extract(
handle->options().the<mwcio::ListenOptions>().endpoint());
}
channel->properties().set(TCPSessionFactory::k_CHANNEL_PROPERTY_LOCAL_PORT,
port);

bdlma::LocalSequentialAllocator<2048> localAllocator(d_allocator_p);
mwcst::StatContextConfiguration statConfig(name, &localAllocator);

return parent->addSubcontext(statConfig);
bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex); // LOCK
return d_ports.addChannelContext(parent, endpoint, port);
}

void TCPSessionFactory::negotiate(
Expand Down Expand Up @@ -682,6 +683,9 @@ void TCPSessionFactory::channelStateCallback(
else {
// Keep track of active channels, for logging purposes
++d_nbActiveChannels;
{
bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex); // LOCK
}

// Register as observer of the channel to get the 'onClose'
channel->onClose(bdlf::BindUtil::bind(
Expand Down Expand Up @@ -714,6 +718,11 @@ void TCPSessionFactory::onClose(const bsl::shared_ptr<mwcio::Channel>& channel,
{
--d_nbActiveChannels;

bslstl::StringRef port;
channel->properties().load(
&port,
TCPSessionFactory::k_CHANNEL_PROPERTY_LOCAL_PORT);

ChannelInfoSp channelInfo;
{
// Lookup the session and remove it from internal map
Expand All @@ -724,6 +733,7 @@ void TCPSessionFactory::onClose(const bsl::shared_ptr<mwcio::Channel>& channel,
channelInfo = it->second;
d_channels.erase(it);
}
d_ports.deleteChannelContext(port);
} // close mutex lock guard // UNLOCK

if (!channelInfo) {
Expand Down Expand Up @@ -891,6 +901,7 @@ TCPSessionFactory::TCPSessionFactory(
, d_noSessionCondition(bsls::SystemClockType::e_MONOTONIC)
, d_noClientCondition(bsls::SystemClockType::e_MONOTONIC)
, d_channels(allocator)
, d_ports(allocator)
, d_heartbeatSchedulerActive(false)
, d_heartbeatChannels(allocator)
, d_initialMissedHeartbeatCounter(calculateInitialMissedHbCounter(config))
Expand Down Expand Up @@ -1414,5 +1425,73 @@ bool TCPSessionFactory::isEndpointLoopback(const bslstl::StringRef& uri) const
mwcio::ChannelUtil::isLocalHost(endpoint.host());
}

// ------------------------------------
// class TCPSessionFactory::PortManager
// ------------------------------------

TCPSessionFactory::PortManager::PortManager(bslma::Allocator* allocator)
: d_portMap(allocator)
, d_regex(allocator)
, d_allocator_p(allocator)
{
const char pattern[] = ":(\\d{1,5})$";
bsl::string error(allocator);
size_t errorOffset;
BSLA_MAYBE_UNUSED const int rc = d_regex.prepare(
&error,
&errorOffset,
pattern,
bdlpcre::RegEx::k_FLAG_JIT);
BSLS_ASSERT_SAFE(rc == 0);
BSLS_ASSERT_SAFE(d_regex.isPrepared());
}

bslma::ManagedPtr<mwcst::StatContext>
TCPSessionFactory::PortManager::addChannelContext(mwcst::StatContext* parent,
const bsl::string& endpoint,
const bsl::string& port)
{
bdlma::LocalSequentialAllocator<2048> localAllocator(d_allocator_p);
mwcst::StatContextConfiguration statConfig(endpoint, &localAllocator);
waldgange marked this conversation as resolved.
Show resolved Hide resolved

bslma::ManagedPtr<mwcst::StatContext> channelStatContext;

PortMap::iterator portIt = d_portMap.find(port);

if (portIt != d_portMap.end()) {
channelStatContext = portIt->second.d_portContext->addSubcontext(
statConfig);
++portIt->second.d_numChannels;
}
else {
mwcst::StatContextConfiguration portConfig(port, &localAllocator);
bsl::shared_ptr<mwcst::StatContext> portStatContext =
parent->addSubcontext(
portConfig.storeExpiredSubcontextValues(true));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, we convert bslma::ManagedPtr to bsl::shared_ptr. How does the ownership transfer works in this situiation? Is this operation safe? Are we sure that the underlying StatContext is not accidentally destroyed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this is a common practice:

bsl::shared_ptr<mwcst::StatContext> statContext(
d_config.d_statContextCreator(channel, handleSp));

d_partitionsStatContexts.emplace_back(
bsl::shared_ptr<mwcst::StatContext>(
d_statContext_mp->addSubcontext(
mwcst::StatContextConfiguration(partitionName,
&localAllocator))));

StatContextMp statContextMp =
d_clusterData.clusterNodesStatContext()->addSubcontext(config);
StatContextSp statContextSp(statContextMp, d_allocator_p);

and so on

channelStatContext = portStatContext->addSubcontext(statConfig);
d_portMap.emplace(port, PortContext({portStatContext, 1}));
}

return channelStatContext;
}

void TCPSessionFactory::PortManager::deleteChannelContext(
const bsl::string& port)
{
// Lookup the port's StatContext and remove it from the internal containers
PortMap::iterator it = d_portMap.find(port);
if (it != d_portMap.end() && --it->second.d_numChannels == 0) {
d_portMap.erase(it);
}
}

bsl::string_view
TCPSessionFactory::PortManager::extract(const bsl::string& endpoint) const
{
bsl::string_view result;

return d_regex.match(&result, endpoint) == 0 ? result.substr(1) : result;
}

} // close package namespace
} // close enterprise namespace
46 changes: 46 additions & 0 deletions src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
// BDE
#include <bdlbb_blob.h>
#include <bdlmt_eventscheduler.h>
#include <bdlpcre_regex.h>
waldgange marked this conversation as resolved.
Show resolved Hide resolved
#include <bsl_functional.h>
#include <bsl_memory.h>
#include <bsl_ostream.h>
Expand Down Expand Up @@ -137,6 +138,9 @@ class TCPSessionFactory {
/// Name of a property set on the channel representing the peer's IP.
static const char* k_CHANNEL_PROPERTY_PEER_IP;

/// Name of a property set on the channel representing the local port.
static const char* k_CHANNEL_PROPERTY_LOCAL_PORT;

/// Name of a property set on the channel representing the BTE channel
/// id.
static const char* k_CHANNEL_PROPERTY_CHANNEL_ID;
Expand Down Expand Up @@ -214,6 +218,45 @@ class TCPSessionFactory {
// scheduler thread.
};

/// This class provides mechanism to store a map of port stat contexts.
class PortManager {
public:
// PUBLIC TYPES
struct PortContext {
bsl::shared_ptr<mwcst::StatContext> d_portContext;
bsl::size_t d_numChannels;
};
typedef bsl::unordered_map<bsl::string, PortContext> PortMap;

private:
// PRIVATE DATA

PortMap d_portMap;
// A map of all ports

// Regex used to find partitionId.
bdlpcre::RegEx d_regex;

bslma::Allocator* d_allocator_p;
// Allocator to use
waldgange marked this conversation as resolved.
Show resolved Hide resolved

public:
// CREATORS
explicit PortManager(bslma::Allocator* allocator = 0);

// PUBLIC METHODS
bslma::ManagedPtr<mwcst::StatContext>
addChannelContext(mwcst::StatContext* parent,
const bsl::string& endpoint,
const bsl::string& port);

void deleteChannelContext(const bsl::string& port);

/// Parse the specified `endpoint` string and try to find the
/// port inside. Return the port on success, assert on fail.
bsl::string_view extract(const bsl::string& endpoint) const;
};

typedef bsl::shared_ptr<ChannelInfo> ChannelInfoSp;

/// Map associating a `Channel` to its corresponding `ChannelInfo` (as
Expand Down Expand Up @@ -319,6 +362,9 @@ class TCPSessionFactory {
ChannelMap d_channels;
// Map of all active channels

PortManager d_ports;
// Manager of all open ports

bool d_heartbeatSchedulerActive;
// True if the recurring
// heartbeat check event is
Expand Down
1 change: 1 addition & 0 deletions src/groups/mqb/mqbstat/mqbstat_statcontroller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include <bdlbb_blob.h>
#include <bdlf_bind.h>
#include <bdlf_placeholder.h>
#include <bdlma_localsequentialallocator.h>
#include <bdlmt_eventscheduler.h>
#include <bdlt_timeunitratio.h>
#include <bsl_algorithm.h>
Expand Down
19 changes: 11 additions & 8 deletions src/groups/mqb/mqbstat/mqbstat_statcontroller.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,17 @@ class StatController {

private:
// PRIVATE TYPES
typedef bslma::ManagedPtr<bdlmt::TimerEventScheduler> SchedulerMp;
typedef bslma::ManagedPtr<mwcst::StatContext> StatContextMp;
typedef bsl::shared_ptr<mwcst::StatContext> StatContextSp;
typedef bslma::ManagedPtr<mwcsys::StatMonitor> SystemStatMonitorMp;
typedef bslma::ManagedPtr<Printer> PrinterMp;
typedef bslma::ManagedPtr<JsonPrinter> JsonPrinterMp;
typedef bslma::ManagedPtr<mqbplug::StatPublisher> StatPublisherMp;
typedef bslma::ManagedPtr<mqbplug::StatConsumer> StatConsumerMp;
typedef bslma::ManagedPtr<bdlmt::TimerEventScheduler> SchedulerMp;
typedef bslma::ManagedPtr<mwcst::StatContext> StatContextMp;
typedef bsl::shared_ptr<mwcst::StatContext> StatContextSp;
typedef bslma::ManagedPtr<mwcsys::StatMonitor> SystemStatMonitorMp;
typedef bslma::ManagedPtr<Printer> PrinterMp;
typedef bslma::ManagedPtr<JsonPrinter> JsonPrinterMp;
typedef bslma::ManagedPtr<mqbplug::StatPublisher> StatPublisherMp;
typedef bslma::ManagedPtr<mqbplug::StatConsumer> StatConsumerMp;
typedef bsl::list<StatContextMp> StatContextList;
typedef StatContextList::iterator StatContextIt;
typedef bsl::unordered_map<bsl::string, StatContextIt> StatContextMap;

/// Struct containing a statcontext and bool specifying if the
/// statcontext is managed.
Expand Down
3 changes: 2 additions & 1 deletion src/groups/mwc/mwcio/mwcio_statchannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,12 @@ StatChannel::StatChannel(const StatChannelConfig& config,
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(config.d_statContext_sp);
d_config.d_statContext_sp->adjustValue(Stat::e_CONNECTIONS, 1);
}

StatChannel::~StatChannel()
{
// NOTHING
d_config.d_statContext_sp->adjustValue(Stat::e_CONNECTIONS, -1);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do per-channel stat contexts, then the number of connections will always be 0 or 1, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, I utilize the aggregation functionality Of StatContextTable here in order to deal only with the Channel's StatContext here. Otherways I would have to deal with its parent (port's StatContext), and, hence, store its shared_ptr here in StatChannel class

}

// MANIPULATORS
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mwc/mwcio/mwcio_statchannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class StatChannel : public DecoratingChannelPartialImp {
/// `mwcio::StatChannelFactory`).
struct Stat {
// TYPES
enum Enum { e_BYTES_IN = 0, e_BYTES_OUT = 1 };
enum Enum { e_BYTES_IN = 0, e_BYTES_OUT = 1, e_CONNECTIONS = 2 };
};

private:
Expand Down
24 changes: 24 additions & 0 deletions src/groups/mwc/mwcio/mwcio_statchannelfactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ StatChannelFactoryUtil::statContextConfiguration(const bsl::string& name,
config.isTable(true);
config.value("in_bytes")
.value("out_bytes")
.value("connections")
.storeExpiredSubcontextValues(true);

if (historySize != -1) {
Expand Down Expand Up @@ -268,6 +269,10 @@ void StatChannelFactoryUtil::initializeStatsTable(
StatChannel::Stat::e_BYTES_OUT,
mwcst::StatUtil::value,
start);
schema.addColumn("connections",
StatChannel::Stat::e_CONNECTIONS,
mwcst::StatUtil::value,
start);

if (!(end == mwcst::StatValue::SnapshotLocation())) {
schema.addColumn("in_bytes_delta",
Expand All @@ -280,6 +285,11 @@ void StatChannelFactoryUtil::initializeStatsTable(
mwcst::StatUtil::valueDifference,
start,
end);
schema.addColumn("connections_delta",
StatChannel::Stat::e_CONNECTIONS,
mwcst::StatUtil::valueDifference,
start,
end);
}

// Configure records
Expand Down Expand Up @@ -316,6 +326,14 @@ void StatChannelFactoryUtil::initializeStatsTable(
.printAsMemory();
}
tip->addColumn("out_bytes", "total").zeroString("").printAsMemory();

tip->setColumnGroup("Connections");
if (!(end == mwcst::StatValue::SnapshotLocation())) {
tip->addColumn("connections_delta", "delta")
.zeroString("")
.setPrecision(0);
}
tip->addColumn("connections", "total").setPrecision(0);
}

bsls::Types::Int64
Expand Down Expand Up @@ -352,6 +370,12 @@ StatChannelFactoryUtil::getValue(const mwcst::StatContext& context,
case Stat::e_BYTES_OUT_ABS: {
return STAT_SINGLE(value, StatChannel::Stat::e_BYTES_OUT);
}
case Stat::e_CONNECTIONS_DELTA: {
return STAT_RANGE(valueDifference, StatChannel::Stat::e_CONNECTIONS);
}
case Stat::e_CONNECTIONS_ABS: {
return STAT_SINGLE(value, StatChannel::Stat::e_CONNECTIONS);
}
default: {
BSLS_ASSERT_SAFE(false && "Attempting to access an unknown stat");
}
Expand Down
4 changes: 3 additions & 1 deletion src/groups/mwc/mwcio/mwcio_statchannelfactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,9 @@ struct StatChannelFactoryUtil {
e_BYTES_IN_DELTA,
e_BYTES_IN_ABS,
e_BYTES_OUT_DELTA,
e_BYTES_OUT_ABS
e_BYTES_OUT_ABS,
e_CONNECTIONS_DELTA,
e_CONNECTIONS_ABS
};
};

Expand Down
9 changes: 9 additions & 0 deletions src/groups/mwc/mwcst/mwcst_statcontext.h
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,10 @@ class StatContext {
/// all sibling subcontexts.
int uniqueId() const;

/// Return the number of times this 'StatContext' had 'snapshot' called on
/// it.
bsls::Types::Int64 numSnapshots() const;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is useful, but also it's an interface change in MWC. Do we want to keep it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to check it in portsDeleter() as before the first snapshot numSubcontexts() returns zero, and I don't want to delete new StatContexts in this case. It happens because StatContext::addSubcontext() puts the newly created contexts to d_newSubcontexts vector.

d_newSubcontexts.push_back(newContext);

Then it moves them to d_statContext vector on next StatContext::snapshot()

However StatContext::numSubcontexts() doesn't take into account d_newSubcontext and will return 0 if there haven't been any stapshots yet.
inline int StatContext::numSubcontexts() const
{
return static_cast<int>(d_subcontexts.size() +
d_deletedSubcontexts.size());
}

So if we don't check the number of snapshots in portsDeleter() we can unintentionally delete the StatContext that has just been added.


/// Return the number of subcontexts held by this `StatContext`
int numSubcontexts() const;

Expand Down Expand Up @@ -1133,6 +1137,11 @@ inline int StatContext::uniqueId() const
return d_uniqueId;
}

inline bsls::Types::Int64 StatContext::numSnapshots() const
{
return d_numSnapshots;
}

inline int StatContext::numSubcontexts() const
{
return static_cast<int>(d_subcontexts.size() +
Expand Down
Loading
Loading