From c32fdc44caf4d87ad240e02ea69b044cb330e8ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mi=C5=82osz=20Sm=C3=B3=C5=82ka?= Date: Fri, 25 Oct 2024 14:13:30 +0200 Subject: [PATCH] Requeuer and Message Delay (#469) * Added the `Requeuer` component. * It works as a simpler version of the Forwarder, routing messages from one topic to another (a dynamic one). * Can be used to move messages that failed to process back to the original topic, so they don't block other messages. * Can be used together with the poison middleware and the `delay` component to delay the forwarding. * Added the `delay` package. It contains helpers for setting delay metadata on messages. * **Does not do anything by itself.** A Pub/Sub needs to support it explicitly. For now, that's the delayed postgres Pub/Sub implemented in https://github.com/ThreeDotsLabs/watermill-sql/pull/34 * Use case 1: publishing a message after a given delay or at given time (see the example). * Use case 2: automatically moving messages out of the poison queue to the original topic after a delay (used together with the `Requeuer` component). * Added the `pq` CLI tool for working with poison queues. --- .../delayed-messages/docker-compose.yml | 25 ++ .../delayed-messages/go.mod | 32 ++ .../delayed-messages/go.sum | 144 +++++++ .../delayed-messages/main.go | 221 ++++++++++ .../delayed-requeue/docker-compose.yml | 25 ++ .../delayed-requeue/go.mod | 33 ++ .../delayed-requeue/go.sum | 146 +++++++ .../delayed-requeue/main.go | 198 +++++++++ components/delay/delay.go | 68 +++ components/delay/publisher.go | 83 ++++ components/delay/publisher_test.go | 178 ++++++++ components/requeuer/requeuer.go | 158 +++++++ components/requeuer/requeuer_test.go | 102 +++++ docs/build.sh | 3 + docs/content/advanced/delayed-messages.md | 43 ++ docs/hugo_stats.json | 13 + message/router/middleware/delay_on_error.go | 47 ++ .../router/middleware/delay_on_error_test.go | 58 +++ message/router/middleware/retry_test.go | 6 +- tools/pq/README.md | 38 ++ tools/pq/backend/postgres.go | 99 +++++ tools/pq/cli/backend.go | 32 ++ tools/pq/cli/message.go | 45 ++ tools/pq/cli/model.go | 403 ++++++++++++++++++ tools/pq/go.mod | 44 ++ tools/pq/go.sum | 90 ++++ tools/pq/main.go | 51 +++ 27 files changed, 2381 insertions(+), 4 deletions(-) create mode 100644 _examples/real-world-examples/delayed-messages/docker-compose.yml create mode 100644 _examples/real-world-examples/delayed-messages/go.mod create mode 100644 _examples/real-world-examples/delayed-messages/go.sum create mode 100644 _examples/real-world-examples/delayed-messages/main.go create mode 100644 _examples/real-world-examples/delayed-requeue/docker-compose.yml create mode 100644 _examples/real-world-examples/delayed-requeue/go.mod create mode 100644 _examples/real-world-examples/delayed-requeue/go.sum create mode 100644 _examples/real-world-examples/delayed-requeue/main.go create mode 100644 components/delay/delay.go create mode 100644 components/delay/publisher.go create mode 100644 components/delay/publisher_test.go create mode 100644 components/requeuer/requeuer.go create mode 100644 components/requeuer/requeuer_test.go create mode 100644 docs/content/advanced/delayed-messages.md create mode 100644 message/router/middleware/delay_on_error.go create mode 100644 message/router/middleware/delay_on_error_test.go create mode 100644 tools/pq/README.md create mode 100644 tools/pq/backend/postgres.go create mode 100644 tools/pq/cli/backend.go create mode 100644 tools/pq/cli/message.go create mode 100644 tools/pq/cli/model.go create mode 100644 tools/pq/go.mod create mode 100644 tools/pq/go.sum create mode 100644 tools/pq/main.go diff --git a/_examples/real-world-examples/delayed-messages/docker-compose.yml b/_examples/real-world-examples/delayed-messages/docker-compose.yml new file mode 100644 index 000000000..b8310f17c --- /dev/null +++ b/_examples/real-world-examples/delayed-messages/docker-compose.yml @@ -0,0 +1,25 @@ +services: + server: + image: golang:1.23 + restart: unless-stopped + volumes: + - .:/app + - $GOPATH/pkg/mod:/go/pkg/mod + working_dir: /app + command: go run main.go + + redis: + image: redis:7 + ports: + - 6379:6379 + restart: unless-stopped + + postgres: + image: postgres:15 + restart: unless-stopped + ports: + - 5432:5432 + environment: + POSTGRES_USER: watermill + POSTGRES_DB: watermill + POSTGRES_PASSWORD: "password" diff --git a/_examples/real-world-examples/delayed-messages/go.mod b/_examples/real-world-examples/delayed-messages/go.mod new file mode 100644 index 000000000..ee8d58d6d --- /dev/null +++ b/_examples/real-world-examples/delayed-messages/go.mod @@ -0,0 +1,32 @@ +module delayed-messsages + +go 1.23.0 + +require ( + github.com/ThreeDotsLabs/watermill v1.4.0-rc.1.0.20241024100330-cb068b72e948 + github.com/ThreeDotsLabs/watermill-redisstream v1.4.2 + github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024102321-584a6f7dab93 + github.com/brianvoe/gofakeit/v6 v6.28.0 + github.com/google/uuid v1.6.0 + github.com/lib/pq v1.10.2 + github.com/redis/go-redis/v9 v9.6.1 +) + +require ( + github.com/Rican7/retry v0.3.1 // indirect + github.com/cenkalti/backoff/v3 v3.2.2 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/protobuf v1.5.4 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/lithammer/shortuuid/v3 v3.0.7 // indirect + github.com/oklog/ulid v1.3.1 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/sony/gobreaker v1.0.0 // indirect + github.com/vmihailenco/msgpack v4.0.4+incompatible // indirect + google.golang.org/appengine v1.6.8 // indirect + google.golang.org/protobuf v1.34.2 // indirect + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect +) diff --git a/_examples/real-world-examples/delayed-messages/go.sum b/_examples/real-world-examples/delayed-messages/go.sum new file mode 100644 index 000000000..841fdf6a2 --- /dev/null +++ b/_examples/real-world-examples/delayed-messages/go.sum @@ -0,0 +1,144 @@ +github.com/Rican7/retry v0.3.1 h1:scY4IbO8swckzoA/11HgBwaZRJEyY9vaNJshcdhp1Mc= +github.com/Rican7/retry v0.3.1/go.mod h1:CxSDrhAyXmTMeEuRAnArMu1FHu48vtfjLREWqVl7Vw0= +github.com/ThreeDotsLabs/watermill v1.4.0-rc.1.0.20241024100330-cb068b72e948 h1:b8qRHpWtlO94x6dVzSulrO2znSQqz8iYsxUyrdTixHo= +github.com/ThreeDotsLabs/watermill v1.4.0-rc.1.0.20241024100330-cb068b72e948/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to= +github.com/ThreeDotsLabs/watermill-redisstream v1.4.2 h1:FY6tsBcbhbJpKDOssU4bfybstqY0hQHwiZmVq9qyILQ= +github.com/ThreeDotsLabs/watermill-redisstream v1.4.2/go.mod h1:69++855LyB+ckYDe60PiJLBcUrpckfDE2WwyzuVJRCk= +github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024101952-75257d7d0602 h1:CKdW3wb3+C36mMa44DF53KUyM5L6mGOjI3hikBOlAl4= +github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024101952-75257d7d0602/go.mod h1:GMWcpauehgI40EeoKPxLnXBWjT7oOm7dJfzk5uU4IOc= +github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024102321-584a6f7dab93 h1:KeRk2EG5AtdxfpjqIVPigZqscMvIcy0E2h8k7y38OAE= +github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024102321-584a6f7dab93/go.mod h1:GMWcpauehgI40EeoKPxLnXBWjT7oOm7dJfzk5uU4IOc= +github.com/brianvoe/gofakeit/v6 v6.28.0 h1:Xib46XXuQfmlLS2EXRuJpqcw8St6qSZz75OUo0tgAW4= +github.com/brianvoe/gofakeit/v6 v6.28.0/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= +github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= +github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= +github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/pgconn v1.14.3 h1:bVoTr12EGANZz66nZPkMInAV/KHD2TxH9npjXXgiB3w= +github.com/jackc/pgconn v1.14.3/go.mod h1:RZbme4uasqzybK2RK5c65VsHxoyaml09lx3tXOcO/VM= +github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= +github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgproto3/v2 v2.3.3 h1:1HLSx5H+tXR9pW3in3zaztoEwQYRC9SQaYUHjTSUOag= +github.com/jackc/pgproto3/v2 v2.3.3/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgtype v1.14.0 h1:y+xUdabmyMkJLyApYuPj38mW+aAIqCe5uuBB51rH3Vw= +github.com/jackc/pgtype v1.14.0/go.mod h1:LUMuVrfsFfdKGLw+AFFVv6KtHOFMwRgDDzBt76IqCA4= +github.com/jackc/pgx/v4 v4.18.2 h1:xVpYkNR5pk5bMCZGfClbO962UIqVABcAGt7ha1s/FeU= +github.com/jackc/pgx/v4 v4.18.2/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8= +github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= +github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= +github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= +github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= +github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= +github.com/sony/gobreaker v1.0.0 h1:feX5fGGXSl3dYd4aHZItw+FpHLvvoaqkawKjVNiFMNQ= +github.com/sony/gobreaker v1.0.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI= +github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.20.0 h1:jmAMJJZXr5KiCw05dfYK9QnqaqKLYXijU23lsEdcQqg= +golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= +google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/_examples/real-world-examples/delayed-messages/main.go b/_examples/real-world-examples/delayed-messages/main.go new file mode 100644 index 000000000..520235274 --- /dev/null +++ b/_examples/real-world-examples/delayed-messages/main.go @@ -0,0 +1,221 @@ +package main + +import ( + "context" + stdSQL "database/sql" + "fmt" + "time" + + "github.com/brianvoe/gofakeit/v6" + "github.com/google/uuid" + _ "github.com/lib/pq" + "github.com/redis/go-redis/v9" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream" + "github.com/ThreeDotsLabs/watermill-sql/v4/pkg/sql" + "github.com/ThreeDotsLabs/watermill/components/cqrs" + "github.com/ThreeDotsLabs/watermill/components/delay" + "github.com/ThreeDotsLabs/watermill/components/forwarder" + "github.com/ThreeDotsLabs/watermill/message" +) + +func main() { + db, err := stdSQL.Open("postgres", "postgres://watermill:password@postgres:5432/watermill?sslmode=disable") + if err != nil { + panic(err) + } + + logger := watermill.NewStdLogger(false, false) + + redisClient := redis.NewClient(&redis.Options{Addr: "redis:6379"}) + marshaler := cqrs.JSONMarshaler{ + GenerateName: cqrs.StructName, + } + + redisPublisher, err := redisstream.NewPublisher(redisstream.PublisherConfig{ + Client: redisClient, + }, logger) + if err != nil { + panic(err) + } + + var sqlPublisher message.Publisher + sqlPublisher, err = sql.NewDelayedPostgreSQLPublisher(db, sql.DelayedPostgreSQLPublisherConfig{ + DelayPublisherConfig: delay.PublisherConfig{}, + Logger: logger, + }) + if err != nil { + panic(err) + } + + sqlPublisher = forwarder.NewPublisher(sqlPublisher, forwarder.PublisherConfig{ + ForwarderTopic: "forwarder", + }) + + eventBus, err := cqrs.NewEventBusWithConfig(redisPublisher, cqrs.EventBusConfig{ + GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) { + return params.EventName, nil + }, + Marshaler: marshaler, + Logger: logger, + }) + if err != nil { + panic(err) + } + + commandBus, err := cqrs.NewCommandBusWithConfig(sqlPublisher, cqrs.CommandBusConfig{ + GeneratePublishTopic: func(params cqrs.CommandBusGeneratePublishTopicParams) (string, error) { + return params.CommandName, nil + }, + Marshaler: marshaler, + Logger: logger, + }) + if err != nil { + panic(err) + } + + router := message.NewDefaultRouter(logger) + + eventProcessor, err := cqrs.NewEventProcessorWithConfig(router, cqrs.EventProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) { + return params.EventName, nil + }, + SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return redisstream.NewSubscriber(redisstream.SubscriberConfig{ + Client: redisClient, + ConsumerGroup: params.HandlerName, + }, logger) + }, + Marshaler: marshaler, + Logger: logger, + }) + if err != nil { + panic(err) + } + + commandProcessor, err := cqrs.NewCommandProcessorWithConfig(router, cqrs.CommandProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) { + return params.CommandName, nil + }, + SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return redisstream.NewSubscriber(redisstream.SubscriberConfig{ + Client: redisClient, + ConsumerGroup: params.HandlerName, + }, logger) + }, + Marshaler: marshaler, + Logger: logger, + }) + if err != nil { + panic(err) + } + + err = eventProcessor.AddHandlers( + cqrs.NewEventHandler( + "OnOrderPlacedHandler", + func(ctx context.Context, event *OrderPlaced) error { + fmt.Printf("Received order placed from %v\n", event.Customer.Name) + + cmd := SendFeedbackForm{ + To: event.Customer.Email, + Name: event.Customer.Name, + } + + // In a real world scenario, we would delay the command by a few days + ctx = delay.WithContext(ctx, delay.For(8*time.Second)) + + err := commandBus.Send(ctx, cmd) + if err != nil { + return err + } + + return nil + }, + ), + ) + if err != nil { + panic(err) + } + + err = commandProcessor.AddHandlers( + cqrs.NewCommandHandler( + "OnSendFeedbackForm", + func(ctx context.Context, cmd *SendFeedbackForm) error { + msg := fmt.Sprintf("Hello %s! It's been a while since you placed your order, how did you like it? Let us know!", cmd.Name) + + fmt.Println("Sending feedback form to:", cmd.To) + fmt.Println("\tMessage:", msg) + + // In a real world scenario, we would send an email to the customer here + + return nil + }, + ), + ) + if err != nil { + panic(err) + } + + sqlSubscriber, err := sql.NewDelayedPostgreSQLSubscriber(db, sql.DelayedPostgreSQLSubscriberConfig{ + DeleteOnAck: true, + Logger: logger, + }) + if err != nil { + panic(err) + } + + _, err = forwarder.NewForwarder( + sqlSubscriber, + redisPublisher, + logger, + forwarder.Config{ + ForwarderTopic: "forwarder", + Router: router, + }, + ) + if err != nil { + panic(err) + } + + go func() { + err = router.Run(context.Background()) + if err != nil { + panic(err) + } + }() + + <-router.Running() + + for { + e := OrderPlaced{ + OrderID: uuid.NewString(), + Customer: Customer{ + Name: gofakeit.FirstName(), + Email: gofakeit.Email(), + }, + } + + err = eventBus.Publish(context.Background(), e) + if err != nil { + panic(err) + } + + time.Sleep(5 * time.Second) + } +} + +type Customer struct { + Name string `json:"name"` + Email string `json:"email"` +} + +type OrderPlaced struct { + OrderID string `json:"order_id"` + Customer Customer `json:"customer"` +} + +type SendFeedbackForm struct { + To string `json:"to"` + Name string `json:"name"` +} diff --git a/_examples/real-world-examples/delayed-requeue/docker-compose.yml b/_examples/real-world-examples/delayed-requeue/docker-compose.yml new file mode 100644 index 000000000..b8310f17c --- /dev/null +++ b/_examples/real-world-examples/delayed-requeue/docker-compose.yml @@ -0,0 +1,25 @@ +services: + server: + image: golang:1.23 + restart: unless-stopped + volumes: + - .:/app + - $GOPATH/pkg/mod:/go/pkg/mod + working_dir: /app + command: go run main.go + + redis: + image: redis:7 + ports: + - 6379:6379 + restart: unless-stopped + + postgres: + image: postgres:15 + restart: unless-stopped + ports: + - 5432:5432 + environment: + POSTGRES_USER: watermill + POSTGRES_DB: watermill + POSTGRES_PASSWORD: "password" diff --git a/_examples/real-world-examples/delayed-requeue/go.mod b/_examples/real-world-examples/delayed-requeue/go.mod new file mode 100644 index 000000000..f76ffb215 --- /dev/null +++ b/_examples/real-world-examples/delayed-requeue/go.mod @@ -0,0 +1,33 @@ +module delayed-requeue + +go 1.23.0 + +require ( + github.com/ThreeDotsLabs/watermill v1.4.0-rc.1.0.20241024100330-cb068b72e948 + github.com/ThreeDotsLabs/watermill-redisstream v1.4.2 + github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024102321-584a6f7dab93 + github.com/brianvoe/gofakeit/v6 v6.28.0 + github.com/lib/pq v1.10.9 + github.com/redis/go-redis/v9 v9.7.0 +) + +require ( + github.com/Rican7/retry v0.3.1 // indirect + github.com/cenkalti/backoff/v3 v3.2.2 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/go-sql-driver/mysql v1.8.1 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/protobuf v1.5.4 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/lithammer/shortuuid/v3 v3.0.7 // indirect + github.com/oklog/ulid v1.3.1 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/sony/gobreaker v1.0.0 // indirect + github.com/vmihailenco/msgpack v4.0.4+incompatible // indirect + google.golang.org/appengine v1.6.8 // indirect + google.golang.org/protobuf v1.34.2 // indirect + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect +) diff --git a/_examples/real-world-examples/delayed-requeue/go.sum b/_examples/real-world-examples/delayed-requeue/go.sum new file mode 100644 index 000000000..ae0952ed3 --- /dev/null +++ b/_examples/real-world-examples/delayed-requeue/go.sum @@ -0,0 +1,146 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/Rican7/retry v0.3.1 h1:scY4IbO8swckzoA/11HgBwaZRJEyY9vaNJshcdhp1Mc= +github.com/Rican7/retry v0.3.1/go.mod h1:CxSDrhAyXmTMeEuRAnArMu1FHu48vtfjLREWqVl7Vw0= +github.com/ThreeDotsLabs/watermill v1.4.0-rc.1.0.20241024100330-cb068b72e948 h1:b8qRHpWtlO94x6dVzSulrO2znSQqz8iYsxUyrdTixHo= +github.com/ThreeDotsLabs/watermill v1.4.0-rc.1.0.20241024100330-cb068b72e948/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to= +github.com/ThreeDotsLabs/watermill-redisstream v1.4.2 h1:FY6tsBcbhbJpKDOssU4bfybstqY0hQHwiZmVq9qyILQ= +github.com/ThreeDotsLabs/watermill-redisstream v1.4.2/go.mod h1:69++855LyB+ckYDe60PiJLBcUrpckfDE2WwyzuVJRCk= +github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024101952-75257d7d0602 h1:CKdW3wb3+C36mMa44DF53KUyM5L6mGOjI3hikBOlAl4= +github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024101952-75257d7d0602/go.mod h1:GMWcpauehgI40EeoKPxLnXBWjT7oOm7dJfzk5uU4IOc= +github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024102321-584a6f7dab93 h1:KeRk2EG5AtdxfpjqIVPigZqscMvIcy0E2h8k7y38OAE= +github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024102321-584a6f7dab93/go.mod h1:GMWcpauehgI40EeoKPxLnXBWjT7oOm7dJfzk5uU4IOc= +github.com/brianvoe/gofakeit/v6 v6.28.0 h1:Xib46XXuQfmlLS2EXRuJpqcw8St6qSZz75OUo0tgAW4= +github.com/brianvoe/gofakeit/v6 v6.28.0/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= +github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= +github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/pgconn v1.14.3 h1:bVoTr12EGANZz66nZPkMInAV/KHD2TxH9npjXXgiB3w= +github.com/jackc/pgconn v1.14.3/go.mod h1:RZbme4uasqzybK2RK5c65VsHxoyaml09lx3tXOcO/VM= +github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= +github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgproto3/v2 v2.3.3 h1:1HLSx5H+tXR9pW3in3zaztoEwQYRC9SQaYUHjTSUOag= +github.com/jackc/pgproto3/v2 v2.3.3/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgtype v1.14.0 h1:y+xUdabmyMkJLyApYuPj38mW+aAIqCe5uuBB51rH3Vw= +github.com/jackc/pgtype v1.14.0/go.mod h1:LUMuVrfsFfdKGLw+AFFVv6KtHOFMwRgDDzBt76IqCA4= +github.com/jackc/pgx/v4 v4.18.2 h1:xVpYkNR5pk5bMCZGfClbO962UIqVABcAGt7ha1s/FeU= +github.com/jackc/pgx/v4 v4.18.2/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= +github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= +github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= +github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= +github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= +github.com/sony/gobreaker v1.0.0 h1:feX5fGGXSl3dYd4aHZItw+FpHLvvoaqkawKjVNiFMNQ= +github.com/sony/gobreaker v1.0.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI= +github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.20.0 h1:jmAMJJZXr5KiCw05dfYK9QnqaqKLYXijU23lsEdcQqg= +golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= +google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/_examples/real-world-examples/delayed-requeue/main.go b/_examples/real-world-examples/delayed-requeue/main.go new file mode 100644 index 000000000..edd62b588 --- /dev/null +++ b/_examples/real-world-examples/delayed-requeue/main.go @@ -0,0 +1,198 @@ +package main + +import ( + "context" + stdSQL "database/sql" + "fmt" + "math/rand" + "time" + + "github.com/brianvoe/gofakeit/v6" + _ "github.com/lib/pq" + "github.com/redis/go-redis/v9" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream" + "github.com/ThreeDotsLabs/watermill-sql/v4/pkg/sql" + "github.com/ThreeDotsLabs/watermill/components/cqrs" + "github.com/ThreeDotsLabs/watermill/components/delay" + "github.com/ThreeDotsLabs/watermill/components/requeuer" + "github.com/ThreeDotsLabs/watermill/message" +) + +func main() { + db, err := stdSQL.Open("postgres", "postgres://watermill:password@postgres:5432/watermill?sslmode=disable") + if err != nil { + panic(err) + } + + logger := watermill.NewStdLogger(false, false) + + redisClient := redis.NewClient(&redis.Options{Addr: "redis:6379"}) + + redisPublisher, err := redisstream.NewPublisher(redisstream.PublisherConfig{ + Client: redisClient, + }, logger) + if err != nil { + panic(err) + } + + delayedRequeuer, err := sql.NewPostgreSQLDelayedRequeuer(sql.DelayedRequeuerConfig{ + DB: db, + Publisher: redisPublisher, + Logger: logger, + }) + if err != nil { + panic(err) + } + + marshaler := cqrs.JSONMarshaler{ + GenerateName: cqrs.StructName, + } + + eventBus, err := cqrs.NewEventBusWithConfig(redisPublisher, cqrs.EventBusConfig{ + GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) { + return params.EventName, nil + }, + Marshaler: marshaler, + Logger: logger, + }) + if err != nil { + panic(err) + } + + router := message.NewDefaultRouter(logger) + router.AddMiddleware(delayedRequeuer.Middleware()...) + + eventProcessor, err := cqrs.NewEventProcessorWithConfig(router, cqrs.EventProcessorConfig{ + GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) { + return params.EventName, nil + }, + SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) { + return redisstream.NewSubscriber(redisstream.SubscriberConfig{ + Client: redisClient, + ConsumerGroup: params.HandlerName, + }, logger) + }, + Marshaler: marshaler, + Logger: logger, + }) + if err != nil { + panic(err) + } + + err = eventProcessor.AddHandlers( + cqrs.NewEventHandler( + "OnOrderPlacedHandler", + func(ctx context.Context, event *OrderPlaced) error { + fmt.Println("Received order placed:", event.OrderID) + + msg := cqrs.OriginalMessageFromCtx(ctx) + retries := msg.Metadata.Get(requeuer.RetriesKey) + delayedUntil := msg.Metadata.Get(delay.DelayedUntilKey) + delayedFor := msg.Metadata.Get(delay.DelayedForKey) + + if retries != "" { + fmt.Println("\tRetries:", retries) + fmt.Println("\tDelayed until:", delayedUntil) + fmt.Println("\tDelayed for:", delayedFor) + } + + if event.OrderID == "" { + return fmt.Errorf("empty order_id") + } + + return nil + }, + ), + ) + if err != nil { + panic(err) + } + + go func() { + err = delayedRequeuer.Run(context.Background()) + if err != nil { + panic(err) + } + }() + + go func() { + err = router.Run(context.Background()) + if err != nil { + panic(err) + } + }() + + <-router.Running() + + for { + e := newFakeOrderPlaced() + + chance := rand.Intn(10) + if chance < 2 { + e.OrderID = "" + } + + err = eventBus.Publish(context.Background(), e) + if err != nil { + panic(err) + } + + time.Sleep(1 * time.Second) + } +} + +func newFakeOrderPlaced() OrderPlaced { + var products []Product + + for i := 0; i < rand.Intn(5)+1; i++ { + products = append(products, Product{ + ID: watermill.NewShortUUID(), + Name: gofakeit.ProductName(), + }) + } + + return OrderPlaced{ + OrderID: watermill.NewUUID(), + Customer: Customer{ + ID: watermill.NewULID(), + Name: gofakeit.Name(), + Email: gofakeit.Email(), + Phone: gofakeit.Phone(), + }, + Address: Address{ + Street: gofakeit.Street(), + City: gofakeit.City(), + Zip: gofakeit.Zip(), + Country: gofakeit.Country(), + }, + Products: products, + } +} + +type OrderPlaced struct { + OrderID string `json:"order_id"` + Customer Customer `json:"customer"` + Address Address `json:"address"` + Products []Product `json:"products"` +} + +type Customer struct { + ID string `json:"id"` + Name string `json:"name"` + Email string `json:"email"` + Phone string `json:"phone"` +} + +type Address struct { + Street string `json:"street"` + City string `json:"city"` + Zip string `json:"zip"` + Country string `json:"country"` +} + +type Product struct { + ID string `json:"id"` + Name string `json:"name"` +} diff --git a/components/delay/delay.go b/components/delay/delay.go new file mode 100644 index 000000000..255419af1 --- /dev/null +++ b/components/delay/delay.go @@ -0,0 +1,68 @@ +package delay + +import ( + "context" + "time" + + "github.com/ThreeDotsLabs/watermill/message" +) + +// Delay represents a message's delay. +// It can be either a delay until a specific time or a delay for a specific duration. +// The zero value of Delay is a zero delay. +// +// IMPORTANT: Delay doesn't work with all Pub/Subs! Using it won't have any effect on Pub/Subs that don't support it. +// See the list of supported Pub/Subs in the documentation: https://watermill.io/advanced/delayed-messages/ +type Delay struct { + time time.Time + duration time.Duration +} + +func (d Delay) IsZero() bool { + return d.time.IsZero() +} + +// Until returns a delay of the given time. +func Until(delayedUntil time.Time) Delay { + return Delay{ + time: delayedUntil, + duration: delayedUntil.Sub(time.Now().UTC()), + } +} + +// For returns a delay of now plus the given duration. +func For(delayedFor time.Duration) Delay { + return Delay{ + time: time.Now().UTC().Add(delayedFor), + duration: delayedFor, + } +} + +type contextKey string + +var ( + delayContextKey = contextKey("delay") +) + +// WithContext returns a new context with the given delay. +// If used together with a publisher wrapped with NewPublisher, the delay will be applied to the message. +// +// IMPORTANT: Delay doesn't work with all Pub/Subs! Using it won't have any effect on Pub/Subs that don't support it. +// See the list of supported Pub/Subs in the documentation: https://watermill.io/advanced/delayed-messages/ +func WithContext(ctx context.Context, delay Delay) context.Context { + return context.WithValue(ctx, delayContextKey, delay) +} + +const ( + DelayedUntilKey = "_watermill_delayed_until" + DelayedForKey = "_watermill_delayed_for" +) + +// Message sets the delay metadata on the message. +// +// IMPORTANT: Delay doesn't work with all Pub/Subs! Using it won't have any effect on Pub/Subs that don't support it. +// See the list of supported Pub/Subs in the documentation: https://watermill.io/advanced/delayed-messages/ +func Message(msg *message.Message, delay Delay) { + msg.Metadata.Set(DelayedUntilKey, delay.time.Format(time.RFC3339)) + msg.Metadata.Set(DelayedForKey, delay.duration.String()) +} diff --git a/components/delay/publisher.go b/components/delay/publisher.go new file mode 100644 index 000000000..7318ecfb0 --- /dev/null +++ b/components/delay/publisher.go @@ -0,0 +1,83 @@ +package delay + +import ( + "errors" + + "github.com/ThreeDotsLabs/watermill/message" +) + +type DefaultDelayGeneratorParams struct { + Topic string + Message *message.Message +} + +// PublisherConfig is a configuration for the delay publisher. +type PublisherConfig struct { + // DefaultDelayGenerator is a function that generates the default delay for a message. + // If the message doesn't have the delay metadata set, the default delay will be applied. + DefaultDelayGenerator func(params DefaultDelayGeneratorParams) (Delay, error) + + // AllowNoDelay allows publishing messages without a delay set. + // By default, the publisher returns an error when a message is published without a delay and no default delay generator is provided. + AllowNoDelay bool +} + +// NewPublisher wraps a publisher with a delay mechanism. +// A message can be published with delay metadata set in the context by using the WithContext function. +// If the message doesn't have the delay metadata set, the default delay will be applied, if provided. +func NewPublisher(pub message.Publisher, config PublisherConfig) (message.Publisher, error) { + return &publisher{ + pub: pub, + config: config, + }, nil +} + +type publisher struct { + pub message.Publisher + config PublisherConfig +} + +func (p *publisher) Publish(topic string, messages ...*message.Message) error { + for i := range messages { + err := p.applyDelay(topic, messages[i]) + if err != nil { + return err + } + } + return p.pub.Publish(topic, messages...) +} + +func (p *publisher) Close() error { + return p.pub.Close() +} + +func (p *publisher) applyDelay(topic string, msg *message.Message) error { + if msg.Metadata.Get(DelayedForKey) != "" { + return nil + } + + if msg.Context().Value(delayContextKey) != nil { + delay := msg.Context().Value(delayContextKey).(Delay) + Message(msg, delay) + return nil + } + + if p.config.DefaultDelayGenerator != nil { + delay, err := p.config.DefaultDelayGenerator(DefaultDelayGeneratorParams{ + Topic: topic, + Message: msg, + }) + if err != nil { + return err + } + Message(msg, delay) + + return nil + } + + if !p.config.AllowNoDelay { + return errors.New("message doesn't have a delay set") + } + + return nil +} diff --git a/components/delay/publisher_test.go b/components/delay/publisher_test.go new file mode 100644 index 000000000..bd6ba3e58 --- /dev/null +++ b/components/delay/publisher_test.go @@ -0,0 +1,178 @@ +package delay_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ThreeDotsLabs/watermill/components/delay" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" +) + +func TestPublisher(t *testing.T) { + pubSub := gochannel.NewGoChannel(gochannel.Config{}, nil) + + messages, err := pubSub.Subscribe(context.Background(), "test") + require.NoError(t, err) + + pub, err := delay.NewPublisher(pubSub, delay.PublisherConfig{}) + require.NoError(t, err) + + pubAllowNoDelay, err := delay.NewPublisher(pubSub, delay.PublisherConfig{ + AllowNoDelay: true, + }) + require.NoError(t, err) + + defaultDelayPub, err := delay.NewPublisher(pubSub, delay.PublisherConfig{ + DefaultDelayGenerator: func(params delay.DefaultDelayGeneratorParams) (delay.Delay, error) { + return delay.For(1 * time.Second), nil + }, + }) + require.NoError(t, err) + + testCases := []struct { + name string + publisher message.Publisher + messageConstructor func(id string) *message.Message + expectedError bool + expectedDelay time.Duration + }{ + { + name: "no delay", + publisher: pub, + messageConstructor: func(id string) *message.Message { + return message.NewMessage(id, nil) + }, + expectedError: true, + expectedDelay: 0, + }, + { + name: "no delay but allowed", + publisher: pubAllowNoDelay, + messageConstructor: func(id string) *message.Message { + return message.NewMessage(id, nil) + }, + expectedDelay: 0, + }, + { + name: "default delay", + publisher: defaultDelayPub, + messageConstructor: func(id string) *message.Message { + return message.NewMessage(id, nil) + }, + expectedDelay: 1 * time.Second, + }, + { + name: "delay from metadata", + publisher: pub, + messageConstructor: func(id string) *message.Message { + msg := message.NewMessage(id, nil) + delay.Message(msg, delay.For(2*time.Second)) + return msg + }, + expectedDelay: 2 * time.Second, + }, + { + name: "default delay override with metadata", + publisher: defaultDelayPub, + messageConstructor: func(id string) *message.Message { + msg := message.NewMessage(id, nil) + delay.Message(msg, delay.For(2*time.Second)) + return msg + }, + expectedDelay: 2 * time.Second, + }, + { + name: "delay from context", + publisher: pub, + messageConstructor: func(id string) *message.Message { + msg := message.NewMessage(id, nil) + ctx := delay.WithContext(context.Background(), delay.For(3*time.Second)) + msg.SetContext(ctx) + return msg + }, + expectedDelay: 3 * time.Second, + }, + { + name: "default delay override with context", + publisher: defaultDelayPub, + messageConstructor: func(id string) *message.Message { + msg := message.NewMessage(id, nil) + ctx := delay.WithContext(context.Background(), delay.For(3*time.Second)) + msg.SetContext(ctx) + return msg + }, + expectedDelay: 3 * time.Second, + }, + { + name: "delay with until", + publisher: pub, + messageConstructor: func(id string) *message.Message { + msg := message.NewMessage(id, nil) + delay.Message(msg, delay.Until(time.Now().UTC().Add(4*time.Second))) + return msg + }, + expectedDelay: 4 * time.Second, + }, + { + name: "both metadata and context set", + publisher: defaultDelayPub, + messageConstructor: func(id string) *message.Message { + msg := message.NewMessage(id, nil) + delay.Message(msg, delay.For(5*time.Second)) + ctx := delay.WithContext(context.Background(), delay.For(6*time.Second)) + msg.SetContext(ctx) + return msg + }, + expectedDelay: 5 * time.Second, + }, + } + + for i, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + id := fmt.Sprint(i) + + msg := testCase.messageConstructor(id) + err = testCase.publisher.Publish("test", msg) + + if testCase.expectedError { + require.Error(t, err) + return + } + + require.NoError(t, err) + assertMessage(t, messages, id, testCase.expectedDelay) + }) + } +} + +func assertMessage(t *testing.T, messages <-chan *message.Message, expectedID string, expectedDelay time.Duration) { + t.Helper() + select { + case msg := <-messages: + assert.Equal(t, expectedID, msg.UUID) + + if expectedDelay == 0 { + assert.Empty(t, msg.Metadata.Get(delay.DelayedUntilKey)) + assert.Empty(t, msg.Metadata.Get(delay.DelayedForKey)) + } else { + delayedFor, err := time.ParseDuration(msg.Metadata.Get(delay.DelayedForKey)) + require.NoError(t, err) + assert.Equal(t, expectedDelay, delayedFor.Round(time.Second)) + + delayedUntil, err := time.Parse(time.RFC3339, msg.Metadata.Get(delay.DelayedUntilKey)) + require.NoError(t, err) + + assert.WithinDuration(t, time.Now().UTC().Add(expectedDelay), delayedUntil, 1*time.Second) + } + + msg.Ack() + case <-time.After(100 * time.Millisecond): + require.Fail(t, "timeout") + } +} diff --git a/components/requeuer/requeuer.go b/components/requeuer/requeuer.go new file mode 100644 index 000000000..623b43fe0 --- /dev/null +++ b/components/requeuer/requeuer.go @@ -0,0 +1,158 @@ +package requeuer + +import ( + "context" + "errors" + "fmt" + "strconv" + "time" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/message" +) + +const RetriesKey = "_watermill_requeuer_retries" + +// Requeuer is a component that moves messages from one topic to another. +// It can be used to requeue messages that failed to process. +type Requeuer struct { + config Config +} + +// GeneratePublishTopicParams are the parameters passed to the GeneratePublishTopic function. +type GeneratePublishTopicParams struct { + Message *message.Message +} + +// Config is the configuration for the Requeuer. +type Config struct { + // Subscriber is the subscriber to consume messages from. Required. + Subscriber message.Subscriber + + // SubscribeTopic is the topic related to the Subscriber to consume messages from. Required. + SubscribeTopic string + + // Publisher is the publisher to publish requeued messages to. Required. + Publisher message.Publisher + + // GeneratePublishTopic is the topic related to the Publisher to publish the requeued message to. + // For example, it could be a constant, or taken from the message's metadata. + // Required. + GeneratePublishTopic func(params GeneratePublishTopicParams) (string, error) + + // Delay is the duration to wait before requeueing the message. Optional. + // The default is no delay. + // + // This can be useful to avoid requeueing messages too quickly, for example, to avoid + // requeueing a message that failed to process due to a temporary issue. + // + // Avoid setting this to a very high value, as it will block the message processing. + Delay time.Duration + + // Router is the custom router to run the requeue handler on. Optional. + Router *message.Router +} + +func (c *Config) setDefaults(logger watermill.LoggerAdapter) error { + if c.Router == nil { + router, err := message.NewRouter(message.RouterConfig{}, logger) + if err != nil { + return fmt.Errorf("could not create router: %w", err) + } + + c.Router = router + } + + return nil +} + +func (c *Config) validate() error { + if c.Subscriber == nil { + return errors.New("subscriber is required") + } + + if c.SubscribeTopic == "" { + return errors.New("subscribe topic is required") + } + + if c.Publisher == nil { + return errors.New("publisher is required") + } + + if c.GeneratePublishTopic == nil { + return errors.New("generate publish topic is required") + } + + return nil +} + +// NewRequeuer creates a new Requeuer with the provided Config. +// It's not started automatically. You need to call Run on the returned Requeuer. +func NewRequeuer( + config Config, + logger watermill.LoggerAdapter, +) (*Requeuer, error) { + if logger == nil { + logger = watermill.NewStdLogger(false, false) + } + + err := config.setDefaults(logger) + if err != nil { + return nil, err + } + + err = config.validate() + if err != nil { + return nil, fmt.Errorf("invalid config: %w", err) + } + + r := &Requeuer{ + config: config, + } + + config.Router.AddNoPublisherHandler( + "requeuer", + config.SubscribeTopic, + config.Subscriber, + r.handler, + ) + + return r, nil +} + +func (r *Requeuer) handler(msg *message.Message) error { + if r.config.Delay > 0 { + select { + case <-msg.Context().Done(): + return msg.Context().Err() + case <-time.After(r.config.Delay): + } + } + + topic, err := r.config.GeneratePublishTopic(GeneratePublishTopicParams{Message: msg}) + if err != nil { + return err + } + + retriesStr := msg.Metadata.Get(RetriesKey) + retries, err := strconv.Atoi(retriesStr) + if err != nil { + retries = 0 + } + + retries++ + + msg.Metadata.Set(RetriesKey, strconv.Itoa(retries)) + + err = r.config.Publisher.Publish(topic, msg) + if err != nil { + return err + } + + return nil +} + +// Run runs the Requeuer. +func (r *Requeuer) Run(ctx context.Context) error { + return r.config.Router.Run(ctx) +} diff --git a/components/requeuer/requeuer_test.go b/components/requeuer/requeuer_test.go new file mode 100644 index 000000000..bf48c8d8c --- /dev/null +++ b/components/requeuer/requeuer_test.go @@ -0,0 +1,102 @@ +package requeuer_test + +import ( + "context" + "errors" + "fmt" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/components/requeuer" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/message/router/middleware" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" +) + +func TestRequeue(t *testing.T) { + logger := watermill.NewStdLogger(false, false) + + pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger) + + requeue, err := requeuer.NewRequeuer(requeuer.Config{ + Subscriber: pubSub, + SubscribeTopic: "requeue", + Publisher: pubSub, + GeneratePublishTopic: func(params requeuer.GeneratePublishTopicParams) (string, error) { + return "test", nil + }, + Delay: time.Millisecond * 200, + }, logger) + require.NoError(t, err) + + go func() { + err := requeue.Run(context.Background()) + require.NoError(t, err) + }() + + router, err := message.NewRouter(message.RouterConfig{}, logger) + require.NoError(t, err) + + pq, err := middleware.PoisonQueue(pubSub, "requeue") + require.NoError(t, err) + + router.AddMiddleware(pq) + + receivedMessages := make(chan int, 10) + + counter := 0 + + router.AddNoPublisherHandler( + "test", + "test", + pubSub, + func(msg *message.Message) error { + i, err := strconv.Atoi(string(msg.Payload)) + if err != nil { + return err + } + + counter++ + + if counter < 10 && i%2 == 0 { + return errors.New("error") + } + + receivedMessages <- i + + return nil + }, + ) + + go func() { + err := router.Run(context.Background()) + require.NoError(t, err) + }() + + time.Sleep(time.Second) + + for i := 0; i < 10; i++ { + msg := message.NewMessage(watermill.NewUUID(), []byte(fmt.Sprint(i))) + err := pubSub.Publish("test", msg) + require.NoError(t, err) + } + + var received []int + + timeout := false + for !timeout { + select { + case i := <-receivedMessages: + received = append(received, i) + case <-time.After(5 * time.Second): + timeout = true + break + } + } + + require.ElementsMatch(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, received) +} diff --git a/docs/build.sh b/docs/build.sh index 2ea03e0fa..7024a56ea 100755 --- a/docs/build.sh +++ b/docs/build.sh @@ -46,6 +46,9 @@ else "components/cqrs/cqrs.go" "components/cqrs/marshaler.go" + "components/delay/delay.go" + "components/delay/publisher.go" + "components/metrics/builder.go" "components/metrics/http.go" diff --git a/docs/content/advanced/delayed-messages.md b/docs/content/advanced/delayed-messages.md new file mode 100644 index 000000000..f5d6ff729 --- /dev/null +++ b/docs/content/advanced/delayed-messages.md @@ -0,0 +1,43 @@ ++++ +title = "Delayed Messages" +description = "Receive messages with a delay" +weight = -40 +draft = false +bref = "Receive messages with a delay" ++++ + +Delaying events or commands is a common use case in many applications. +For example, you may want to send the user a reminder after a few days of signing up. +It's not a complex logic to implement, but you can leverage messages to use it out of the box. + +## Delay Metadata + +Watermill's [`delay`](https://github.com/ThreeDotsLabs/watermill/tree/master/components/delay) package allows you to +*add delay metadata* to messages. + +{{< callout "danger" >}} +**The delay metadata does nothing by itself. You need to use a Pub/Sub implementation that supports it to make it work.** + +See below for supported Pub/Subs. +{{< /callout >}} + +There are two APIs you can use. If you work with raw messages, use `delay.Message`: + +```go +msg := message.NewMessage(watermill.NewUUID(), []byte("hello")) +delay.Message(msg, delay.For(time.Second * 10)) +``` + +If you use the CQRS component, use `delay.WithContext` instead (since you can't access the message directly): + +{{% load-snippet-partial file="src-link/_examples/real-world-examples/delayed-messages/main.go" first_line_contains="cmd := SendFeedbackForm" last_line_contains="return err" padding_after="1" %}} + +You can also use `delay.Until` instead of `delay.For` to specify `time.Time` instead of `time.Duration`. + +## Supported Pub/Subs + +* [PostgreSQL](/pubsubs/sql/) + +## Full Example + +See the [full example](https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/delayed-messages) in the Watermill repository. diff --git a/docs/hugo_stats.json b/docs/hugo_stats.json index 391e6403f..cb541b1e1 100644 --- a/docs/hugo_stats.json +++ b/docs/hugo_stats.json @@ -80,6 +80,10 @@ "btn-link", "btn-outline-primary", "btn-primary", + "callout", + "callout-body", + "callout-content", + "callout-danger", "card", "card-body", "card-list", @@ -190,6 +194,7 @@ "ms-auto", "ms-lg-2", "mt-3", + "mt-4", "mt-5", "mt-n3", "mx-2", @@ -220,7 +225,11 @@ "page-footer-meta", "page-links", "page-nav", + "pb-2", "pb-3", + "pe-4", + "ps-3", + "pt-4", "pubsubs", "px-0", "query-no-results", @@ -307,6 +316,8 @@ "customization", "debugging-pubsub-tests", "deduplicator", + "delay-metadata", + "delay-on-error", "doks-docs-nav", "duplicator", "ensuring-that-the-router-is-running", @@ -329,6 +340,7 @@ "fanin-component", "fanout-component", "forwarder-component", + "full-example", "generic-handlers", "grafana-dashboard", "grep-is-your-friend", @@ -408,6 +420,7 @@ "subscribing-for-messages", "subscription-name", "support", + "supported-pubsubs", "tabs-getting-started-0", "tabs-getting-started-0-tab", "tabs-getting-started-1", diff --git a/message/router/middleware/delay_on_error.go b/message/router/middleware/delay_on_error.go new file mode 100644 index 000000000..9b70370e5 --- /dev/null +++ b/message/router/middleware/delay_on_error.go @@ -0,0 +1,47 @@ +package middleware + +import ( + "time" + + "github.com/ThreeDotsLabs/watermill/components/delay" + "github.com/ThreeDotsLabs/watermill/message" +) + +// DelayOnError is a middleware that adds the delay metadata to the message if an error occurs. +// +// IMPORTANT: The delay metadata doesn't cause delays with all Pub/Subs! Using it won't have any effect on Pub/Subs that don't support it. +// See the list of supported Pub/Subs in the documentation: https://watermill.io/advanced/delayed-messages/ +type DelayOnError struct { + // InitialInterval is the first interval between retries. Subsequent intervals will be scaled by Multiplier. + InitialInterval time.Duration + // MaxInterval sets the limit for the exponential backoff of retries. The interval will not be increased beyond MaxInterval. + MaxInterval time.Duration + // Multiplier is the factor by which the waiting interval will be multiplied between retries. + Multiplier float64 +} + +func (d *DelayOnError) Middleware(h message.HandlerFunc) message.HandlerFunc { + return func(msg *message.Message) ([]*message.Message, error) { + msgs, err := h(msg) + if err != nil { + d.applyDelay(msg) + } + + return msgs, err + } +} + +func (d *DelayOnError) applyDelay(msg *message.Message) { + delayedForStr := msg.Metadata.Get(delay.DelayedForKey) + delayedFor, err := time.ParseDuration(delayedForStr) + if delayedForStr != "" && err == nil { + delayedFor *= time.Duration(d.Multiplier) + if delayedFor > d.MaxInterval { + delayedFor = d.MaxInterval + } + + delay.Message(msg, delay.For(delayedFor)) + } else { + delay.Message(msg, delay.For(d.InitialInterval)) + } +} diff --git a/message/router/middleware/delay_on_error_test.go b/message/router/middleware/delay_on_error_test.go new file mode 100644 index 000000000..e8fc89fa1 --- /dev/null +++ b/message/router/middleware/delay_on_error_test.go @@ -0,0 +1,58 @@ +package middleware_test + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/ThreeDotsLabs/watermill/components/delay" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/message/router/middleware" +) + +func TestDelayOnError(t *testing.T) { + m := middleware.DelayOnError{ + InitialInterval: time.Second, + MaxInterval: time.Second * 10, + Multiplier: 2, + } + + msg := message.NewMessage("1", []byte("test")) + + getDelayFor := func(msg *message.Message) string { + return msg.Metadata.Get(delay.DelayedForKey) + } + + okHandler := func(msg *message.Message) ([]*message.Message, error) { + return nil, nil + } + + errHandler := func(msg *message.Message) ([]*message.Message, error) { + return nil, errors.New("error") + } + + assert.Equal(t, "", getDelayFor(msg)) + + _, _ = m.Middleware(okHandler)(msg) + assert.Equal(t, "", getDelayFor(msg)) + + _, _ = m.Middleware(errHandler)(msg) + assert.Equal(t, "1s", getDelayFor(msg)) + + _, _ = m.Middleware(errHandler)(msg) + assert.Equal(t, "2s", getDelayFor(msg)) + + _, _ = m.Middleware(errHandler)(msg) + assert.Equal(t, "4s", getDelayFor(msg)) + + _, _ = m.Middleware(errHandler)(msg) + assert.Equal(t, "8s", getDelayFor(msg)) + + _, _ = m.Middleware(errHandler)(msg) + assert.Equal(t, "10s", getDelayFor(msg)) + + _, _ = m.Middleware(errHandler)(msg) + assert.Equal(t, "10s", getDelayFor(msg)) +} diff --git a/message/router/middleware/retry_test.go b/message/router/middleware/retry_test.go index cf7bd84f1..8c2fc91f7 100644 --- a/message/router/middleware/retry_test.go +++ b/message/router/middleware/retry_test.go @@ -5,13 +5,11 @@ import ( "testing" "time" - "github.com/ThreeDotsLabs/watermill" - + "github.com/pkg/errors" "github.com/stretchr/testify/assert" + "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" - "github.com/pkg/errors" - "github.com/ThreeDotsLabs/watermill/message/router/middleware" ) diff --git a/tools/pq/README.md b/tools/pq/README.md new file mode 100644 index 000000000..b482cbbe6 --- /dev/null +++ b/tools/pq/README.md @@ -0,0 +1,38 @@ +# pq + +pq is a CLI tool for working with delayed messages on poison queues. + +For now, it supports the PostgreSQL Pub/Sub implementation. + +## Install + +```bash +go install github.com/ThreeDotsLabs/watermill/tools/pq@latest +``` + +## Usage + +Set the `DATABASE_URL` environment variable to your PostgreSQL connection string. + +For example, to connect to the database used for the [delayed requeue example](../../_examples/real-world-examples/delayed-requeue): + +```bash +export DATABASE_URL="postgres://watermill:password@postgres:5432/watermill?sslmode=disable" +``` + +```bash +pq -backend postgres -topic requeue +``` + +This will use the default `watermill_` prefix, so will use the `watermill_requeue` table. + +If you use a custom prefix, use the `-raw-topic` flag instead: + +```bash +pq -backend postgres -raw-topic my_prefix_requeue +``` + +## Commands + +- Requeue — Updates the `_watermill_delayed_until` metadata to the current time, so the message will be instantly requeued. +- Ack — Removes the message from the queue (be careful — you will lose the message forever). diff --git a/tools/pq/backend/postgres.go b/tools/pq/backend/postgres.go new file mode 100644 index 000000000..f4418c127 --- /dev/null +++ b/tools/pq/backend/postgres.go @@ -0,0 +1,99 @@ +package backend + +import ( + "context" + "encoding/json" + "fmt" + "os" + "time" + + "github.com/jmoiron/sqlx" + _ "github.com/lib/pq" + + "github.com/ThreeDotsLabs/watermill/components/delay" + "github.com/ThreeDotsLabs/watermill/tools/pq/cli" +) + +type PostgresMessage struct { + Offset int `db:"offset"` + UUID string `db:"uuid"` + Payload string `db:"payload"` + Metadata string `db:"metadata"` +} + +type PostgresBackend struct { + db *sqlx.DB + config cli.BackendConfig +} + +func NewPostgresBackend(ctx context.Context, config cli.BackendConfig) (*PostgresBackend, error) { + dbURL := os.Getenv("DATABASE_URL") + if dbURL == "" { + return nil, fmt.Errorf("missing DATABASE_URL") + } + + db, err := sqlx.Connect("postgres", dbURL) + if err != nil { + return nil, err + } + + return &PostgresBackend{ + db: db, + config: config, + }, nil +} + +func (r *PostgresBackend) AllMessages(ctx context.Context) ([]cli.Message, error) { + var dbMessages []PostgresMessage + err := r.db.SelectContext(ctx, &dbMessages, fmt.Sprintf(`SELECT "offset", uuid, payload, metadata FROM %v WHERE acked = false ORDER BY "offset"`, r.topic())) + if err != nil { + return nil, err + } + + var messages []cli.Message + + for _, dbMsg := range dbMessages { + var metadata map[string]string + err := json.Unmarshal([]byte(dbMsg.Metadata), &metadata) + if err != nil { + return nil, err + } + + msg, err := cli.NewMessage(fmt.Sprint(dbMsg.Offset), dbMsg.UUID, dbMsg.Payload, metadata) + if err != nil { + return nil, err + } + + messages = append(messages, msg) + } + + return messages, nil +} + +func (r *PostgresBackend) Requeue(ctx context.Context, msg cli.Message) error { + _, err := r.db.ExecContext(ctx, fmt.Sprintf(`UPDATE %v SET metadata = metadata::jsonb || jsonb_build_object($1::text, $2::text) WHERE "offset" = $3`, r.topic()), + delay.DelayedUntilKey, time.Now().UTC().Format(time.RFC3339), msg.ID, + ) + if err != nil { + return err + } + + return nil +} + +func (r *PostgresBackend) Ack(ctx context.Context, msg cli.Message) error { + _, err := r.db.ExecContext(ctx, fmt.Sprintf(`UPDATE %v SET acked = true WHERE "offset" = %v`, r.topic(), msg.ID)) + if err != nil { + return err + } + + return nil +} + +func (r *PostgresBackend) topic() string { + if r.config.Topic != "" { + return fmt.Sprintf(`"watermill_%v"`, r.config.Topic) + } + + return fmt.Sprintf(`"%v"`, r.config.RawTopic) +} diff --git a/tools/pq/cli/backend.go b/tools/pq/cli/backend.go new file mode 100644 index 000000000..9aad18c66 --- /dev/null +++ b/tools/pq/cli/backend.go @@ -0,0 +1,32 @@ +package cli + +import ( + "context" + + "github.com/pkg/errors" +) + +type BackendConfig struct { + Topic string + RawTopic string +} + +func (c BackendConfig) Validate() error { + if c.Topic == "" && c.RawTopic == "" { + return errors.New("topic or raw topic must be provided") + } + + if c.Topic != "" && c.RawTopic != "" { + return errors.New("only one of topic or raw topic must be provided") + } + + return nil +} + +type BackendConstructor func(ctx context.Context, cfg BackendConfig) (Backend, error) + +type Backend interface { + AllMessages(ctx context.Context) ([]Message, error) + Requeue(ctx context.Context, msg Message) error + Ack(ctx context.Context, msg Message) error +} diff --git a/tools/pq/cli/message.go b/tools/pq/cli/message.go new file mode 100644 index 000000000..654171ed5 --- /dev/null +++ b/tools/pq/cli/message.go @@ -0,0 +1,45 @@ +package cli + +import ( + "time" + + "github.com/ThreeDotsLabs/watermill/components/delay" + "github.com/ThreeDotsLabs/watermill/message/router/middleware" +) + +type Message struct { + // ID is a unique message ID across the Pub/Sub's topic. + ID string + UUID string + Payload string + Metadata map[string]string + + OriginalTopic string + DelayedUntil string + DelayedFor string + RequeueIn time.Duration +} + +func NewMessage(id string, uuid string, payload string, metadata map[string]string) (Message, error) { + originalTopic := metadata[middleware.PoisonedTopicKey] + + // Calculate the time until the message should be requeued + delayedUntil, err := time.Parse(time.RFC3339, metadata[delay.DelayedUntilKey]) + if err != nil { + return Message{}, err + } + + delayedFor := metadata[delay.DelayedForKey] + requeueIn := delayedUntil.Sub(time.Now().UTC()).Round(time.Second) + + return Message{ + ID: id, + UUID: uuid, + Payload: payload, + Metadata: metadata, + OriginalTopic: originalTopic, + DelayedUntil: delayedUntil.String(), + DelayedFor: delayedFor, + RequeueIn: requeueIn, + }, nil +} diff --git a/tools/pq/cli/model.go b/tools/pq/cli/model.go new file mode 100644 index 000000000..05593b831 --- /dev/null +++ b/tools/pq/cli/model.go @@ -0,0 +1,403 @@ +package cli + +import ( + "context" + "encoding/json" + "fmt" + "slices" + "time" + + "github.com/charmbracelet/bubbles/table" + "github.com/charmbracelet/bubbles/viewport" + tea "github.com/charmbracelet/bubbletea" + "github.com/charmbracelet/lipgloss" + "golang.org/x/exp/maps" +) + +var warningStyle = lipgloss.NewStyle(). + Background(lipgloss.Color("196")). + Align(lipgloss.Center). + Padding(1, 10) + +var dialogStyle = lipgloss.NewStyle(). + Border(lipgloss.RoundedBorder()). + BorderForeground(lipgloss.Color("241")). + Padding(1, 4) + +var buttonStyle = lipgloss.NewStyle() + +var buttonSelectedStyle = lipgloss.NewStyle(). + Background(lipgloss.Color("57")) + +var readOnlyMessageActions = []string{"<- Back", "Show payload"} +var writeMessageActions = []string{"Requeue", "Ack (drop)"} + +var dialogActions = []string{"Cancel", "Confirm"} + +type MessagesUpdated struct { + Messages []Message +} + +type DialogResult struct { + Err error +} + +func (m Model) FetchMessages() tea.Cmd { + return func() tea.Msg { + for { + msgs, err := m.backend.AllMessages(context.Background()) + if err != nil { + panic(err) + } + + m.sub <- MessagesUpdated{ + Messages: msgs, + } + + time.Sleep(time.Second) + } + } +} + +func (m Model) WaitForMessages() tea.Cmd { + return func() tea.Msg { + return <-m.sub + } +} + +var baseStyle = lipgloss.NewStyle(). + BorderStyle(lipgloss.NormalBorder()). + BorderForeground(lipgloss.Color("240")) + +type Model struct { + backend Backend + sub chan MessagesUpdated + + chosenMessage *Message + chosenMessageGone bool + + table table.Model + messages []Message + + chosenAction int + currentDialog *Dialog + + showingPayload bool + payloadViewport viewport.Model +} + +func (m Model) Init() tea.Cmd { + return tea.Batch( + m.FetchMessages(), + m.WaitForMessages(), + ) +} + +func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { + switch msg := msg.(type) { + case MessagesUpdated: + rows := make([]table.Row, len(msg.Messages)) + for i, message := range msg.Messages { + rows[i] = table.Row{ + message.ID, + message.UUID, + message.OriginalTopic, + message.DelayedFor, + message.RequeueIn.String(), + } + } + m.table.SetRows(rows) + m.messages = msg.Messages + + // If the chosen message is no longer in the list, go back to the table. + // This is to avoid accidentally making an action on a message that has been requeued or deleted. + if m.chosenMessage != nil { + found := false + for _, message := range m.messages { + if message.ID == m.chosenMessage.ID { + foundMessage := message + m.chosenMessage = &foundMessage + found = true + break + } + } + + if found { + m.chosenMessageGone = false + } else { + if !m.chosenMessageGone { + m.chosenAction = 0 + } + + m.chosenMessageGone = true + } + } + + return m, m.WaitForMessages() + } + + if m.chosenMessage == nil { + switch msg := msg.(type) { + case tea.KeyMsg: + switch msg.String() { + case "ctrl+c", "q": + return m, tea.Quit + case " ", "enter": + c := m.table.Cursor() + m.chosenAction = 0 + chosenMessage := m.messages[c] + m.chosenMessage = &chosenMessage + m.chosenMessageGone = false + } + } + + var cmd tea.Cmd + m.table, cmd = m.table.Update(msg) + return m, cmd + } else if m.showingPayload { + switch msg := msg.(type) { + case tea.KeyMsg: + switch msg.String() { + case "ctrl+c", "q": + return m, tea.Quit + case "esc", "backspace": + m.showingPayload = false + } + } + + var cmd tea.Cmd + m.payloadViewport, cmd = m.payloadViewport.Update(msg) + return m, cmd + } else if m.currentDialog != nil { + switch msg := msg.(type) { + case tea.KeyMsg: + switch msg.String() { + case "ctrl+c", "q": + return m, tea.Quit + case "esc", "backspace": + m.currentDialog = nil + case "h", "left": + m.currentDialog.Choice-- + if m.currentDialog.Choice < 0 { + m.currentDialog.Choice = 0 + } + case "l", "right": + m.currentDialog.Choice++ + if m.currentDialog.Choice >= len(dialogActions) { + m.currentDialog.Choice = len(dialogActions) - 1 + } + case " ", "enter": + switch m.currentDialog.Choice { + case 0: + m.currentDialog = nil + case 1: + m.currentDialog.Running = true + return m, m.currentDialog.Action + } + } + case DialogResult: + if msg.Err != nil { + // TODO Could be handled better + panic(msg.Err) + } + + m.currentDialog = nil + } + + return m, nil + } else { + messageActions := len(readOnlyMessageActions) + if !m.chosenMessageGone { + messageActions += len(writeMessageActions) + } + + switch msg := msg.(type) { + case tea.KeyMsg: + switch msg.String() { + case "ctrl+c", "q": + return m, tea.Quit + case "esc", "backspace": + m.chosenMessage = nil + m.chosenMessageGone = false + case "j", "down": + m.chosenAction++ + if m.chosenAction >= messageActions { + m.chosenAction = messageActions - 1 + } + case "k", "up": + m.chosenAction-- + if m.chosenAction < 0 { + m.chosenAction = 0 + } + case " ", "enter": + switch m.chosenAction { + case 0: + m.chosenMessage = nil + m.chosenMessageGone = false + case 1: + // Show payload + m.showingPayload = true + m.payloadViewport = viewport.New(80, 20) + b := lipgloss.RoundedBorder() + m.payloadViewport.Style = lipgloss.NewStyle().BorderStyle(b).Padding(0, 1) + + payload := m.chosenMessage.Payload + + var jsonPayload any + err := json.Unmarshal([]byte(payload), &jsonPayload) + if err == nil { + pretty, err := json.MarshalIndent(jsonPayload, "", " ") + if err == nil { + payload = string(pretty) + } + } + + m.payloadViewport.SetContent(payload) + case 2: + chosenMessage := *m.chosenMessage + m.currentDialog = &Dialog{ + Prompt: "Requeue message? It will go back to the original topic.", + Action: func() tea.Msg { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + return DialogResult{ + Err: m.backend.Requeue(ctx, chosenMessage), + } + }, + } + case 3: + chosenMessage := *m.chosenMessage + m.currentDialog = &Dialog{ + Prompt: "Acknowledge message? It will be dropped from the topic.", + Action: func() tea.Msg { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + return DialogResult{ + Err: m.backend.Ack(ctx, chosenMessage), + } + }, + } + } + } + } + + return m, nil + } +} + +func (m Model) View() string { + if m.chosenMessage == nil { + return baseStyle.Render(m.table.View()) + "\n " + m.table.HelpView() + "\n" + } + + msg := m.chosenMessage + + var out string + + if m.chosenMessageGone { + out += warningStyle.Render("Read only — the message is gone.") + out += "\n" + } + + out += fmt.Sprintf( + "ID: %v\nUUID: %v\nOriginal Topic: %v\nDelayed For: %v\nDelayed Until: %v\nRequeue In: %v\n\n", + msg.ID, + msg.UUID, + msg.OriginalTopic, + msg.DelayedFor, + msg.DelayedUntil, + msg.RequeueIn, + ) + + if m.showingPayload { + out += m.payloadViewport.View() + return out + } + + out += "Metadata:\n" + + keys := maps.Keys(msg.Metadata) + slices.Sort(keys) + for _, k := range keys { + v := msg.Metadata[k] + out += fmt.Sprintf(" %v: %v\n", k, v) + } + + if m.currentDialog != nil { + prompt := m.currentDialog.Prompt + "\n\n" + + if m.currentDialog.Running { + prompt += "Running..." + } else { + for i, action := range dialogActions { + style := buttonStyle + if i == m.currentDialog.Choice { + style = buttonSelectedStyle + } + + prompt += fmt.Sprintf("%v", style.MarginLeft(13).Render(action)) + } + } + + out += dialogStyle.Render(prompt) + } else { + out += "\nActions:\n" + + messageActions := readOnlyMessageActions + if !m.chosenMessageGone { + messageActions = append(messageActions, writeMessageActions...) + } + + for i, action := range messageActions { + style := buttonStyle + if i == m.chosenAction { + style = buttonSelectedStyle + } + + out += fmt.Sprintf("%v\n", style.MarginLeft(2).Render(action)) + } + } + + return out +} + +func NewModel(backend Backend) Model { + columns := []table.Column{ + {Title: "ID", Width: 8}, + {Title: "UUID", Width: 40}, + {Title: "Original Topic", Width: 20}, + {Title: "Delayed For", Width: 14}, + {Title: "Requeue In", Width: 14}, + } + + t := table.New( + table.WithColumns(columns), + table.WithFocused(true), + table.WithHeight(20), + ) + + s := table.DefaultStyles() + s.Header = s.Header. + BorderStyle(lipgloss.NormalBorder()). + BorderForeground(lipgloss.Color("240")). + BorderBottom(true). + Bold(false) + s.Selected = s.Selected. + Foreground(lipgloss.Color("229")). + Background(lipgloss.Color("57")). + Bold(false) + t.SetStyles(s) + + return Model{ + backend: backend, + sub: make(chan MessagesUpdated), + table: t, + } +} + +type Dialog struct { + Prompt string + Action func() tea.Msg + Choice int + Running bool +} diff --git a/tools/pq/go.mod b/tools/pq/go.mod new file mode 100644 index 000000000..5f2f50e39 --- /dev/null +++ b/tools/pq/go.mod @@ -0,0 +1,44 @@ +module github.com/ThreeDotsLabs/watermill/tools/pq + +go 1.23.0 + +require ( + github.com/ThreeDotsLabs/watermill v1.4.0-rc.1.0.20241011082756-1cb09cdf7d08 + github.com/charmbracelet/bubbles v0.19.0 + github.com/charmbracelet/bubbletea v0.27.1 + github.com/charmbracelet/lipgloss v0.13.0 + github.com/jmoiron/sqlx v1.4.0 + github.com/lib/pq v1.10.9 + golang.org/x/exp v0.0.0-20220909182711-5c715a9e8561 +) + +require ( + github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect + github.com/cenkalti/backoff/v3 v3.2.2 // indirect + github.com/charmbracelet/x/ansi v0.1.4 // indirect + github.com/charmbracelet/x/input v0.1.0 // indirect + github.com/charmbracelet/x/term v0.1.1 // indirect + github.com/charmbracelet/x/windows v0.1.0 // indirect + github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/lithammer/shortuuid/v3 v3.0.7 // indirect + github.com/lucasb-eyer/go-colorful v1.2.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-localereader v0.0.1 // indirect + github.com/mattn/go-runewidth v0.0.16 // indirect + github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 // indirect + github.com/muesli/cancelreader v0.2.2 // indirect + github.com/muesli/termenv v0.15.2 // indirect + github.com/oklog/ulid v1.3.1 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/rivo/uniseg v0.4.7 // indirect + github.com/sony/gobreaker v1.0.0 // indirect + github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.24.0 // indirect + golang.org/x/text v0.14.0 // indirect +) + +replace github.com/ThreeDotsLabs/watermill => ../.. diff --git a/tools/pq/go.sum b/tools/pq/go.sum new file mode 100644 index 000000000..9585210df --- /dev/null +++ b/tools/pq/go.sum @@ -0,0 +1,90 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= +github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= +github.com/aymanbagabas/go-udiff v0.2.0 h1:TK0fH4MteXUDspT88n8CKzvK0X9O2xu9yQjWpi6yML8= +github.com/aymanbagabas/go-udiff v0.2.0/go.mod h1:RE4Ex0qsGkTAJoQdQQCA0uG+nAzJO/pI/QwceO5fgrA= +github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= +github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= +github.com/charmbracelet/bubbles v0.19.0 h1:gKZkKXPP6GlDk6EcfujDK19PCQqRjaJZQ7QRERx1UF0= +github.com/charmbracelet/bubbles v0.19.0/go.mod h1:WILteEqZ+krG5c3ntGEMeG99nCupcuIk7V0/zOP0tOA= +github.com/charmbracelet/bubbletea v0.27.1 h1:/yhaJKX52pxG4jZVKCNWj/oq0QouPdXycriDRA6m6r8= +github.com/charmbracelet/bubbletea v0.27.1/go.mod h1:xc4gm5yv+7tbniEvQ0naiG9P3fzYhk16cTgDZQQW6YE= +github.com/charmbracelet/lipgloss v0.13.0 h1:4X3PPeoWEDCMvzDvGmTajSyYPcZM4+y8sCA/SsA3cjw= +github.com/charmbracelet/lipgloss v0.13.0/go.mod h1:nw4zy0SBX/F/eAO1cWdcvy6qnkDUxr8Lw7dvFrAIbbY= +github.com/charmbracelet/x/ansi v0.1.4 h1:IEU3D6+dWwPSgZ6HBH+v6oUuZ/nVawMiWj5831KfiLM= +github.com/charmbracelet/x/ansi v0.1.4/go.mod h1:dk73KoMTT5AX5BsX0KrqhsTqAnhZZoCBjs7dGWp4Ktw= +github.com/charmbracelet/x/exp/golden v0.0.0-20240815200342-61de596daa2b h1:MnAMdlwSltxJyULnrYbkZpp4k58Co7Tah3ciKhSNo0Q= +github.com/charmbracelet/x/exp/golden v0.0.0-20240815200342-61de596daa2b/go.mod h1:wDlXFlCrmJ8J+swcL/MnGUuYnqgQdW9rhSD61oNMb6U= +github.com/charmbracelet/x/input v0.1.0 h1:TEsGSfZYQyOtp+STIjyBq6tpRaorH0qpwZUj8DavAhQ= +github.com/charmbracelet/x/input v0.1.0/go.mod h1:ZZwaBxPF7IG8gWWzPUVqHEtWhc1+HXJPNuerJGRGZ28= +github.com/charmbracelet/x/term v0.1.1 h1:3cosVAiPOig+EV4X9U+3LDgtwwAoEzJjNdwbXDjF6yI= +github.com/charmbracelet/x/term v0.1.1/go.mod h1:wB1fHt5ECsu3mXYusyzcngVWWlu1KKUmmLhfgr/Flxw= +github.com/charmbracelet/x/windows v0.1.0 h1:gTaxdvzDM5oMa/I2ZNF7wN78X/atWemG9Wph7Ika2k4= +github.com/charmbracelet/x/windows v0.1.0/go.mod h1:GLEO/l+lizvFDBPLIOk+49gdX49L9YWMB5t+DZd0jkQ= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f h1:Y/CXytFA4m6baUTXGLOoWe4PQhGxaX0KpnayAqC48p4= +github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f/go.mod h1:vw97MGsxSvLiUE2X8qFplwetxpGLQrlU1Q9AUEIzCaM= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= +github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= +github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= +github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= +github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-localereader v0.0.1 h1:ygSAOl7ZXTx4RdPYinUpg6W99U8jWvWi9Ye2JC/oIi4= +github.com/mattn/go-localereader v0.0.1/go.mod h1:8fBrzywKY7BI3czFoHkuzRoWE9C+EiG4R1k4Cjx5p88= +github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= +github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 h1:ZK8zHtRHOkbHy6Mmr5D264iyp3TiX5OmNcI5cIARiQI= +github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6/go.mod h1:CJlz5H+gyd6CUWT45Oy4q24RdLyn7Md9Vj2/ldJBSIo= +github.com/muesli/cancelreader v0.2.2 h1:3I4Kt4BQjOR54NavqnDogx/MIoWBFa0StPA8ELUXHmA= +github.com/muesli/cancelreader v0.2.2/go.mod h1:3XuTXfFS2VjM+HTLZY9Ak0l6eUKfijIfMUZ4EgX0QYo= +github.com/muesli/termenv v0.15.2 h1:GohcuySI0QmI3wN8Ok9PtKGkgkFIk7y6Vpb5PvrY+Wo= +github.com/muesli/termenv v0.15.2/go.mod h1:Epx+iuz8sNs7mNKhxzH4fWXGNpZwUaJKRS1noLXviQ8= +github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= +github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= +github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/sony/gobreaker v1.0.0 h1:feX5fGGXSl3dYd4aHZItw+FpHLvvoaqkawKjVNiFMNQ= +github.com/sony/gobreaker v1.0.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no= +github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM= +golang.org/x/exp v0.0.0-20220909182711-5c715a9e8561 h1:MDc5xs78ZrZr3HMQugiXOAkSZtfTpbJLDr/lwfgO53E= +golang.org/x/exp v0.0.0-20220909182711-5c715a9e8561/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/tools/pq/main.go b/tools/pq/main.go new file mode 100644 index 000000000..292d50d9d --- /dev/null +++ b/tools/pq/main.go @@ -0,0 +1,51 @@ +package main + +import ( + "context" + "flag" + "log" + + "github.com/ThreeDotsLabs/watermill/tools/pq/backend" + "github.com/ThreeDotsLabs/watermill/tools/pq/cli" + + tea "github.com/charmbracelet/bubbletea" +) + +var ( + backendFlag = flag.String("backend", "", "backend to use") + topicFlag = flag.String("topic", "", "topic to use") + rawTopicFlag = flag.String("raw-topic", "", "raw topic to use") +) + +func main() { + flag.Parse() + + config := cli.BackendConfig{ + Topic: *topicFlag, + RawTopic: *rawTopicFlag, + } + + err := config.Validate() + if err != nil { + log.Fatal(err) + } + + var b cli.Backend + switch *backendFlag { + case "postgres": + b, err = backend.NewPostgresBackend(context.Background(), config) + if err != nil { + log.Fatal(err) + } + default: + log.Fatalf("unknown backend: %s", *backendFlag) + } + + m := cli.NewModel(b) + + p := tea.NewProgram(m, tea.WithAltScreen()) + _, err = p.Run() + if err != nil { + log.Fatal(err) + } +}