Skip to content

Commit

Permalink
calling channel.Cancel when worker is stopped (#149)
Browse files Browse the repository at this point in the history
  • Loading branch information
Guy Baron authored Aug 28, 2019
1 parent 4ad01a1 commit eef69fb
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 39 deletions.
8 changes: 8 additions & 0 deletions gbus/metrics/message_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ var (
rejectedMessages = newRejectedMessagesCounter()
)

//ResetRejectedMessagesCounter resets the counter intended to be used in tests only
func ResetRejectedMessagesCounter() {

prometheus.Unregister(rejectedMessages)
rejectedMessages = newRejectedMessagesCounter()
}

//ReportRejectedMessage reports a message being rejected to the metrics counter
func ReportRejectedMessage() {
rejectedMessages.Inc()
Expand All @@ -28,6 +35,7 @@ func GetRejectedMessagesValue() (float64, error) {
}

func newRejectedMessagesCounter() prometheus.Counter {

return promauto.NewCounter(prometheus.CounterOpts{
Namespace: grabbitPrefix,
Subsystem: "messages",
Expand Down
42 changes: 16 additions & 26 deletions gbus/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,14 @@ func (worker *worker) Start() error {

func (worker *worker) Stop() error {
worker.log().Info("stopping worker")
close(worker.stop) // worker.stop <- true
e1 := worker.channel.Cancel(worker.consumerTag, false)
e2 := worker.channel.Cancel(worker.consumerTag+"_rpc", false)
if e1 != nil {
return e1
}
if e2 != nil {
return e2
}
return nil
}

Expand Down Expand Up @@ -192,6 +199,7 @@ func (worker *worker) reject(requeue bool, delivery amqp.Delivery) error {
if !requeue {
metrics.ReportRejectedMessage()
}

worker.log().WithFields(logrus.Fields{"message_id": delivery.MessageId, "requeue": requeue}).Info("message rejected")
return err
}
Expand All @@ -205,34 +213,16 @@ func (worker *worker) isDead(delivery amqp.Delivery) bool {
}

func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) {
tx, txCreateErr := worker.txProvider.New()
if txCreateErr != nil {
worker.log().WithError(txCreateErr).Error("failed creating new tx")
worker.span.LogFields(slog.Error(txCreateErr))
_ = worker.reject(true, delivery)
return
}
err := metrics.RunHandlerWithMetric(func() error {
return worker.deadletterHandler(tx, &delivery)
}, worker.deadletterHandler.Name(), worker.log())

var reject bool
if err != nil {
worker.log().WithError(err).Error("failed handling deadletter")
worker.span.LogFields(slog.Error(err))
err = worker.SafeWithRetries(tx.Rollback, MaxRetryCount)
reject = true
} else {
err = worker.SafeWithRetries(tx.Commit, MaxRetryCount)
txWrapper := func(tx *sql.Tx) error {
handlerWrapper := func() error {
return worker.deadletterHandler(tx, &delivery)
}
return metrics.RunHandlerWithMetric(handlerWrapper, worker.deadletterHandler.Name(), worker.log())
}

err := worker.withTx(txWrapper)
if err != nil {
worker.log().WithError(err).Error("Rollback/Commit deadletter handler message")
worker.span.LogFields(slog.Error(err))
reject = true
}

if reject {
//we reject the deelivery but requeue it so the message will not be lost and recovered to the dlq
_ = worker.reject(true, delivery)
} else {
_ = worker.ack(delivery)
Expand Down
32 changes: 19 additions & 13 deletions tests/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,8 @@ func TestRPC(t *testing.T) {
}

func TestDeadlettering(t *testing.T) {
rejectedMessages, err := metrics.GetRejectedMessagesValue()
if err != nil {
t.Error("failed to get rejected messages value")
}
metrics.ResetRejectedMessagesCounter()

proceed := make(chan bool)
poison := gbus.NewBusMessage(PoisonMessage{})
service1 := createNamedBusForTest(testSvc1)
Expand Down Expand Up @@ -259,7 +257,7 @@ func TestDeadlettering(t *testing.T) {

<-proceed
count, _ := metrics.GetRejectedMessagesValue()
if count != rejectedMessages+1 {
if count != 1 {
t.Error("Should have one rejected message")
}

Expand Down Expand Up @@ -348,11 +346,7 @@ func TestReturnDeadToQueue(t *testing.T) {

func TestDeadLetterHandlerPanic(t *testing.T) {
proceed := make(chan bool)
rejectedMessages, err := metrics.GetRejectedMessagesValue()
if err != nil {
t.Error("failed to get rejected messages value")
}

metrics.ResetRejectedMessagesCounter()
poison := gbus.NewBusMessage(Command1{})
service1 := createBusWithConfig(testSvc1, "grabbit-dead", true, true,
gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0})
Expand All @@ -361,6 +355,14 @@ func TestDeadLetterHandlerPanic(t *testing.T) {
gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0})
visited := false
deadMessageHandler := func(tx *sql.Tx, poison *amqp.Delivery) error {
/*
this handler will be called more than once since when grabbit rejects
a message from a deadletter queue to rejects it with the requeu option set to
true and that is why this will be called more than once even though the retry count
is set to 0
*/

if !visited {
visited = true
panic("PANIC DEAD HANDLER aaahhh!!!!!!")
Expand All @@ -374,7 +376,7 @@ func TestDeadLetterHandlerPanic(t *testing.T) {
}

deadletterSvc.HandleDeadletter(deadMessageHandler)
err = service1.HandleMessage(Command1{}, faultyHandler)
err := service1.HandleMessage(Command1{}, faultyHandler)
if err != nil {
t.Error("failed to register faultyhandler")
}
Expand All @@ -388,8 +390,12 @@ func TestDeadLetterHandlerPanic(t *testing.T) {
select {
case <-proceed:
count, _ := metrics.GetRejectedMessagesValue()
if count != rejectedMessages+2 {
t.Error("Should have 2 rejected messages")
//we expect only 1 rejcted meessage from the counter since rejected messages that get
//requeued are not reported to the metric so the counter won't be increment when the message
//in the dlq gets rejected as it is rejected with the requeue option set to true
if count != 1 {

t.Errorf("Should have 1 rejected messages but was %v", count)
}
case <-time.After(2 * time.Second):
t.Fatal("timeout, dlq failed to reject message after handler panicked")
Expand Down

0 comments on commit eef69fb

Please sign in to comment.