Skip to content

Commit

Permalink
Switched to backlite for task queues.
Browse files Browse the repository at this point in the history
  • Loading branch information
mikestefanello committed Jul 25, 2024
1 parent 062d1f7 commit f54d9f8
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 370 deletions.
74 changes: 13 additions & 61 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
* [Flush tags](#flush-tags)
* [Tasks](#tasks)
* [Queues](#queues)
* [Runner](#runner)
* [Dispatcher](#dispatcher)
* [Cron](#cron)
* [Static files](#static-files)
* [Cache control headers](#cache-control-headers)
Expand Down Expand Up @@ -997,77 +997,29 @@ As shown in the previous examples, cache tags were provided because they can be
Tasks are queued operations to be executed in the background, either immediately, at a specfic time, or after a given amount of time has passed. Some examples of tasks could be long-running operations, bulk processing, cleanup, notifications, etc.
Since we're already using [SQLite](https://sqlite.org/) for our database, it's available to act as a persistent store for queued tasks so that tasks are never lost, can be retried until successful, and their concurrent execution can be managed. [Goqite](https://github.com/maragudk/goqite) is the library chosen to interface with [SQLite](https://sqlite.org/) and handle queueing tasks and processing them asynchronously.
Since we're already using [SQLite](https://sqlite.org/) for our database, it's available to act as a persistent store for queued tasks so that tasks are never lost, can be retried until successful, and their concurrent execution can be managed. [Backlite](https://github.com/mikestefanello/backlite) is the library chosen to interface with [SQLite](https://sqlite.org/) and handle queueing tasks and processing them asynchronously. I wrote that specifically to address the requirements I wanted to satisfy for this project.
To make things even easier, a custom client (`TaskClient`) is provided as a _Service_ on the `Container` which exposes a simple interface with [goqite](https://github.com/maragudk/goqite) that supports type-safe tasks and queues.
To make things easy, the _Backlite_ client is provided as a _Service_ on the `Container` which allows you to register queues and add tasks.
### Queues
A full example of a queue implementation can be found in `pkg/tasks` with an interactive form to create a task and add to the queue at `/task` (see `pkg/handlers/task.go`).
A queue starts by declaring a `Task` _type_, which is the object that gets placed in to a queue and eventually passed to a queue subscriber (a callback function to process the task). A `Task` must implement the `Name()` method which returns a unique name for the task. For example:
```go
type MyTask struct {
Text string
Num int
}

func (t MyTask) Name() string {
return "my_task"
}
```
Then, create the queue for `MyTask` tasks:
```go
q := services.NewQueue[MyTask](func(ctx context.Context, task MyTask) error {
// This is where you process the task
fmt.Println("Processed %s task!", task.Text)
return nil
})
```
And finally, register the queue with the `TaskClient`:
Configuration for the _Backlite_ client is exposed through the app's yaml configuration. The required database schema will be automatically installed when the app starts.
```go
c.Tasks.Register(q)
```
### Queues
See `pkg/tasks/register.go` for a simple way to register all of your queues and to easily pass the `Container` to them so the queue subscriber callbacks have access to all of your app's dependencies.
A full example of a queue implementation can be found in `pkg/tasks` with an interactive form to create a task and add to the queue at `/task` (see `pkg/handlers/task.go`). Also refer to the [Backlite](https://github.com/mikestefanello/backlite) documentation for reference and examples.
Now you can easily add a task to the queue using the `TaskClient`:
See `pkg/tasks/register.go` for a simple way to register all of your queues and to easily pass the `Container` to them so the queue processor callbacks have access to all of your app's dependencies.
```go
task := MyTask{Text: "Hello world!", Num: 10}
### Dispatcher
err := c.Tasks.
New(task).
Save()
```
#### Options
Tasks can be created and queued with various chained options:
The _task dispatcher_ is what manages the worker pool used for executing tasks in the background. It monitors incoming and scheduled tasks and handles sending them to the pool for execution by the queue's processor callback. This must be started in order for this to happen. In `cmd/web/main.go`, the _task dispatcher_ is automatically started when the app starts via:
```go
err := c.Tasks.
New(task).
Wait(30 * time.Second). // Wait 30 seconds before passing the task to the subscriber
At(time.Date(...)). // Wait until a given date before passing the task to the subscriber
Tx(tx). // Include the queueing of this task in a database transaction
Save()
c.Tasks.Start(ctx)
```
### Runner
The _task runner_ is what manages periodically polling the database for available queued tasks to process and passing them to the queue's subscriber callback. This must be started in order for this to happen. In `cmd/web/main.go`, the _task runner_ is started by using the `TaskClient`:
```go
go c.Tasks.StartRunner(ctx)
```
The app [configuration](#configuration) contains values to configure the client and dispatcher including how many goroutines to use, when to release stuck tasks back into the queue, and how often to cleanup retained tasks in the database.
The app [configuration](#configuration) contains values to configure the runner including how often to poll the database for tasks, the maximum amount of retries for a given task, and the amount of tasks that can be processed concurrently.
When the app is shutdown, the dispatcher is given 10 seconds to wait for any in-progress tasks to finish execution. This can be changed in `cmd/web/main.go`.
## Cron
Expand Down Expand Up @@ -1203,12 +1155,12 @@ Future work includes but is not limited to:
Thank you to all of the following amazing projects for making this possible.
- [alpinejs](https://github.com/alpinejs/alpine)
- [backlite](https://github.com/mikestefanello/backlite)
- [bulma](https://github.com/jgthms/bulma)
- [echo](https://github.com/labstack/echo)
- [ent](https://github.com/ent/ent)
- [go](https://go.dev/)
- [go-sqlite3](https://github.com/mattn/go-sqlite3)
- [goqite](https://github.com/maragudk/goqite)
- [goquery](https://github.com/PuerkitoBio/goquery)
- [htmx](https://github.com/bigskysoftware/htmx)
- [jwt](https://github.com/golang-jwt/jwt)
Expand Down
29 changes: 22 additions & 7 deletions cmd/web/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http"
"os"
"os/signal"
"sync"
"time"

"github.com/mikestefanello/pagoda/pkg/handlers"
Expand Down Expand Up @@ -60,18 +61,32 @@ func main() {
tasks.Register(c)

// Start the task runner to execute queued tasks
ctx, cancel := context.WithCancel(context.Background())
go c.Tasks.StartRunner(ctx)
c.Tasks.Start(context.Background())

// Wait for interrupt signal to gracefully shut down the server with a timeout of 10 seconds.
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
signal.Notify(quit, os.Kill)
<-quit
cancel()
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)

// Shutdown both the task runner and web server
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := c.Web.Shutdown(ctx); err != nil {
log.Fatal(err)
}

wg := sync.WaitGroup{}
wg.Add(2)

go func() {
defer wg.Done()
c.Tasks.Stop(ctx)
}()

go func() {
defer wg.Done()
if err := c.Web.Shutdown(ctx); err != nil {
log.Fatal(err)
}
}()

wg.Wait()
}
6 changes: 3 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ type (

// TasksConfig stores the tasks configuration
TasksConfig struct {
PollInterval time.Duration
MaxRetries int
Goroutines int
Goroutines int
ReleaseAfter time.Duration
CleanupInterval time.Duration
}

// MailConfig stores the mail configuration
Expand Down
4 changes: 2 additions & 2 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ database:
testConnection: ":memory:?_journal=WAL&_timeout=5000&_fk=true"

tasks:
pollInterval: "1s"
maxRetries: 10
goroutines: 1
releaseAfter: "15m"
cleanupInterval: "1h"

mail:
hostname: "localhost"
Expand Down
6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/mikestefanello/pagoda

go 1.22

toolchain go1.22.1
go 1.22.4

require (
entgo.io/ent v0.13.1
Expand All @@ -14,9 +12,9 @@ require (
github.com/gorilla/sessions v1.2.2
github.com/labstack/echo/v4 v4.12.0
github.com/labstack/gommon v0.4.2
github.com/maragudk/goqite v0.2.3
github.com/mattn/go-sqlite3 v1.14.22
github.com/maypok86/otter v1.2.1
github.com/mikestefanello/backlite v0.1.0
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.9.0
golang.org/x/crypto v0.22.0
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/maragudk/goqite v0.2.3 h1:R8oVD6IMCQfjhCKyGIYwWxR1w8yxjvT/3uwYtA656jE=
github.com/maragudk/goqite v0.2.3/go.mod h1:5430TCLkycUeLE314c9fifTrTbwcJqJXdU3iyEiF6hM=
github.com/maragudk/is v0.1.0 h1:obq9anZNmOYcaNbeT0LMyjIexdNeYTw/TLAPD/BnZHA=
github.com/maragudk/is v0.1.0/go.mod h1:W/r6+TpnISu+a88OLXQy5JQGCOhXQXXLD2e5b4xMn5c=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
Expand All @@ -94,6 +90,8 @@ github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/maypok86/otter v1.2.1 h1:xyvMW+t0vE1sKt/++GTkznLitEl7D/msqXkAbLwiC1M=
github.com/maypok86/otter v1.2.1/go.mod h1:mKLfoI7v1HOmQMwFgX4QkRk23mX6ge3RDvjdHOWG4R4=
github.com/mikestefanello/backlite v0.1.0 h1:bIiZJXPZB8V5PXWvDmkTepY015w3gJdeRrP3QrEV4Ls=
github.com/mikestefanello/backlite v0.1.0/go.mod h1:/vj8LPZWG/xqK/3uHaqOtu5JRLDEWqeyJKWTAlADTV0=
github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQflz0v0=
Expand Down
10 changes: 6 additions & 4 deletions pkg/handlers/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package handlers

import (
"fmt"
"github.com/mikestefanello/backlite"
"github.com/mikestefanello/pagoda/pkg/msg"
"time"

Expand All @@ -21,7 +22,7 @@ const (

type (
Task struct {
tasks *services.TaskClient
tasks *backlite.Client
*services.TemplateRenderer
}

Expand Down Expand Up @@ -71,9 +72,10 @@ func (h *Task) Submit(ctx echo.Context) error {
}

// Insert the task
err = h.tasks.New(tasks.ExampleTask{
Message: input.Message,
}).
err = h.tasks.
Add(tasks.ExampleTask{
Message: input.Message,
}).
Wait(time.Duration(input.Delay) * time.Second).
Save()

Expand Down
22 changes: 17 additions & 5 deletions pkg/services/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@ import (
"context"
"database/sql"
"fmt"
"github.com/mikestefanello/backlite"
"log/slog"
"os"
"strings"

entsql "entgo.io/ent/dialect/sql"
_ "github.com/mattn/go-sqlite3"
"github.com/mikestefanello/pagoda/pkg/funcmap"

"github.com/labstack/echo/v4"
_ "github.com/mattn/go-sqlite3"
"github.com/mikestefanello/pagoda/config"
"github.com/mikestefanello/pagoda/ent"
"github.com/mikestefanello/pagoda/pkg/funcmap"
"github.com/mikestefanello/pagoda/pkg/log"

// Require by ent
_ "github.com/mikestefanello/pagoda/ent/runtime"
Expand Down Expand Up @@ -51,7 +52,7 @@ type Container struct {
TemplateRenderer *TemplateRenderer

// Tasks stores the task client
Tasks *TaskClient
Tasks *backlite.Client
}

// NewContainer creates and initializes a new Container
Expand Down Expand Up @@ -177,10 +178,21 @@ func (c *Container) initTasks() {
var err error
// You could use a separate database for tasks, if you'd like. but using one
// makes transaction support easier
c.Tasks, err = NewTaskClient(c.Config.Tasks, c.Database)
c.Tasks, err = backlite.NewClient(backlite.ClientConfig{
DB: c.Database,
Logger: log.Default(),
NumWorkers: c.Config.Tasks.Goroutines,
ReleaseAfter: c.Config.Tasks.ReleaseAfter,
CleanupInterval: c.Config.Tasks.CleanupInterval,
})

if err != nil {
panic(fmt.Sprintf("failed to create task client: %v", err))
}

if err = c.Tasks.Install(); err != nil {
panic(fmt.Sprintf("failed to install task schema: %v", err))
}
}

// openDB opens a database connection
Expand Down
Loading

0 comments on commit f54d9f8

Please sign in to comment.