Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix network notifier startup #1994

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions collector/lib/CollectorConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ PathEnvVar tls_client_cert_path("ROX_COLLECTOR_TLS_CLIENT_CERT");
PathEnvVar tls_client_key_path("ROX_COLLECTOR_TLS_CLIENT_KEY");

BoolEnvVar disable_process_arguments("ROX_COLLECTOR_NO_PROCESS_ARGUMENTS", false);

IntEnvVar grpc_wait_time("ROX_COLLECTOR_GRPC_WAIT_TIME", 30);
} // namespace

constexpr bool CollectorConfig::kTurnOffScrape;
Expand Down Expand Up @@ -113,6 +115,7 @@ void CollectorConfig::InitCollectorConfig(CollectorArgs* args) {
enable_introspection_ = enable_introspection.value();
track_send_recv_ = track_send_recv.value();
disable_process_arguments_ = disable_process_arguments.value();
grpc_wait_time_ = grpc_wait_time.value();

for (const auto& syscall : kSyscalls) {
syscalls_.emplace_back(syscall);
Expand Down
2 changes: 2 additions & 0 deletions collector/lib/CollectorConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ class CollectorConfig {
unsigned int GetSinspTotalBufferSize() const { return sinsp_total_buffer_size_; }
unsigned int GetSinspThreadCacheSize() const { return sinsp_thread_cache_size_; }
bool DisableProcessArguments() const { return disable_process_arguments_; }
int GRPCWaitTime() const { return grpc_wait_time_; }

static std::pair<option::ArgStatus, std::string> CheckConfiguration(const char* config, Json::Value* root);

Expand Down Expand Up @@ -207,6 +208,7 @@ class CollectorConfig {
double connection_stats_error_;
unsigned int connection_stats_window_;
int64_t per_container_rate_limit_ = 1024;
int grpc_wait_time_;

// URL to the GRPC server
std::optional<std::string> grpc_server_;
Expand Down
5 changes: 3 additions & 2 deletions collector/lib/NetworkStatusNotifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ void NetworkStatusNotifier::Stop() {

void NetworkStatusNotifier::WaitUntilWriterStarted(IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>* writer, int wait_time_seconds) {
if (!writer->WaitUntilStarted(std::chrono::seconds(wait_time_seconds))) {
CLOG(ERROR) << "Failed to establish network connection info stream.";
CLOG(ERROR) << "Unable to establish network connection info stream.";
CLOG(FATAL) << "Failed to communicate with Sensor.";
return;
}

Expand Down Expand Up @@ -218,7 +219,7 @@ bool NetworkStatusNotifier::UpdateAllConnsAndEndpoints() {
}

void NetworkStatusNotifier::RunSingle(IDuplexClientWriter<sensor::NetworkConnectionInfoMessage>* writer) {
WaitUntilWriterStarted(writer, 10);
WaitUntilWriterStarted(writer, config_.GRPCWaitTime());

ConnMap old_conn_state;
AdvertisedEndpointMap old_cep_state;
Expand Down
4 changes: 2 additions & 2 deletions collector/lib/SignalServiceClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ bool SignalServiceClient::EstablishGRPCStreamSingle() {
// stream writer
context_ = MakeUnique<grpc::ClientContext>();
writer_ = DuplexClient::CreateWithReadsIgnored(&SignalService::Stub::AsyncPushSignals, channel_, context_.get());
if (!writer_->WaitUntilStarted(std::chrono::seconds(30))) {
CLOG(ERROR) << "Signal stream not ready after 30 seconds. Retrying ...";
if (!writer_->WaitUntilStarted(std::chrono::seconds(config_.GRPCWaitTime()))) {
CLOG(ERROR) << "Signal stream not ready after " << config_.GRPCWaitTime() << " seconds. Retrying ...";
CLOG(ERROR) << "Error message: " << writer_->FinishNow().error_message();
writer_.reset();
return true;
Expand Down
7 changes: 5 additions & 2 deletions collector/lib/SignalServiceClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "api/v1/signal.pb.h"
#include "internalapi/sensor/signal_iservice.grpc.pb.h"

#include "CollectorConfig.h"
#include "DuplexGRPC.h"
#include "SignalHandler.h"
#include "StoppableThread.h"
Expand All @@ -35,8 +36,8 @@ class SignalServiceClient : public ISignalServiceClient {
using SignalService = sensor::SignalService;
using SignalStreamMessage = sensor::SignalStreamMessage;

explicit SignalServiceClient(std::shared_ptr<grpc::Channel> channel)
: channel_(std::move(channel)), stream_active_(false) {}
explicit SignalServiceClient(std::shared_ptr<grpc::Channel> channel, CollectorConfig& config)
: channel_(std::move(channel)), stream_active_(false), config_(config) {}

void Start();
void Stop();
Expand All @@ -57,6 +58,8 @@ class SignalServiceClient : public ISignalServiceClient {
std::unique_ptr<grpc::ClientContext> context_;
std::unique_ptr<IDuplexClientWriter<SignalStreamMessage>> writer_;

CollectorConfig& config_;

bool first_write_;
};

Expand Down
2 changes: 1 addition & 1 deletion collector/lib/system-inspector/Service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void Service::Init(const CollectorConfig& config, std::shared_ptr<ConnectionTrac
}

if (config.grpc_channel) {
signal_client_.reset(new SignalServiceClient(std::move(config.grpc_channel)));
signal_client_.reset(new SignalServiceClient(std::move(config.grpc_channel), config));
} else {
signal_client_.reset(new StdoutSignalServiceClient());
}
Expand Down
Loading