-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsync_consumer.go
114 lines (90 loc) · 2.96 KB
/
sync_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package postq
import (
"container/ring"
"fmt"
)
// SyncEventHandlerFunc processes a single event and ONLY makes db changes.
//
// A non-nil return value stops the remaining handlers for that event; the
// error is then reported by SyncEventConsumer as the event's failure.
type SyncEventHandlerFunc func(Context, Event) error
// SyncEventConsumer processes queued events one at a time, running every
// handler in Consumers, in order, against each fetched event.
type SyncEventConsumer struct {
// eventLog is an optional ring buffer of fetched events. It is set by
// RecordEvents; events are only recorded while it is non-nil.
eventLog *ring.Ring
// Name of the events in the push queue to watch for.
WatchEvents []string
// List of sync event handlers that process a single event one after another in order.
// All the handlers must succeed; on the first failure the remaining handlers
// are skipped and the event's error and attempt count are saved to event_queue.
Consumers []SyncEventHandlerFunc
// ConsumerOption is the configuration for the PGConsumer.
ConsumerOption *ConsumerOption
// EventFetchOption contains configuration on how the events should be fetched.
EventFetchOption *EventFetcherOption
}
// RecordEvents turns on event recording by allocating a fresh ring buffer with
// room for size events. Every event fetched afterwards is stored in it, and
// once the buffer has wrapped around the oldest entries are overwritten.
//
// Calling it again replaces the buffer, dropping whatever was recorded so far.
// ring.New returns nil for size <= 0, so such a size leaves recording disabled.
func (t *SyncEventConsumer) RecordEvents(size int) {
	buffer := ring.New(size)
	t.eventLog = buffer
}
// GetRecords returns the events held in the ring buffer set up by
// RecordEvents, as extracted by getRecords.
//
// It returns an error when recording has not been enabled, i.e. the buffer
// is nil.
func (t SyncEventConsumer) GetRecords() ([]Event, error) {
	if recorded := t.eventLog; recorded != nil {
		return getRecords(recorded), nil
	}

	return nil, fmt.Errorf("event log is not initialized")
}
// EventConsumer builds a PGConsumer that uses Handle as its handler and
// ConsumerOption as its configuration, returning whatever NewPGConsumer
// returns.
//
// The receiver is a value, so Handle is bound to a copy of the
// SyncEventConsumer taken at call time: later changes to the original's
// fields are not seen by the returned consumer.
// NOTE(review): the copy only shares a ring buffer with the original when
// RecordEvents was called before this method — confirm callers rely on that
// order.
func (t SyncEventConsumer) EventConsumer() (*PGConsumer, error) {
	handler := t.Handle
	return NewPGConsumer(handler, t.ConsumerOption)
}
// Handle consumes at most one event and reports how many events were picked up.
//
// It returns (0, nil) when no event was available and (0, err) when the
// failure happened before an event was fetched. Once an event has been
// fetched it returns 1 together with the processing error, if any.
//
// For a fetched event that failed, the attempt counter is incremented and the
// error text is written to event_queue. That write goes through ctx.Pool()
// rather than the transaction used by consumeEvent. If the write itself fails
// it is only reported through ctx.Debugf, so the caller still receives the
// original processing error.
func (t *SyncEventConsumer) Handle(ctx Context) (int, error) {
	event, err := t.consumeEvent(ctx)
	switch {
	case event == nil:
		// Nothing was fetched: either the queue was empty (err == nil) or the
		// failure occurred before an event was available.
		return 0, err
	case err == nil:
		return 1, nil
	}

	// The event was fetched but processing failed: record the attempt.
	event.Attempts++
	event.SetError(err.Error())

	const query = `UPDATE event_queue SET error=$1, attempts=$2, last_attempt=NOW() WHERE id=$3`
	if _, saveErr := ctx.Pool().Exec(ctx, query, event.Error, event.Attempts, event.ID); saveErr != nil {
		ctx.Debugf("error saving event attempt updates to event_queue: %v\n", saveErr)
	}

	return 1, err
}
// consumeEvent opens a transaction, fetches at most one watched event through
// it, runs every handler in Consumers against that event in order, and commits.
//
// Return values:
//   - (nil, err) when the transaction could not be started or fetching failed;
//   - (nil, nil) when there was no event to process;
//   - (event, err) when a handler failed (remaining handlers are skipped) or
//     the commit failed;
//   - (event, nil) on success.
//
// The deferred rollback runs on every return path and its error is
// deliberately ignored.
// NOTE(review): handlers are called with ctx, not the transaction — confirm
// how their writes are tied to this transaction.
func (t *SyncEventConsumer) consumeEvent(ctx Context) (*Event, error) {
	tx, beginErr := ctx.Pool().Begin(ctx)
	if beginErr != nil {
		return nil, fmt.Errorf("error initiating db tx: %w", beginErr)
	}
	defer tx.Rollback(ctx) //nolint:errcheck

	// sync consumers always fetch a single event at a time
	fetched, fetchErr := fetchEvents(ctx, tx, t.WatchEvents, 1, t.EventFetchOption)
	switch {
	case fetchErr != nil:
		return nil, fmt.Errorf("error fetching events: %w", fetchErr)
	case len(fetched) == 0:
		return nil, nil
	}
	current := fetched[0]

	// When recording is enabled, store the event in the current slot and
	// advance to the next one. This happens before the handlers run, so
	// failed events are recorded too.
	if slot := t.eventLog; slot != nil {
		slot.Value = current
		t.eventLog = slot.Next()
	}

	for i := range t.Consumers {
		if handlerErr := t.Consumers[i](ctx, current); handlerErr != nil {
			return &current, handlerErr
		}
	}

	return &current, tx.Commit(ctx)
}
// SyncHandlers converts the given user defined handlers into sync event handlers.
//
// Each fn is wrapped in a SyncEventHandlerFunc that narrows the incoming
// Context to T before calling it. The result preserves the order of fn and is
// nil when no handlers are given.
//
// If the Context supplied at run time cannot be asserted to T, the wrapper
// returns an error instead of panicking, so the mismatch is reported like any
// other handler failure.
func SyncHandlers[T Context](fn ...func(ctx T, e Event) error) []SyncEventHandlerFunc {
	if len(fn) == 0 {
		return nil
	}

	syncHandlers := make([]SyncEventHandlerFunc, 0, len(fn))
	for i := range fn {
		// Per-iteration copy so each closure keeps its own handler even under
		// pre-Go 1.22 loop variable semantics.
		f := fn[i]
		syncHandlers = append(syncHandlers, func(ctx Context, e Event) error {
			typed, ok := ctx.(T)
			if !ok {
				return fmt.Errorf("sync handler: context of type %T does not satisfy the handler's context type", ctx)
			}
			return f(typed, e)
		})
	}

	return syncHandlers
}