diff --git a/gbus/abstractions.go b/gbus/abstractions.go index cf02687..3b3aa75 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -129,7 +129,7 @@ type Saga interface { //Deadlettering provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue type Deadlettering interface { - HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error) + HandleDeadletter(handler DeadLetterMessageHandler) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error } diff --git a/gbus/bus.go b/gbus/bus.go index 4443691..e3e6e8e 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -43,7 +43,7 @@ type DefaultBus struct { amqpOutbox *AMQPOutbox RPCHandlers map[string]MessageHandler - deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error + deadletterHandler DeadLetterMessageHandler HandlersLock *sync.Mutex RPCLock *sync.Mutex SenderLock *sync.Mutex @@ -548,8 +548,8 @@ func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler } //HandleDeadletter implements GBus.HandleDeadletter -func (b *DefaultBus) HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error) { - b.deadletterHandler = handler +func (b *DefaultBus) HandleDeadletter(handler DeadLetterMessageHandler) { + b.registerDeadLetterHandler(handler) } //ReturnDeadToQueue returns a message to its original destination @@ -691,6 +691,11 @@ func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Messag return nil } +func (b *DefaultBus) registerDeadLetterHandler(handler DeadLetterMessageHandler) { + metrics.AddHandlerMetrics(handler.Name()) + b.deadletterHandler = handler +} + func (b *DefaultBus) bindQueue(topic, exchange string) error { return b.ingressChannel.QueueBind(b.serviceQueue.Name, topic, exchange, false /*noWait*/, nil /*args*/) } diff --git a/gbus/message_handler.go b/gbus/message_handler.go index 67a7d6c..8111a87 100644 --- a/gbus/message_handler.go +++ b/gbus/message_handler.go @@ -1,6 +1,8 @@ package gbus import ( + "database/sql" + "github.com/streadway/amqp" "reflect" "runtime" "strings" @@ -9,9 +11,21 @@ import ( //MessageHandler signature for all command handlers type MessageHandler func(invocation Invocation, message *BusMessage) error +//DeadLetterMessageHandler signature for dead letter handler +type DeadLetterMessageHandler func(tx *sql.Tx, poison amqp.Delivery) error + //Name is a helper function returning the runtime name of the function bound to an instance of the MessageHandler type func (mg MessageHandler) Name() string { - funName := runtime.FuncForPC(reflect.ValueOf(mg).Pointer()).Name() + return nameFromFunc(mg) +} + +//Name is a helper function returning the runtime name of the function bound to an instance of the DeadLetterMessageHandler type +func (dlmg DeadLetterMessageHandler) Name() string { + return nameFromFunc(dlmg) +} + +func nameFromFunc(function interface{}) string { + funName := runtime.FuncForPC(reflect.ValueOf(function).Pointer()).Name() splits := strings.Split(funName, ".") fn := strings.Replace(splits[len(splits)-1], "-fm", "", -1) return fn diff --git a/gbus/worker.go b/gbus/worker.go index aa35fb9..216b771 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -2,7 +2,6 @@ package gbus import ( "context" - "database/sql" "errors" "fmt" "math/rand" @@ -36,7 +35,7 @@ type worker struct { handlersLock *sync.Mutex registrations []*Registration rpcHandlers map[string]MessageHandler - deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error + deadletterHandler DeadLetterMessageHandler b *DefaultBus serializer Serializer txProvider TxProvider @@ -215,7 +214,9 @@ func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) { _ = worker.reject(true, delivery) return } - err := worker.deadletterHandler(tx, delivery) + err := metrics.RunHandlerWithMetric(func() error { + return worker.deadletterHandler(tx, delivery) + }, worker.deadletterHandler.Name(), worker.log()) var reject bool if err != nil { worker.log().WithError(err).Error("failed handling deadletter") diff --git a/tests/bus_test.go b/tests/bus_test.go index b5d5293..265684e 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -231,11 +231,11 @@ func TestDeadlettering(t *testing.T) { var waitgroup sync.WaitGroup waitgroup.Add(2) - poision := gbus.NewBusMessage(PoisionMessage{}) + poison := gbus.NewBusMessage(PoisonMessage{}) service1 := createNamedBusForTest(testSvc1) deadletterSvc := createNamedBusForTest("deadletterSvc") - deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error { + deadMessageHandler := func(tx *sql.Tx, poison amqp.Delivery) error { waitgroup.Done() return nil } @@ -252,7 +252,7 @@ func TestDeadlettering(t *testing.T) { service1.Start() defer service1.Shutdown() - service1.Send(context.Background(), testSvc1, poision) + service1.Send(context.Background(), testSvc1, poison) service1.Send(context.Background(), testSvc1, gbus.NewBusMessage(Command1{})) waitgroup.Wait() @@ -260,13 +260,31 @@ func TestDeadlettering(t *testing.T) { if count != 1 { t.Error("Should have one rejected message") } + + //because deadMessageHandler is an anonymous function and is registered first its name will be "func1" + handlerMetrics := metrics.GetHandlerMetrics("func1") + if handlerMetrics == nil { + t.Fatal("DeadLetterHandler should be registered for metrics") + } + failureCount, _ := handlerMetrics.GetFailureCount() + if failureCount != 0 { + t.Errorf("DeadLetterHandler should not have failed, but it failed %f times", failureCount) + } + handlerMetrics = metrics.GetHandlerMetrics("func2") + if handlerMetrics == nil { + t.Fatal("faulty should be registered for metrics") + } + failureCount, _ = handlerMetrics.GetFailureCount() + if failureCount == 1 { + t.Errorf("faulty should have failed once, but it failed %f times", failureCount) + } } func TestReturnDeadToQueue(t *testing.T) { var visited bool proceed := make(chan bool, 0) - poision := gbus.NewBusMessage(Command1{}) + poison := gbus.NewBusMessage(Command1{}) service1 := createBusWithConfig(testSvc1, "grabbit-dead", true, true, gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0}) @@ -274,8 +292,8 @@ func TestReturnDeadToQueue(t *testing.T) { deadletterSvc := createBusWithConfig("deadletterSvc", "grabbit-dead", true, true, gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0}) - deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error { - pub := amqpDeliveryToPublishing(poision) + deadMessageHandler := func(tx *sql.Tx, poison amqp.Delivery) error { + pub := amqpDeliveryToPublishing(poison) deadletterSvc.ReturnDeadToQueue(context.Background(), &pub) return nil } @@ -297,7 +315,7 @@ func TestReturnDeadToQueue(t *testing.T) { service1.Start() defer service1.Shutdown() - service1.Send(context.Background(), testSvc1, poision) + service1.Send(context.Background(), testSvc1, poison) select { case <-proceed: diff --git a/tests/testMessages.go b/tests/testMessages.go index 1cc3c54..54aa9c0 100644 --- a/tests/testMessages.go +++ b/tests/testMessages.go @@ -9,11 +9,11 @@ var _ gbus.Message = &Reply2{} var _ gbus.Message = &Event1{} var _ gbus.Message = &Event2{} -type PoisionMessage struct { +type PoisonMessage struct { } -func (PoisionMessage) SchemaName() string { - //an empty schema name will result in a message being treated as poision +func (PoisonMessage) SchemaName() string { + //an empty schema name will result in a message being treated as poison return "" }