-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjob.go
189 lines (153 loc) · 4.26 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package goller
import (
"errors"
"strconv"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/sqsiface"
"github.com/sirupsen/logrus"
)
// Job lets you interact with the SQS message.
// The reader methods inspect the message; the writer methods report the
// outcome of the job back to SQS.
type Job interface {
// Reader
// Attribute returns the value of the named message attribute and whether
// it was found.
Attribute(attr string) (string, bool)
// ID returns the message ID assigned by SQS.
ID() string
// Body returns the message payload, or an error when it is empty.
Body() (string, error)
// Tries returns the number of previous attempts to process the job.
Tries() (int64, error)
// Handled reports whether the job's outcome has already been written back
// to SQS.
Handled() bool
// Writer
// Delete removes the message from SQS.
Delete() error
// Release puts the job back on the queue for the given number of seconds.
Release(secs int64) error
// Backoff releases the job with a delay derived from its number of tries.
Backoff() error
}
// ErrAlreadyHandled means the job has already written its status back to SQS.
// It is returned by Delete and Release when the job was handled before.
var ErrAlreadyHandled = errors.New("job already handled")
// NewJob wraps the given SQS message in a Job. The returned job uses cfg for
// its queue settings, logger for debug output and svc to talk to SQS.
func NewJob(cfg *Config, logger *logrus.Logger, msg sqs.Message, svc sqsiface.SQSAPI) Job {
	job := new(sqsJob)
	job.cfg, job.log = cfg, logger
	job.msg, job.svc = msg, svc
	return job
}
// sqsJob is the Job implementation returned by NewJob; it wraps a single
// SQS message.
type sqsJob struct {
// cfg supplies the queue URL and the job visibility/backoff settings.
cfg *Config
// handled is set once a Delete or Release call has succeeded.
handled bool
// log receives debug output about the job.
log *logrus.Logger
// msg is the SQS message this job wraps.
msg sqs.Message
// svc is the SQS client used to delete the message or change its visibility.
svc sqsiface.SQSAPI
}
// Attribute looks for custom attributes set on the message by the sender.
// If nothing is found it will then look to SQS defined attributes.
//
// A custom attribute that carries no string value (its StringValue is nil)
// is skipped instead of being dereferenced, so the lookup falls through to
// the SQS defined attributes rather than panicking.
//
// It returns the attribute value and true when found, or "" and false.
func (j *sqsJob) Attribute(attr string) (string, bool) {
	if custom, ok := j.msg.MessageAttributes[attr]; ok && custom.StringValue != nil {
		return *custom.StringValue, true
	}
	if system, ok := j.msg.Attributes[attr]; ok {
		return system, true
	}
	return "", false
}
// ID is the unique ID given to it by SQS.
// It returns an empty string when the message carries no ID, instead of
// panicking on a nil pointer; ID is called from logging paths.
func (j *sqsJob) ID() string {
	return aws.StringValue(j.msg.MessageId)
}
// Body returns the payload of the job.
// An empty payload is reported as an error.
func (j *sqsJob) Body() (string, error) {
	if payload := aws.StringValue(j.msg.Body); payload != "" {
		return payload, nil
	}
	return "", errors.New("job body is empty")
}
// Tries returns the number of previous attempts to process the job.
// If a job is put back on the queue (un)intentionally this will increase.
//
// The value is the message's approximate receive count minus one (the
// current receive is not a previous attempt). An error is returned when the
// attribute is missing or cannot be parsed as an integer; in both cases the
// returned count is 0.
func (j *sqsJob) Tries() (int64, error) {
	count, ok := j.Attribute(string(sqs.MessageSystemAttributeNameApproximateReceiveCount))
	if !ok {
		return 0, errors.New("failed to get recieve count off sqs message")
	}

	received, err := strconv.ParseInt(count, 10, 64)
	if err != nil {
		// Do not adjust or expose the partial value ParseInt hands back on
		// failure (e.g. the clamped extreme on a range error).
		return 0, err
	}
	if received > 0 {
		received--
	}
	return received, nil
}
// Handled reports whether the job's outcome has already been written back to
// SQS, i.e. whether a previous Delete or Release call succeeded.
func (j *sqsJob) Handled() bool {
return j.handled
}
// Delete removes the message from SQS and marks the job as handled.
// It returns ErrAlreadyHandled if the job was handled before, or the error
// from the SQS request. The request duration is recorded on sqsJobTimer
// whether or not the request succeeds.
func (j *sqsJob) Delete() error {
	if j.handled {
		return ErrAlreadyHandled
	}

	input := &sqs.DeleteMessageInput{
		QueueUrl:      aws.String(j.cfg.QueueURL),
		ReceiptHandle: j.msg.ReceiptHandle,
	}
	req := j.svc.DeleteMessageRequest(input)

	began := time.Now()
	_, err := req.Send()
	sqsJobTimer.Set(time.Since(began).Seconds())
	if err != nil {
		return err
	}

	j.handled = true
	j.log.WithField("jid", j.ID()).Debug("job deleted")
	return nil
}
// Release the job back on to the SQS queue for the given number of seconds.
//
// The requested time is clamped to the configured minimum and maximum
// visibility timeouts, and then to just below the 12 hour SQS limit. On
// success the job is marked as handled.
//
// It returns ErrAlreadyHandled if the job was handled before, or the error
// from the SQS request. The request duration is recorded on sqsJobTimer
// whether or not the request succeeds.
func (j *sqsJob) Release(secs int64) error {
	// SQS states that the maximum is 43200 seconds, which equals 12 hours.
	// However, it needs to be `< 43200` so enforce that below.
	const sqsVisibilityLimit = int64(12 * time.Hour / time.Second)

	if j.handled {
		return ErrAlreadyHandled
	}

	if secs < j.cfg.Job.MinVisibilityTimeout {
		j.log.WithFields(logrus.Fields{
			"jid":       j.ID(),
			"requested": time.Duration(secs) * time.Second,
		}).Debug("release time is below minimum")
		secs = j.cfg.Job.MinVisibilityTimeout
	}

	if secs > j.cfg.Job.MaxVisibilityTimeout {
		j.log.WithFields(logrus.Fields{
			"jid":       j.ID(),
			"requested": time.Duration(secs) * time.Second,
		}).Debug("release time is above maximum")
		secs = j.cfg.Job.MaxVisibilityTimeout
	}

	// Clamp rather than decrement: a single decrement only yields a valid
	// value when secs is exactly at the limit, not when it is above it.
	if secs >= sqsVisibilityLimit {
		secs = sqsVisibilityLimit - 1
	}

	req := j.svc.ChangeMessageVisibilityRequest(&sqs.ChangeMessageVisibilityInput{
		QueueUrl:          aws.String(j.cfg.QueueURL),
		ReceiptHandle:     j.msg.ReceiptHandle,
		VisibilityTimeout: aws.Int64(secs),
	})

	start := time.Now()
	_, err := req.Send()
	sqsJobTimer.Set(time.Since(start).Seconds())
	if err != nil {
		return err
	}

	j.handled = true
	j.log.WithFields(logrus.Fields{
		"jid":  j.ID(),
		"time": time.Duration(secs) * time.Second,
	}).Debug("released job back to SQS")
	return nil
}
// Backoff releases the job back to the queue with a delay computed by the
// configured BackoffCalc from the number of previous tries.
//
// It returns the error from Tries if the try count cannot be determined,
// otherwise whatever Release returns.
func (j *sqsJob) Backoff() error {
	tries, err := j.Tries()
	if err != nil {
		return err
	}

	backoff := j.cfg.Job.BackoffCalc(tries)

	// Logged before the release is attempted, so the message must not claim
	// the job has already been released; Release logs that on success.
	j.log.WithFields(logrus.Fields{
		"jid":     j.ID(),
		"tries":   tries,
		"backoff": time.Duration(backoff) * time.Second,
	}).Debug("backing off job")

	return j.Release(backoff)
}