Skip to content

Commit

Permalink
Merge pull request #125 from meandnano/feature/queues
Browse files Browse the repository at this point in the history
Add message producer
  • Loading branch information
syumai authored Nov 7, 2024
2 parents ad33cfb + ae2bcf1 commit dfdd7e7
Show file tree
Hide file tree
Showing 13 changed files with 875 additions and 1 deletion.
3 changes: 3 additions & 0 deletions _examples/queues/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
build
node_modules
.wrangler
12 changes: 12 additions & 0 deletions _examples/queues/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
.PHONY: dev
dev:
npx wrangler dev --port 8787

.PHONY: build
build:
go run ../../cmd/workers-assets-gen
tinygo build -o ./build/app.wasm -target wasm -no-debug ./...

.PHONY: deploy
deploy:
npx wrangler deploy
38 changes: 38 additions & 0 deletions _examples/queues/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# queues

An example of using Cloudflare Workers that interact with [Cloudflare Queues](https://developers.cloudflare.com/queues/).

## Running

### Requirements

This project requires these tools to be installed globally.

* wrangler
* tinygo

### Supported commands

```
make dev # run dev server
make build # build Go Wasm binary
make deploy # deploy worker
```

### Interacting with the local queue

1. Start the dev server.
```sh
make dev
```

2. Send a message to the queue.
```sh
curl -v -X POST http://localhost:8787/ -d '{"message": "Hello, World!"}' -H "Content-Type: application/json"
```

3. Observe the response and server logs

4. You can pass `text/plain` content type to write queue message as the string or omit the `Content-Type` header to write queue message as
byte array.

7 changes: 7 additions & 0 deletions _examples/queues/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module github.com/syumai/workers/_examples/queues

go 1.22.8

require github.com/syumai/workers v0.0.0

replace github.com/syumai/workers => ../../
Empty file added _examples/queues/go.sum
Empty file.
105 changes: 105 additions & 0 deletions _examples/queues/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package main

import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"

"github.com/syumai/workers"
"github.com/syumai/workers/cloudflare/queues"
)

const queueName = "QUEUE"

func handleErr(w http.ResponseWriter, msg string, err error) {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(msg))
}

func main() {
http.HandleFunc("/", handleProduce)
workers.Serve(nil)
}
func handleProduce(w http.ResponseWriter, req *http.Request) {
if req.URL.Path != "/" {
w.WriteHeader(http.StatusNotFound)
return
}

if req.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

defer req.Body.Close()

q, err := queues.NewProducer(queueName)
if err != nil {
handleErr(w, "failed to init queue", err)
}

contentType := req.Header.Get("Content-Type")
switch contentType {
case "text/plain":
log.Println("Handling text content type")
err = produceText(q, req)
case "application/json":
log.Println("Handling json content type")
err = produceJson(q, req)
default:
log.Println("Handling bytes content type")
err = produceBytes(q, req)
}

if err != nil {
handleErr(w, "failed to handle request", err)
return
}

w.WriteHeader(http.StatusOK)
w.Write([]byte("message sent\n"))
}

func produceText(q *queues.Producer, req *http.Request) error {
content, err := io.ReadAll(req.Body)
if err != nil {
return fmt.Errorf("failed to read request body: %w", err)
}
if len(content) == 0 {
return fmt.Errorf("empty request body")
}

// text content type supports string and []byte messages
if err := q.Send(content, queues.WithContentType(queues.QueueContentTypeText)); err != nil {
return fmt.Errorf("failed to send message: %w", err)
}

return nil
}

func produceJson(q *queues.Producer, req *http.Request) error {
var data any
if err := json.NewDecoder(req.Body).Decode(&data); err != nil {
return fmt.Errorf("failed to read request body: %w", err)
}

// json content type is default and therefore can be omitted
// json content type supports messages of types that can be serialized to json
if err := q.Send(data); err != nil {
return fmt.Errorf("failed to send message: %w", err)
}

return nil
}

func produceBytes(q *queues.Producer, req *http.Request) error {
// bytes content type support messages of type []byte, string, and io.Reader
if err := q.Send(req.Body, queues.WithContentType(queues.QueueContentTypeBytes)); err != nil {
return fmt.Errorf("failed to send message: %w", err)
}

return nil
}
13 changes: 13 additions & 0 deletions _examples/queues/wrangler.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
name = "queues-producer"
main = "./build/worker.mjs"
compatibility_date = "2022-05-13"
compatibility_flags = [
"streams_enable_constructors"
]

[[queues.producers]]
queue = "my-queue"
binding = "QUEUE"

[build]
command = "make build"
82 changes: 82 additions & 0 deletions cloudflare/queues/content_type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package queues

import (
"fmt"
"io"
"syscall/js"

"github.com/syumai/workers/internal/jsutil"
)

// QueueContentType represents the content type of a message produced to a queue.
// This information mostly affects how the message body is represented in the Cloudflare UI and is NOT
// propagated to the consumer side.
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#queuescontenttype
type QueueContentType string

const (
// QueueContentTypeJSON is the default content type for the produced queue message.
// The message body is NOT being marshaled before sending and is passed to js.ValueOf directly.
// Make sure the body is serializable to JSON.
// - https://pkg.go.dev/syscall/js#ValueOf
QueueContentTypeJSON QueueContentType = "json"

// QueueContentTypeV8 is currently treated the same as QueueContentTypeJSON.
QueueContentTypeV8 QueueContentType = "v8"

// QueueContentTypeText is used to send a message as a string.
// Supported body types are string, []byte and io.Reader.
QueueContentTypeText QueueContentType = "text"

// QueueContentTypeBytes is used to send a message as a byte array.
// Supported body types are string, []byte, and io.Reader.
QueueContentTypeBytes QueueContentType = "bytes"
)

func (o QueueContentType) mapValue(val any) (js.Value, error) {
switch o {
case QueueContentTypeText:
switch v := val.(type) {
case string:
return js.ValueOf(v), nil
case []byte:
return js.ValueOf(string(v)), nil
case io.Reader:
b, err := io.ReadAll(v)
if err != nil {
return js.Undefined(), fmt.Errorf("failed to read bytes from reader: %w", err)
}
return js.ValueOf(string(b)), nil
default:
return js.Undefined(), fmt.Errorf("invalid value type for text content type: %T", val)
}

case QueueContentTypeBytes:
var b []byte
switch v := val.(type) {
case string:
b = []byte(v)
case []byte:
b = v
case io.Reader:
var err error
b, err = io.ReadAll(v)
if err != nil {
return js.Undefined(), fmt.Errorf("failed to read bytes from reader: %w", err)
}
default:
return js.Undefined(), fmt.Errorf("invalid value type for bytes content type: %T", val)
}

ua := jsutil.NewUint8Array(len(b))
js.CopyBytesToJS(ua, b)
// accortind to docs, "bytes" type requires an ArrayBuffer to be sent, however practical experience shows that ArrayBufferView should
// be used instead and with Uint8Array.buffer as a value, the send simply fails
return ua, nil

case QueueContentTypeJSON, QueueContentTypeV8:
return js.ValueOf(val), nil
}

return js.Undefined(), fmt.Errorf("unknown content type: %s", o)
}
Loading

0 comments on commit dfdd7e7

Please sign in to comment.