Skip to content

Commit d3c07b3

Browse files
committed
2 worker goroutines for sending
1 parent 545db61 commit d3c07b3

File tree

4 files changed

+132
-93
lines changed

4 files changed

+132
-93
lines changed

multiping.go

+6-93
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ package multiping
2121
import (
2222
"context"
2323
"math/rand"
24-
"net/netip"
2524
"sync"
2625
"time"
2726

@@ -64,6 +63,7 @@ type MultiPing struct {
6463
conn4 *icmp.PacketConn
6564
conn6 *icmp.PacketConn
6665
rxChan chan *pinger.Packet
66+
txChan chan *pinger.Packet
6767
}
6868

6969
func New(privileged bool) (*MultiPing, error) {
@@ -88,7 +88,7 @@ func New(privileged bool) (*MultiPing, error) {
8888
// try initialise connections to test that everything's working
8989
err := mp.restart()
9090
if err != nil {
91-
mp.close()
91+
mp.closeConnection()
9292
return nil, err
9393
}
9494

@@ -125,12 +125,13 @@ func (mp *MultiPing) restart() (err error) {
125125
}
126126

127127
mp.rxChan = make(chan *pinger.Packet)
128+
mp.txChan = make(chan *pinger.Packet)
128129

129130
return nil
130131
}
131132

132133
// closes active connections
133-
func (mp *MultiPing) close() {
134+
func (mp *MultiPing) closeConnection() {
134135
if mp.conn4 != nil {
135136
mp.conn4.Close()
136137
}
@@ -141,7 +142,8 @@ func (mp *MultiPing) close() {
141142

142143
// cleanup cannot be done in close, because some goroutines may be using struct members
143144
func (mp *MultiPing) cleanup() {
144-
// Close channels
145+
// Close rx channel.
146+
// Tx channel is closed in batchPrepareIcmp()
145147
close(mp.rxChan)
146148

147149
// invalidate connections
@@ -154,92 +156,3 @@ func (mp *MultiPing) cleanup() {
154156
// Invalidate IP address
155157
mp.pinger.SetIPAddr(nil)
156158
}
157-
158-
// Ping is blocking function and runs for mp.Timeout time and pings all hosts in data
159-
func (mp *MultiPing) Ping(data *pingdata.PingData) {
160-
if data.Count() == 0 {
161-
return
162-
}
163-
164-
// Lock the pinger - its instance may be reused by several clients
165-
mp.Lock()
166-
defer mp.Unlock()
167-
168-
err := mp.restart()
169-
if err != nil {
170-
return
171-
}
172-
173-
// Some subfunctions in goroutines will need this pointer to store ping results
174-
mp.pingData = data
175-
176-
mp.ctx, mp.cancel = context.WithTimeout(context.Background(), mp.Timeout)
177-
defer mp.cancel()
178-
179-
// This goroutine depends on rxChan and no need to add it to workgroup
180-
// It will terminate on channel close
181-
go mp.batchProcessPacket()
182-
183-
if mp.conn4 != nil {
184-
mp.wg.Add(1)
185-
mp.conn4.SetReadDeadline(time.Now().Add(mp.Timeout))
186-
go mp.batchRecvICMP(pinger.ProtocolIpv4)
187-
}
188-
if mp.conn6 != nil {
189-
mp.wg.Add(1)
190-
mp.conn6.SetReadDeadline(time.Now().Add(mp.Timeout))
191-
go mp.batchRecvICMP(pinger.ProtocolIpv6)
192-
}
193-
194-
// Sender goroutine
195-
mp.wg.Add(1)
196-
go func() {
197-
defer mp.wg.Done()
198-
mp.pingData.Iterate(func(addr netip.Addr, stats *pingdata.PingStats) {
199-
mp.pinger.SetIPAddr(&addr)
200-
stats.Send(mp.sequence)
201-
202-
mp.pinger.SendICMP(mp.sequence)
203-
time.Sleep(time.Millisecond)
204-
})
205-
}()
206-
207-
// wait for timeout and close connections
208-
<-mp.ctx.Done()
209-
mp.close()
210-
211-
// wait for all goroutines to terminate
212-
mp.wg.Wait()
213-
214-
mp.cleanup()
215-
}
216-
217-
func (mp *MultiPing) batchRecvICMP(proto pinger.ProtocolVersion) {
218-
defer func() {
219-
mp.wg.Done()
220-
}()
221-
222-
for {
223-
pkt, err := mp.pinger.RecvPacket(proto)
224-
if err != nil {
225-
return
226-
}
227-
228-
mp.rxChan <- pkt
229-
}
230-
}
231-
232-
// This function runs in goroutine and nobody is interested in return errors
233-
// Discard errors silently
234-
func (mp *MultiPing) batchProcessPacket() {
235-
for recv := range mp.rxChan {
236-
pingStats := mp.pinger.ParsePacket(recv)
237-
if pingStats.Tracker != mp.Tracker {
238-
continue
239-
}
240-
241-
if stats, ok := mp.pingData.Get(recv.Addr); ok {
242-
stats.Recv(pingStats.Seq, pingStats.RTT)
243-
}
244-
}
245-
}

ping.go

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package multiping
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/drgkaleda/go-multiping/pingdata"
8+
"github.com/drgkaleda/go-multiping/pinger"
9+
)
10+
11+
// Ping is blocking function and runs for mp.Timeout time and pings all hosts in data
12+
func (mp *MultiPing) Ping(data *pingdata.PingData) {
13+
if data.Count() == 0 {
14+
return
15+
}
16+
17+
// Lock the pinger - its instance may be reused by several clients
18+
mp.Lock()
19+
defer mp.Unlock()
20+
21+
err := mp.restart()
22+
if err != nil {
23+
return
24+
}
25+
26+
// Some subfunctions in goroutines will need this pointer to store ping results
27+
mp.pingData = data
28+
29+
mp.ctx, mp.cancel = context.WithTimeout(context.Background(), mp.Timeout)
30+
defer mp.cancel()
31+
32+
// This goroutine depends on rxChan and no need to add it to workgroup
33+
// It will terminate on channel close
34+
go mp.batchProcessPacket()
35+
36+
// 2 receiver goroutines: separate for IPv4 and IPv6
37+
if mp.conn4 != nil {
38+
mp.wg.Add(1)
39+
mp.conn4.SetReadDeadline(time.Now().Add(mp.Timeout))
40+
go mp.batchRecvICMP(pinger.ProtocolIpv4)
41+
}
42+
if mp.conn6 != nil {
43+
mp.wg.Add(1)
44+
mp.conn6.SetReadDeadline(time.Now().Add(mp.Timeout))
45+
go mp.batchRecvICMP(pinger.ProtocolIpv6)
46+
}
47+
48+
// 2 Sender goroutine workers:
49+
// one prepares message and other actually sends it
50+
mp.wg.Add(1)
51+
go mp.batchSendIcmp()
52+
go mp.batchPrepareIcmp()
53+
54+
// wait for timeout and close connections
55+
<-mp.ctx.Done()
56+
mp.closeConnection()
57+
58+
// wait for all goroutines to terminate and cleanup
59+
mp.wg.Wait()
60+
mp.cleanup()
61+
}

recv.go

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package multiping
2+
3+
import "github.com/drgkaleda/go-multiping/pinger"
4+
5+
func (mp *MultiPing) batchRecvICMP(proto pinger.ProtocolVersion) {
6+
defer func() {
7+
mp.wg.Done()
8+
}()
9+
10+
for {
11+
pkt, err := mp.pinger.RecvPacket(proto)
12+
if err != nil {
13+
return
14+
}
15+
16+
mp.rxChan <- pkt
17+
}
18+
}
19+
20+
// This function runs in goroutine and nobody is interested in return errors
21+
// Discard errors silently
22+
func (mp *MultiPing) batchProcessPacket() {
23+
for recv := range mp.rxChan {
24+
pingStats := mp.pinger.ParsePacket(recv)
25+
if pingStats.Tracker != mp.Tracker {
26+
continue
27+
}
28+
29+
if stats, ok := mp.pingData.Get(recv.Addr); ok {
30+
stats.Recv(pingStats.Seq, pingStats.RTT)
31+
}
32+
}
33+
}

send.go

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package multiping
2+
3+
import (
4+
"net/netip"
5+
6+
"github.com/drgkaleda/go-multiping/pingdata"
7+
)
8+
9+
func (mp *MultiPing) batchPrepareIcmp() {
10+
defer close(mp.txChan)
11+
12+
mp.pingData.Iterate(func(addr netip.Addr, stats *pingdata.PingStats) {
13+
pkt, err := mp.pinger.PrepareICMP(addr, mp.sequence)
14+
if err == nil {
15+
stats.Send(mp.sequence)
16+
mp.txChan <- pkt
17+
}
18+
})
19+
20+
}
21+
22+
func (mp *MultiPing) batchSendIcmp() {
23+
var err error
24+
defer mp.wg.Done()
25+
26+
for pkt := range mp.txChan {
27+
err = mp.pinger.SendPacket(pkt)
28+
if err != nil {
29+
break
30+
}
31+
}
32+
}

0 commit comments

Comments
 (0)