@@ -12,6 +12,7 @@ import (
12
12
"errors"
13
13
"fmt"
14
14
"net"
15
+ "sync"
15
16
"time"
16
17
17
18
model "github.com/DataDog/agent-payload/v5/process"
@@ -62,6 +63,7 @@ type npCollectorImpl struct {
62
63
workers int
63
64
stopChan chan struct {}
64
65
flushLoopDone chan struct {}
66
+ workersDone chan struct {}
65
67
runDone chan struct {}
66
68
flushInterval time.Duration
67
69
@@ -122,6 +124,7 @@ func newNpCollectorImpl(epForwarder eventplatform.Forwarder, collectorConfigs *c
122
124
stopChan : make (chan struct {}),
123
125
runDone : make (chan struct {}),
124
126
flushLoopDone : make (chan struct {}),
127
+ workersDone : make (chan struct {}),
125
128
126
129
runTraceroute : runTraceroute ,
127
130
statsdClient : statsd ,
@@ -274,7 +277,7 @@ func (s *npCollectorImpl) start() error {
274
277
275
278
go s .listenPathtests ()
276
279
go s .flushLoop ()
277
- s .startWorkers ()
280
+ go s .startWorkers ()
278
281
279
282
return nil
280
283
}
@@ -286,6 +289,7 @@ func (s *npCollectorImpl) stop() {
286
289
}
287
290
close (s .stopChan )
288
291
<- s .flushLoopDone
292
+ <- s .workersDone
289
293
<- s .runDone
290
294
s .running = false
291
295
}
@@ -496,13 +500,24 @@ func (s *npCollectorImpl) getReverseDNSResult(ipAddr string, results map[string]
496
500
497
501
func (s * npCollectorImpl ) startWorkers () {
498
502
s .logger .Debugf ("Starting workers (%d)" , s .workers )
503
+
504
+ // TODO: TEST ME
505
+ // TODO: TEST ME
506
+ // TODO: TEST ME
507
+ var wg sync.WaitGroup
499
508
for w := 0 ; w < s .workers ; w ++ {
509
+ wg .Add (1 )
500
510
s .logger .Debugf ("Starting worker #%d" , w )
501
- go s .startWorker (w )
511
+ go func () {
512
+ defer wg .Done ()
513
+ s .runWorker (w )
514
+ }()
502
515
}
516
+ wg .Wait ()
517
+ s .workersDone <- struct {}{}
503
518
}
504
519
505
- func (s * npCollectorImpl ) startWorker (workerID int ) {
520
+ func (s * npCollectorImpl ) runWorker (workerID int ) {
506
521
for {
507
522
select {
508
523
case <- s .stopChan :
0 commit comments