Skip to content

Commit

Permalink
Report the number of tcp connections
Browse files Browse the repository at this point in the history
Signed-off-by: Anton Pryakhin <[email protected]>
  • Loading branch information
waldgange committed Aug 7, 2024
1 parent 6ec6a32 commit 2ceb0f5
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 45 deletions.
48 changes: 29 additions & 19 deletions src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#include <bdlf_bind.h>
#include <bdlf_placeholder.h>
#include <bdlma_localsequentialallocator.h>
#include <bdlpcre_regex.h>
#include <bdlt_timeunitratio.h>
#include <bsl_algorithm.h>
#include <bsl_cstdlib.h>
Expand Down Expand Up @@ -100,6 +101,21 @@ const int k_SESSION_DESTROY_WAIT = 20;
const int k_CLIENT_CLOSE_WAIT = 20;
// Time to wait incrementally (in seconds) for all clients and
// proxies to be destroyed during stop sequence.
const char k_PORT_PATTERN[] = ":(\\d{1,5})";

bsl::string portFromUri(const bsl::string& endpoint)
{
bdlpcre::RegEx regEx;
bsl::string errorMessage;
size_t errorOffset;
std::vector<bsl::string_view> matchVector;

BSLS_ASSERT_SAFE(
0 == regEx.prepare(&errorMessage, &errorOffset, k_PORT_PATTERN));
BSLS_ASSERT_SAFE(0 == regEx.match(&matchVector, endpoint));

return bsl::string(matchVector[1]);
}

char calculateInitialMissedHbCounter(const mqbcfg::TcpInterfaceConfig& config)
{
Expand Down Expand Up @@ -280,36 +296,30 @@ TCPSessionFactory::channelStatContextCreator(
const bsl::shared_ptr<mwcio::Channel>& channel,
const bsl::shared_ptr<mwcio::StatChannelFactoryHandle>& handle)
{
mwcst::StatContext* parent = 0;

int peerAddress;
channel->properties().load(&peerAddress, k_CHANNEL_PROPERTY_PEER_IP);

ntsa::Ipv4Address ipv4Address(static_cast<bsl::uint32_t>(peerAddress));
ntsa::IpAddress ipAddress(ipv4Address);
if (!mwcio::ChannelUtil::isLocalHost(ipAddress)) {
parent = d_statController_p->channelsStatContext(
mqbstat::StatController::ChannelSelector::e_LOCAL);
}
else {
parent = d_statController_p->channelsStatContext(
mqbstat::StatController::ChannelSelector::e_REMOTE);
}
mqbstat::StatController::ChannelSelector::Enum selector =
mwcio::ChannelUtil::isLocalHost(ipAddress)
? mqbstat::StatController::ChannelSelector::e_REMOTE
: mqbstat::StatController::ChannelSelector::e_LOCAL;

BSLS_ASSERT_SAFE(parent);

bsl::string name;
bsl::string name, localPort;
if (handle->options().is<mwcio::ConnectOptions>()) {
name = handle->options().the<mwcio::ConnectOptions>().endpoint();
name = handle->options().the<mwcio::ConnectOptions>().endpoint();
localPort = portFromUri(channel->peerUri());
}
else {
name = channel->peerUri();
name = channel->peerUri();
localPort = portFromUri(
handle->options().the<mwcio::ListenOptions>().endpoint());
}

bdlma::LocalSequentialAllocator<2048> localAllocator(d_allocator_p);
mwcst::StatContextConfiguration statConfig(name, &localAllocator);

return parent->addSubcontext(statConfig);
return d_statController_p->addChannelStatContext(selector,
localPort,
name);
}

void TCPSessionFactory::negotiate(
Expand Down
53 changes: 53 additions & 0 deletions src/groups/mqb/mqbstat/mqbstat_statcontroller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include <bdlbb_blob.h>
#include <bdlf_bind.h>
#include <bdlf_placeholder.h>
#include <bdlma_localsequentialallocator.h>
#include <bdlmt_eventscheduler.h>
#include <bdlt_timeunitratio.h>
#include <bsl_algorithm.h>
Expand Down Expand Up @@ -81,6 +82,20 @@ const bsls::Types::Int64 k_NS_PER_MESSAGE = 15 *

const char k_PUBLISHINTERVAL_SUFFIX[] = ".PUBLISHINTERVAL";

void portsDeleter(
bsl::unordered_map<bsl::string, bslma::ManagedPtr<mwcst::StatContext> >*
map,
bslmt::Mutex* mutex,
const mwcst::StatContext& context)
{
// Lookup the port's StatContext and remove it from the 'map'
bslmt::LockGuard<bslmt::Mutex> guard(mutex); // LOCK
if (context.numSnapshots() != 0 && !context.isDeleted() &&
context.numSubcontexts() == 0) {
map->erase(context.name());
}
};

typedef bsl::unordered_set<mqbplug::PluginFactory*> PluginFactories;

/// Post on the optionally specified `semaphore`.
Expand Down Expand Up @@ -983,5 +998,43 @@ int StatController::processCommand(
return -1;
}

StatController::StatContextMp
StatController::addChannelStatContext(ChannelSelector::Enum selector,
const bsl::string& port,
const bsl::string& endpoint)
{
mwcst::StatContext* parent = channelsStatContext(selector);
BSLS_ASSERT_SAFE(parent);

bdlma::LocalSequentialAllocator<2048> localAllocator(d_allocator_p);
mwcst::StatContextConfiguration portConfig(port, &localAllocator);
mwcst::StatContextConfiguration statConfig(endpoint, &localAllocator);

bslma::ManagedPtr<mwcst::StatContext> channelStatContext;

{
bslmt::LockGuard<bslmt::Mutex> guard(&d_portsMutex); // LOCK
StatContextMap::iterator portIt = d_portsMap.find(port);

if (portIt == d_portsMap.end()) {
bslma::ManagedPtr<mwcst::StatContext> portStatContext =
parent->addSubcontext(
portConfig.storeExpiredSubcontextValues(true)
.preSnapshotCallback(
bdlf::BindUtil::bind(portsDeleter,
&d_portsMap,
&d_portsMutex,
bdlf::PlaceHolders::_1)));
channelStatContext = portStatContext->addSubcontext(statConfig);
d_portsMap.emplace(portStatContext->name(), portStatContext);
}
else {
channelStatContext = portIt->second->addSubcontext(statConfig);
}
}

return channelStatContext;
}

} // close package namespace
} // close enterprise namespace
30 changes: 22 additions & 8 deletions src/groups/mqb/mqbstat/mqbstat_statcontroller.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,15 @@ class StatController {

private:
// PRIVATE TYPES
typedef bslma::ManagedPtr<bdlmt::TimerEventScheduler> SchedulerMp;
typedef bslma::ManagedPtr<mwcst::StatContext> StatContextMp;
typedef bsl::shared_ptr<mwcst::StatContext> StatContextSp;
typedef bslma::ManagedPtr<mwcsys::StatMonitor> SystemStatMonitorMp;
typedef bslma::ManagedPtr<Printer> PrinterMp;
typedef bslma::ManagedPtr<JsonPrinter> JsonPrinterMp;
typedef bslma::ManagedPtr<mqbplug::StatPublisher> StatPublisherMp;
typedef bslma::ManagedPtr<mqbplug::StatConsumer> StatConsumerMp;
typedef bslma::ManagedPtr<bdlmt::TimerEventScheduler> SchedulerMp;
typedef bslma::ManagedPtr<mwcst::StatContext> StatContextMp;
typedef bsl::shared_ptr<mwcst::StatContext> StatContextSp;
typedef bslma::ManagedPtr<mwcsys::StatMonitor> SystemStatMonitorMp;
typedef bslma::ManagedPtr<Printer> PrinterMp;
typedef bslma::ManagedPtr<JsonPrinter> JsonPrinterMp;
typedef bslma::ManagedPtr<mqbplug::StatPublisher> StatPublisherMp;
typedef bslma::ManagedPtr<mqbplug::StatConsumer> StatConsumerMp;
typedef bsl::unordered_map<bsl::string, StatContextMp> StatContextMap;

/// Struct containing a statcontext and bool specifying if the
/// statcontext is managed.
Expand Down Expand Up @@ -189,6 +190,12 @@ class StatController {
/// 'channels' stat context
StatContextMp d_statContextChannelsRemote_mp;

/// Mutex for thread safety of the 'd_portsMap'
bslmt::Mutex d_portsMutex;

/// Map of all open ports to their StatContext's
StatContextMap d_portsMap;

/// System stat monitor (for cpu and
/// memory).
SystemStatMonitorMp d_systemStatMonitor_mp;
Expand Down Expand Up @@ -339,6 +346,13 @@ class StatController {
/// Retrieve the channels stat context corresponding to the specified
/// `selector`.
mwcst::StatContext* channelsStatContext(ChannelSelector::Enum selector);

/// Add a StatContext for the specified 'port' as a subcontext to the root
/// StatContext of local or remote channels, corresponding to the specified
/// 'selector'.
StatContextMp addChannelStatContext(ChannelSelector::Enum selector,
const bsl::string& port,
const bsl::string& endpoint);
};

// ============================================================================
Expand Down
3 changes: 2 additions & 1 deletion src/groups/mwc/mwcio/mwcio_statchannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,12 @@ StatChannel::StatChannel(const StatChannelConfig& config,
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(config.d_statContext_sp);
d_config.d_statContext_sp->adjustValue(Stat::e_CONNECTIONS, 1);
}

StatChannel::~StatChannel()
{
// NOTHING
d_config.d_statContext_sp->adjustValue(Stat::e_CONNECTIONS, -1);
}

// MANIPULATORS
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mwc/mwcio/mwcio_statchannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class StatChannel : public DecoratingChannelPartialImp {
/// `mwcio::StatChannelFactory`).
struct Stat {
// TYPES
enum Enum { e_BYTES_IN = 0, e_BYTES_OUT = 1 };
enum Enum { e_BYTES_IN = 0, e_BYTES_OUT = 1, e_CONNECTIONS = 2 };
};

private:
Expand Down
24 changes: 24 additions & 0 deletions src/groups/mwc/mwcio/mwcio_statchannelfactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ StatChannelFactoryUtil::statContextConfiguration(const bsl::string& name,
config.isTable(true);
config.value("in_bytes")
.value("out_bytes")
.value("connections")
.storeExpiredSubcontextValues(true);

if (historySize != -1) {
Expand Down Expand Up @@ -268,6 +269,10 @@ void StatChannelFactoryUtil::initializeStatsTable(
StatChannel::Stat::e_BYTES_OUT,
mwcst::StatUtil::value,
start);
schema.addColumn("connections",
StatChannel::Stat::e_CONNECTIONS,
mwcst::StatUtil::value,
start);

if (!(end == mwcst::StatValue::SnapshotLocation())) {
schema.addColumn("in_bytes_delta",
Expand All @@ -280,6 +285,11 @@ void StatChannelFactoryUtil::initializeStatsTable(
mwcst::StatUtil::valueDifference,
start,
end);
schema.addColumn("connections_delta",
StatChannel::Stat::e_CONNECTIONS,
mwcst::StatUtil::valueDifference,
start,
end);
}

// Configure records
Expand Down Expand Up @@ -316,6 +326,14 @@ void StatChannelFactoryUtil::initializeStatsTable(
.printAsMemory();
}
tip->addColumn("out_bytes", "total").zeroString("").printAsMemory();

tip->setColumnGroup("Connections");
if (!(end == mwcst::StatValue::SnapshotLocation())) {
tip->addColumn("connections_delta", "delta")
.zeroString("")
.setPrecision(0);
}
tip->addColumn("connections", "total").setPrecision(0);
}

bsls::Types::Int64
Expand Down Expand Up @@ -352,6 +370,12 @@ StatChannelFactoryUtil::getValue(const mwcst::StatContext& context,
case Stat::e_BYTES_OUT_ABS: {
return STAT_SINGLE(value, StatChannel::Stat::e_BYTES_OUT);
}
case Stat::e_CONNECTIONS_DELTA: {
return STAT_RANGE(valueDifference, StatChannel::Stat::e_CONNECTIONS);
}
case Stat::e_CONNECTIONS_ABS: {
return STAT_SINGLE(value, StatChannel::Stat::e_CONNECTIONS);
}
default: {
BSLS_ASSERT_SAFE(false && "Attempting to access an unknown stat");
}
Expand Down
4 changes: 3 additions & 1 deletion src/groups/mwc/mwcio/mwcio_statchannelfactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,9 @@ struct StatChannelFactoryUtil {
e_BYTES_IN_DELTA,
e_BYTES_IN_ABS,
e_BYTES_OUT_DELTA,
e_BYTES_OUT_ABS
e_BYTES_OUT_ABS,
e_CONNECTIONS_DELTA,
e_CONNECTIONS_ABS
};
};

Expand Down
9 changes: 9 additions & 0 deletions src/groups/mwc/mwcst/mwcst_statcontext.h
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,10 @@ class StatContext {
/// all sibling subcontexts.
int uniqueId() const;

/// Return te number of times this 'StatContext' had 'snapshot' called on
/// it.
bsls::Types::Int64 numSnapshots() const;

/// Return the number of subcontexts held by this `StatContext`
int numSubcontexts() const;

Expand Down Expand Up @@ -1133,6 +1137,11 @@ inline int StatContext::uniqueId() const
return d_uniqueId;
}

inline bsls::Types::Int64 StatContext::numSnapshots() const
{
return d_numSnapshots;
}

inline int StatContext::numSubcontexts() const
{
return static_cast<int>(d_subcontexts.size() +
Expand Down
Loading

0 comments on commit 2ceb0f5

Please sign in to comment.