From 239e6d7cd99768a078d51fffec0c8afe6b86ffda Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Fri, 7 Feb 2025 14:31:47 +1100 Subject: [PATCH 1/2] Make it so the log streamer can be signaled to stop immediately Currently if the agent is sent a SIGTERM the log streamer will wait for all chunks to be uploaded, and even if a second SIGTERM is sent we still wait. Other components of the agent honor a graceful=false flag which allows operators to force the agent to stop. This change adds that graceful option to the log streamer stop. --- agent/log_streamer.go | 26 +++++++++++++++++++++++--- agent/run_job.go | 3 ++- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/agent/log_streamer.go b/agent/log_streamer.go index 7feee2acc6..a52c11855a 100644 --- a/agent/log_streamer.go +++ b/agent/log_streamer.go @@ -66,6 +66,9 @@ type LogStreamer struct { // Have we stopped? stopped bool + + // have we been asked to exit immediately? + exitImmediately bool } // NewLogStreamer creates a new instance of the log streamer. @@ -161,7 +164,7 @@ func (ls *LogStreamer) Process(ctx context.Context, output []byte) error { } // Stop stops the streamer. 
-func (ls *LogStreamer) Stop() { +func (ls *LogStreamer) Stop(graceful bool) { ls.processMutex.Lock() if ls.stopped { ls.processMutex.Unlock() @@ -171,8 +174,13 @@ func (ls *LogStreamer) Stop() { close(ls.queue) ls.processMutex.Unlock() - ls.logger.Debug("[LogStreamer] Waiting for workers to shut down") - ls.workerWG.Wait() + if graceful { + ls.workerWG.Wait() + ls.logger.Info("[LogStreamer] Waiting for workers to shut down and outstanding chunks to be uploaded") + } else { + ls.exitImmediately = true + ls.logger.Warn("[LogStreamer] NOT waiting for outstanding chunks to be uploaded") + } } // The actual log streamer worker @@ -189,6 +197,15 @@ func (ls *LogStreamer) worker(ctx context.Context, id int) { for { setStat("⌚️ Waiting for a chunk") + if len(ls.queue) > 0 { + ls.logger.Debug("[LogStreamer/Worker#%d] Queue length: %d", id, len(ls.queue)) + } + + if ls.exitImmediately { + ls.logger.Warn("[LogStreamer/Worker#%d] Worker is shutting down immediately, Outstanding Queue length: %d", id, len(ls.queue)) + return + } + // Get the next chunk (pointer) from the queue. This will block // until something is returned. var chunk *api.Chunk @@ -201,6 +218,9 @@ func (ls *LogStreamer) worker(ctx context.Context, id int) { return } + // used to simulate a slow upload + // time.Sleep(30 * time.Second) + setStat("📨 Uploading chunk") // Upload the chunk diff --git a/agent/run_job.go b/agent/run_job.go index cbf212f7c9..c29a8fc17e 100644 --- a/agent/run_job.go +++ b/agent/run_job.go @@ -355,7 +355,7 @@ func (r *JobRunner) cleanup(ctx context.Context, wg *sync.WaitGroup, exit core.P r.logStreamer.Process(ctx, r.output.ReadAndTruncate()) // Stop the log streamer. This will block until all the chunks have been uploaded - r.logStreamer.Stop() + r.logStreamer.Stop(true) // Stop the header time streamer. 
This will block until all the chunks have been uploaded r.headerTimesStreamer.Stop() @@ -471,6 +471,7 @@ func (r *JobRunner) CancelAndStop() error { r.cancelLock.Lock() r.stopped = true r.cancelLock.Unlock() + r.logStreamer.Stop(false) return r.Cancel() } From 8a6b2bed4a390e34ec9956444f90c456ead099b3 Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Fri, 7 Feb 2025 14:56:20 +1100 Subject: [PATCH 2/2] Fix test, and add another verifying the streamer exits immediately --- agent/log_streamer_test.go | 46 +++++++++++++++++++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/agent/log_streamer_test.go b/agent/log_streamer_test.go index eebb464ffd..f5479ba519 100644 --- a/agent/log_streamer_test.go +++ b/agent/log_streamer_test.go @@ -46,7 +46,7 @@ func TestLogStreamer(t *testing.T) { t.Errorf("LogStreamer.Process(ctx, %q) = %v", input, err) } - ls.Stop() + ls.Stop(true) want := []*api.Chunk{ { @@ -94,3 +94,47 @@ func TestLogStreamer(t *testing.T) { t.Errorf("after Stop: LogStreamer.Process(ctx, %q) err = %v, want %v", input, err, errStreamerStopped) } } + +func TestLogStreamerExitImmediately(t *testing.T) { + t.Parallel() + ctx := context.Background() + + logger := logger.NewConsoleLogger( + logger.NewTextPrinter(os.Stderr), + func(c int) { t.Errorf("exit(%d)", c) }, + ) + + var mu sync.Mutex + var got []*api.Chunk + callback := func(ctx context.Context, chunk *api.Chunk) error { + mu.Lock() + got = append(got, chunk) + mu.Unlock() + return nil + } + + ls := NewLogStreamer(logger, callback, LogStreamerConfig{ + Concurrency: 3, + MaxChunkSizeBytes: 10, + MaxSizeBytes: 30, + }) + + if err := ls.Start(ctx); err != nil { + t.Fatalf("LogStreamer.Start(ctx) = %v", err) + } + + input := "0123456789abcdefghijklmnopqrstuvwxyz!@#$%^&*()" // 46 bytes + if err := ls.Process(ctx, []byte(input)); err != nil { + t.Errorf("LogStreamer.Process(ctx, %q) = %v", input, err) + } + + ls.Stop(false) + + if !ls.exitImmediately { + t.Errorf("LogStreamer.Stop(false) 
did not set exitImmediately") + } + + if len(got) > 0 { + t.Errorf("LogStreamer.Stop(false) did not exit immediately") + } +}