-
Notifications
You must be signed in to change notification settings - Fork 3
/
consumer_test.go
105 lines (84 loc) · 2.25 KB
/
consumer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package gokaf
import (
"context"
"testing"
"time"
)
// TestConsumerClose checks that, after Stop is called on a freshly constructed
// consumer (Run is not called in this test), waiting on the consumer's wg
// returns within one second.
func TestConsumerClose(t *testing.T) {
	name := "testTopic"
	tp := newTopic(context.Background(), mockLogger, name, 0)

	// The handler body is irrelevant here: this test publishes no message.
	noop := func(interface{}) {}
	c := newConsumer(tp, mockLogger, noop)

	t.Run("ConsumerClose", func(t *testing.T) {
		c.Stop()

		// Bridge the blocking wg.Wait into a channel so that it can be raced
		// against a timer instead of hanging the test.
		drained := make(chan struct{})
		go func() {
			c.wg.Wait()
			close(drained)
		}()

		deadline := time.After(time.Second)
		select {
		case <-deadline:
			t.Error("Timed out waiting for consumer to close")
		case <-drained:
			// wg.Wait returned: the consumer has finished closing.
		}
	})
}
// TestConsumerRun verifies that, once Run has been called, a message written
// to the consumer's topic channel is passed to the handler unchanged.
func TestConsumerRun(t *testing.T) {
	topicName := "testTopic"
	topic := newTopic(context.Background(), mockLogger, topicName, 0)
	sentMsg := "Go is awesome"
	done := make(chan struct{})

	// The handler validates the payload and then signals completion by closing
	// done. Closing twice would panic, so the test sends exactly one message.
	mockHandler := func(receivedMsg interface{}) {
		defer close(done)
		if receivedMsg != sentMsg {
			t.Errorf("Expected message %v, got %v", sentMsg, receivedMsg)
		}
	}
	consumer := newConsumer(topic, mockLogger, mockHandler)

	// Test: a running consumer delivers the message to its handler.
	t.Run("ConsumerRun", func(t *testing.T) {
		consumer.Run()
		// Stop the consumer when the subtest ends so it does not keep running
		// behind the tests that follow.
		defer consumer.Stop()

		// Send from a separate goroutine so the test goroutine stays free to
		// enforce the timeout below.
		go func() {
			consumer.topic.channel.ch <- sentMsg
		}()

		select {
		case <-done:
			// The handler ran and checked the message.
		case <-time.After(time.Second):
			t.Error("Timed out waiting for consumer to handle the message")
		}
	})
}
// TestConsumerStopAfterRun verifies that a consumer which has been started and
// has handled a message can be stopped, and that waiting on its wg returns
// within one second afterwards.
func TestConsumerStopAfterRun(t *testing.T) {
	topicName := "testTopic"
	topic := newTopic(context.Background(), mockLogger, topicName, 0)
	sentMsg := "Go is awesome"
	done := make(chan struct{})

	// The handler signals receipt first and validates the payload second, so
	// the test may call Stop while the handler is still finishing. Closing done
	// twice would panic, so the test sends exactly one message.
	mockHandler := func(receivedMsg interface{}) {
		close(done)
		if receivedMsg != sentMsg {
			t.Errorf("Expected message %v, got %v", sentMsg, receivedMsg)
		}
	}
	consumer := newConsumer(topic, mockLogger, mockHandler)

	// Test: stop the consumer after it has run and handled a message.
	t.Run("ConsumerRun", func(t *testing.T) {
		consumer.Run()

		// Send from a separate goroutine so the test goroutine stays free to
		// enforce the timeouts below.
		go func() {
			consumer.topic.channel.ch <- sentMsg
		}()

		select {
		case <-done:
			// The handler has been invoked; proceed to shutdown.
		case <-time.After(time.Second):
			t.Fatal("Timed out waiting for consumer to handle the message")
		}

		consumer.Stop()

		// Bound the wait on the consumer's wg so a consumer that fails to shut
		// down produces a test failure instead of hanging the whole run.
		stopped := make(chan struct{})
		go func() {
			defer close(stopped)
			consumer.wg.Wait()
		}()

		select {
		case <-stopped:
			// The consumer has finished closing.
		case <-time.After(time.Second):
			t.Error("Timed out waiting for consumer to close")
		}
	})
}