Skip to content

Commit

Permalink
Add more docs on requeue (#509)
Browse files Browse the repository at this point in the history
  • Loading branch information
m110 authored Oct 29, 2024
1 parent 94c0209 commit 48e2ba0
Show file tree
Hide file tree
Showing 14 changed files with 123 additions and 54 deletions.
4 changes: 2 additions & 2 deletions _examples/real-world-examples/delayed-messages/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ module delayed-messsages
go 1.23.0

require (
github.com/ThreeDotsLabs/watermill v1.4.0-rc.1.0.20241024100330-cb068b72e948
github.com/ThreeDotsLabs/watermill v1.4.0-rc.2
github.com/ThreeDotsLabs/watermill-redisstream v1.4.2
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024102321-584a6f7dab93
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-rc.1
github.com/brianvoe/gofakeit/v6 v6.28.0
github.com/google/uuid v1.6.0
github.com/lib/pq v1.10.2
Expand Down
10 changes: 4 additions & 6 deletions _examples/real-world-examples/delayed-messages/go.sum
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
github.com/Rican7/retry v0.3.1 h1:scY4IbO8swckzoA/11HgBwaZRJEyY9vaNJshcdhp1Mc=
github.com/Rican7/retry v0.3.1/go.mod h1:CxSDrhAyXmTMeEuRAnArMu1FHu48vtfjLREWqVl7Vw0=
github.com/ThreeDotsLabs/watermill v1.4.0-rc.1.0.20241024100330-cb068b72e948 h1:b8qRHpWtlO94x6dVzSulrO2znSQqz8iYsxUyrdTixHo=
github.com/ThreeDotsLabs/watermill v1.4.0-rc.1.0.20241024100330-cb068b72e948/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
github.com/ThreeDotsLabs/watermill v1.4.0-rc.2 h1:K62uIAKOkCHTXtAwY+Nj95vyLR0y25UMBsbf/FuWCeQ=
github.com/ThreeDotsLabs/watermill v1.4.0-rc.2/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
github.com/ThreeDotsLabs/watermill-redisstream v1.4.2 h1:FY6tsBcbhbJpKDOssU4bfybstqY0hQHwiZmVq9qyILQ=
github.com/ThreeDotsLabs/watermill-redisstream v1.4.2/go.mod h1:69++855LyB+ckYDe60PiJLBcUrpckfDE2WwyzuVJRCk=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024101952-75257d7d0602 h1:CKdW3wb3+C36mMa44DF53KUyM5L6mGOjI3hikBOlAl4=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024101952-75257d7d0602/go.mod h1:GMWcpauehgI40EeoKPxLnXBWjT7oOm7dJfzk5uU4IOc=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024102321-584a6f7dab93 h1:KeRk2EG5AtdxfpjqIVPigZqscMvIcy0E2h8k7y38OAE=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024102321-584a6f7dab93/go.mod h1:GMWcpauehgI40EeoKPxLnXBWjT7oOm7dJfzk5uU4IOc=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-rc.1 h1:uYfnh1EoqXrzHu+bX/TboRyv4ou+EFcmkC1MABeQ0lI=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-rc.1/go.mod h1:ttA/lhzSh0YyDkosq1Cgc7IYz6Arrba0jIWfdnON0WA=
github.com/brianvoe/gofakeit/v6 v6.28.0 h1:Xib46XXuQfmlLS2EXRuJpqcw8St6qSZz75OUo0tgAW4=
github.com/brianvoe/gofakeit/v6 v6.28.0/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
Expand Down
13 changes: 6 additions & 7 deletions _examples/real-world-examples/delayed-messages/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
stdSQL "database/sql"
"fmt"
"strings"
"time"

"github.com/brianvoe/gofakeit/v6"
Expand Down Expand Up @@ -115,7 +116,7 @@ func main() {
cqrs.NewEventHandler(
"OnOrderPlacedHandler",
func(ctx context.Context, event *OrderPlaced) error {
fmt.Printf("Received order placed from %v\n", event.Customer.Name)
fmt.Printf("💰 Received order from %v <%v>\n", event.Customer.Name, event.Customer.Email)

cmd := SendFeedbackForm{
To: event.Customer.Email,
Expand All @@ -142,10 +143,7 @@ func main() {
cqrs.NewCommandHandler(
"OnSendFeedbackForm",
func(ctx context.Context, cmd *SendFeedbackForm) error {
msg := fmt.Sprintf("Hello %s! It's been a while since you placed your order, how did you like it? Let us know!", cmd.Name)

fmt.Println("Sending feedback form to:", cmd.To)
fmt.Println("\tMessage:", msg)
fmt.Printf("📧 Sending feedback form to %v <%v>\n", cmd.Name, cmd.To)

// In a real world scenario, we would send an email to the customer here

Expand Down Expand Up @@ -188,11 +186,12 @@ func main() {
<-router.Running()

for {
name := gofakeit.FirstName()
e := OrderPlaced{
OrderID: uuid.NewString(),
Customer: Customer{
Name: gofakeit.FirstName(),
Email: gofakeit.Email(),
Name: name,
Email: fmt.Sprintf("%v@%v", strings.ToLower(name), gofakeit.DomainName()),
},
}

Expand Down
4 changes: 2 additions & 2 deletions _examples/real-world-examples/delayed-requeue/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ module delayed-requeue
go 1.23.0

require (
github.com/ThreeDotsLabs/watermill v1.4.0-rc.1.0.20241024100330-cb068b72e948
github.com/ThreeDotsLabs/watermill v1.4.0-rc.2
github.com/ThreeDotsLabs/watermill-redisstream v1.4.2
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024102321-584a6f7dab93
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-rc.1
github.com/brianvoe/gofakeit/v6 v6.28.0
github.com/lib/pq v1.10.9
github.com/redis/go-redis/v9 v9.7.0
Expand Down
10 changes: 4 additions & 6 deletions _examples/real-world-examples/delayed-requeue/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/Rican7/retry v0.3.1 h1:scY4IbO8swckzoA/11HgBwaZRJEyY9vaNJshcdhp1Mc=
github.com/Rican7/retry v0.3.1/go.mod h1:CxSDrhAyXmTMeEuRAnArMu1FHu48vtfjLREWqVl7Vw0=
github.com/ThreeDotsLabs/watermill v1.4.0-rc.1.0.20241024100330-cb068b72e948 h1:b8qRHpWtlO94x6dVzSulrO2znSQqz8iYsxUyrdTixHo=
github.com/ThreeDotsLabs/watermill v1.4.0-rc.1.0.20241024100330-cb068b72e948/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
github.com/ThreeDotsLabs/watermill v1.4.0-rc.2 h1:K62uIAKOkCHTXtAwY+Nj95vyLR0y25UMBsbf/FuWCeQ=
github.com/ThreeDotsLabs/watermill v1.4.0-rc.2/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
github.com/ThreeDotsLabs/watermill-redisstream v1.4.2 h1:FY6tsBcbhbJpKDOssU4bfybstqY0hQHwiZmVq9qyILQ=
github.com/ThreeDotsLabs/watermill-redisstream v1.4.2/go.mod h1:69++855LyB+ckYDe60PiJLBcUrpckfDE2WwyzuVJRCk=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024101952-75257d7d0602 h1:CKdW3wb3+C36mMa44DF53KUyM5L6mGOjI3hikBOlAl4=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024101952-75257d7d0602/go.mod h1:GMWcpauehgI40EeoKPxLnXBWjT7oOm7dJfzk5uU4IOc=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024102321-584a6f7dab93 h1:KeRk2EG5AtdxfpjqIVPigZqscMvIcy0E2h8k7y38OAE=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024102321-584a6f7dab93/go.mod h1:GMWcpauehgI40EeoKPxLnXBWjT7oOm7dJfzk5uU4IOc=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-rc.1 h1:uYfnh1EoqXrzHu+bX/TboRyv4ou+EFcmkC1MABeQ0lI=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-rc.1/go.mod h1:ttA/lhzSh0YyDkosq1Cgc7IYz6Arrba0jIWfdnON0WA=
github.com/brianvoe/gofakeit/v6 v6.28.0 h1:Xib46XXuQfmlLS2EXRuJpqcw8St6qSZz75OUo0tgAW4=
github.com/brianvoe/gofakeit/v6 v6.28.0/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
Expand Down
35 changes: 17 additions & 18 deletions _examples/real-world-examples/delayed-requeue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"math/rand"
"time"

"github.com/ThreeDotsLabs/watermill/message/router/middleware"

"github.com/brianvoe/gofakeit/v6"
_ "github.com/lib/pq"
"github.com/redis/go-redis/v9"
Expand All @@ -15,8 +17,6 @@ import (
"github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream"
"github.com/ThreeDotsLabs/watermill-sql/v4/pkg/sql"
"github.com/ThreeDotsLabs/watermill/components/cqrs"
"github.com/ThreeDotsLabs/watermill/components/delay"
"github.com/ThreeDotsLabs/watermill/components/requeuer"
"github.com/ThreeDotsLabs/watermill/message"
)

Expand All @@ -40,7 +40,12 @@ func main() {
delayedRequeuer, err := sql.NewPostgreSQLDelayedRequeuer(sql.DelayedRequeuerConfig{
DB: db,
Publisher: redisPublisher,
Logger: logger,
DelayOnError: &middleware.DelayOnError{
InitialInterval: 10 * time.Second,
MaxInterval: 3 * time.Minute,
Multiplier: 2,
},
Logger: logger,
})
if err != nil {
panic(err)
Expand Down Expand Up @@ -85,23 +90,13 @@ func main() {
cqrs.NewEventHandler(
"OnOrderPlacedHandler",
func(ctx context.Context, event *OrderPlaced) error {
fmt.Println("Received order placed:", event.OrderID)

msg := cqrs.OriginalMessageFromCtx(ctx)
retries := msg.Metadata.Get(requeuer.RetriesKey)
delayedUntil := msg.Metadata.Get(delay.DelayedUntilKey)
delayedFor := msg.Metadata.Get(delay.DelayedForKey)

if retries != "" {
fmt.Println("\tRetries:", retries)
fmt.Println("\tDelayed until:", delayedUntil)
fmt.Println("\tDelayed for:", delayedFor)
}

if event.OrderID == "" {
fmt.Println("ERROR: Received order placed without order_id")
return fmt.Errorf("empty order_id")
}

fmt.Println("Received order placed:", event.OrderID)

return nil
},
),
Expand All @@ -126,12 +121,16 @@ func main() {

<-router.Running()

i := 0

for {
e := newFakeOrderPlaced()

chance := rand.Intn(10)
if chance < 2 {
i++

if i == 10 {
e.OrderID = ""
i = 0
}

err = eventBus.Publish(context.Background(), e)
Expand Down
2 changes: 2 additions & 0 deletions docs/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ else
"components/delay/delay.go"
"components/delay/publisher.go"

"components/requeuer/requeuer.go"

"components/metrics/builder.go"
"components/metrics/http.go"

Expand Down
55 changes: 55 additions & 0 deletions docs/content/advanced/requeuing-after-error.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
+++
title = "Requeuing After Error"
description = "How to requeue a message after it fails to process"
weight = -20
draft = false
bref = "How to requeue a message after it fails to process"
+++

When a message fails to process (a nack is sent), it usually blocks other messages on the same topic (within the same consumer group or partition).

Depending on your setup, it may be useful to requeue the failed message back to the tail of the queue.

Consider this if:
* You don't care about the order of messages.
* Your system isn't resilient to blocked messages.

## Requeuer

The `Requeuer` component is a wrapper on the `Router` that moves messages from one topic to another.

{{% load-snippet-partial file="src-link/components/requeuer/requeuer.go" first_line_contains="type Config" last_line_contains="}" %}}

A trivial usage can look like this. It requeues messages from one topic to the same topic after a delay.

{{< callout "danger" >}}
Using the delay this way is not recommended, as it blocks the entire requeue process for the given time.
{{< /callout >}}

```go
req, err := requeuer.NewRequeuer(requeuer.Config{
Subscriber: sub,
SubscribeTopic: "topic",
Publisher: pub,
GeneratePublishTopic: func(params requeuer.GeneratePublishTopicParams) (string, error) {
return "topic", nil
},
Delay: time.Millisecond * 200,
}, logger)
if err != nil {
return err
}

err := req.Run(context.Background())
if err != nil {
return err
}
```

A better way to use the `Requeuer` is to combine it with the `Poison` middleware.
The middleware moves messages to a separate "poison" topic.
Then, the requeuer moves them back to the original topic based on the metadata.

You combine this with a Pub/Sub that supports delayed messages.
See the [full example based on PostgreSQL](https://github.com/ThreeDotsLabs/watermill/blob/master/_examples/real-world-examples/delayed-requeue/main.go).

1 change: 0 additions & 1 deletion docs/content/docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ if err != nil {
}
```


{{< tabs "publishing" >}}

{{< tab "Go Channel" "go-channel" >}}
Expand Down
35 changes: 27 additions & 8 deletions docs/content/pubsubs/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ go get github.com/ThreeDotsLabs/watermill-sql/v3

### Characteristics

| Feature | Implements | Note |
|---------------------|------------|-------------------------------------------|
| ConsumerGroups | yes | See `ConsumerGroup` in `SubscriberConfig` |
| ExactlyOnceDelivery | yes* | Just for MySQL implementation |
| GuaranteedOrder | yes | |
| Persistent | yes | |
| Feature | Implements | Note |
|---------------------|------------|-------------------------------------------------------------------------------|
| ConsumerGroups | yes | See `ConsumerGroup` in `SubscriberConfig` (not supported by the queue schema) |
| ExactlyOnceDelivery | yes* | Just for MySQL implementation |
| GuaranteedOrder | yes | |
| Persistent | yes | |

### Schema

Expand Down Expand Up @@ -83,7 +83,7 @@ constructor. You have to create one publisher for each transaction.
Example:
{{% load-snippet-partial file="src-link/_examples/real-world-examples/transactional-events/main.go" first_line_contains="func simulateEvents" last_line_contains="return pub.Publish(" padding_after="3" %}}

### Subscribing
## Subscribing

To create a subscriber, you need to pass not only proper schema adapter, but also an offsets adapter.

Expand All @@ -97,9 +97,28 @@ Example:

{{% load-snippet-partial file="src-link/watermill-sql/pkg/sql/subscriber.go" first_line_contains="func (s *Subscriber) Subscribe" last_line_contains="func (s *Subscriber) Subscribe" %}}

### Offsets Adapter
## Offsets Adapter

The logic for storing offsets of messages is provided by the `OffsetsAdapter`. If your schema uses auto-incremented integer as the row ID,
it should work out of the box with default offset adapters.

{{% load-snippet-partial file="src-link/watermill-sql/pkg/sql/offsets_adapter.go" first_line_contains="type OffsetsAdapter" %}}

## Queue

Instead of the default Pub/Sub schema, you can use the *queue* schema and offsets adapters.

It's a simpler schema that doesn't support consumer groups.
However, it has other advantages.

It lets you specify a custom `WHERE` clause for getting the messages.
You can use it to filter messages by some condition in the payload or in the metadata.

Additionally, you can choose to delete messages from the table after they are acknowledged.
Thanks to this, the table doesn't grow in size with time.

Currently, this schema is supported only for PostgreSQL.

{{% load-snippet-partial file="src-link/watermill-sql/pkg/sql/queue_schema_adapter_postgresql.go" first_line_contains="// PostgreSQLQueueSchema" last_line_contains="}" %}}

{{% load-snippet-partial file="src-link/watermill-sql/pkg/sql/queue_offsets_adapter_postgresql.go" first_line_contains="// PostgreSQLQueueOffsetsAdapter" last_line_contains="}" %}}
Binary file not shown.
Binary file not shown.
6 changes: 2 additions & 4 deletions tools/pq/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ module github.com/ThreeDotsLabs/watermill/tools/pq
go 1.23.0

require (
github.com/ThreeDotsLabs/watermill v1.4.0-rc.1.0.20241011082756-1cb09cdf7d08
github.com/ThreeDotsLabs/watermill v1.4.0-rc.2
github.com/charmbracelet/bubbles v0.19.0
github.com/charmbracelet/bubbletea v0.27.1
github.com/charmbracelet/lipgloss v0.13.0
github.com/jmoiron/sqlx v1.4.0
github.com/lib/pq v1.10.9
github.com/pkg/errors v0.9.1
golang.org/x/exp v0.0.0-20220909182711-5c715a9e8561
)

Expand All @@ -32,13 +33,10 @@ require (
github.com/muesli/cancelreader v0.2.2 // indirect
github.com/muesli/termenv v0.15.2 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/sony/gobreaker v1.0.0 // indirect
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.14.0 // indirect
)

replace github.com/ThreeDotsLabs/watermill => ../..
2 changes: 2 additions & 0 deletions tools/pq/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/ThreeDotsLabs/watermill v1.4.0-rc.2 h1:K62uIAKOkCHTXtAwY+Nj95vyLR0y25UMBsbf/FuWCeQ=
github.com/ThreeDotsLabs/watermill v1.4.0-rc.2/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k=
github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8=
github.com/aymanbagabas/go-udiff v0.2.0 h1:TK0fH4MteXUDspT88n8CKzvK0X9O2xu9yQjWpi6yML8=
Expand Down

0 comments on commit 48e2ba0

Please sign in to comment.