From e1665cbd594133778a388e0823dd6101adf801b0 Mon Sep 17 00:00:00 2001 From: "mike.art" Date: Fri, 11 Oct 2024 11:41:24 +0200 Subject: [PATCH] Add message producer --- _examples/queues/.gitignore | 3 + _examples/queues/Makefile | 12 ++++ _examples/queues/go.mod | 7 ++ _examples/queues/go.sum | 0 _examples/queues/main.go | 105 +++++++++++++++++++++++++++++ _examples/queues/wrangler.toml | 13 ++++ cloudflare/queues/content_type.go | 58 ++++++++++++++++ cloudflare/queues/producer.go | 105 +++++++++++++++++++++++++++++ cloudflare/queues/producer_opts.go | 51 ++++++++++++++ internal/jsutil/jsutil.go | 6 +- 10 files changed, 359 insertions(+), 1 deletion(-) create mode 100644 _examples/queues/.gitignore create mode 100644 _examples/queues/Makefile create mode 100644 _examples/queues/go.mod create mode 100644 _examples/queues/go.sum create mode 100644 _examples/queues/main.go create mode 100644 _examples/queues/wrangler.toml create mode 100644 cloudflare/queues/content_type.go create mode 100644 cloudflare/queues/producer.go create mode 100644 cloudflare/queues/producer_opts.go diff --git a/_examples/queues/.gitignore b/_examples/queues/.gitignore new file mode 100644 index 0000000..aee7b7e --- /dev/null +++ b/_examples/queues/.gitignore @@ -0,0 +1,3 @@ +build +node_modules +.wrangler diff --git a/_examples/queues/Makefile b/_examples/queues/Makefile new file mode 100644 index 0000000..db68197 --- /dev/null +++ b/_examples/queues/Makefile @@ -0,0 +1,12 @@ +.PHONY: dev +dev: + npx wrangler dev --port 8787 + +.PHONY: build +build: + go run ../../cmd/workers-assets-gen + tinygo build -o ./build/app.wasm -target wasm -no-debug ./... + +.PHONY: deploy +deploy: + npx wrangler deploy diff --git a/_examples/queues/go.mod b/_examples/queues/go.mod new file mode 100644 index 0000000..779f83b --- /dev/null +++ b/_examples/queues/go.mod @@ -0,0 +1,7 @@ +module github.com/syumai/workers/_examples/queues + +go 1.22.8 + +require github.com/syumai/workers v0.0.0 + +replace github.com/syumai/workers => ../../ diff --git a/_examples/queues/go.sum b/_examples/queues/go.sum new file mode 100644 index 0000000..e69de29 diff --git a/_examples/queues/main.go b/_examples/queues/main.go new file mode 100644 index 0000000..c899b15 --- /dev/null +++ b/_examples/queues/main.go @@ -0,0 +1,105 @@ +package main + +import ( + "encoding/json" + "fmt" + "io" + "log" + "net/http" + + "github.com/syumai/workers" + "github.com/syumai/workers/cloudflare/queues" +) + +const queueName = "QUEUE" + +func handleErr(w http.ResponseWriter, msg string, err error) { + log.Println(err) + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(msg)) +} + +func main() { + http.HandleFunc("/", handleProduce) + workers.Serve(nil) +} +func handleProduce(w http.ResponseWriter, req *http.Request) { + if req.URL.Path != "/" { + w.WriteHeader(http.StatusNotFound) + return + } + + if req.Method != http.MethodPost { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + defer req.Body.Close() + + q, err := queues.NewProducer(queueName) + if err != nil { + handleErr(w, "failed to init queue", err) + } + + contentType := req.Header.Get("Content-Type") + switch contentType { + case "text/plain": + log.Println("Handling text content type") + err = produceText(q, req) + case "application/json": + log.Println("Handling json content type") + err = produceJson(q, req) + default: + log.Println("Handling bytes content type") + err = produceBytes(q, req) + } + + if err != nil { + handleErr(w, "failed to handle request", err) + return + } + + w.WriteHeader(http.StatusOK) + w.Write([]byte("message sent\n")) +} + +func produceText(q *queues.Producer, req *http.Request) error { + content, err := io.ReadAll(req.Body) + if err != nil { + return fmt.Errorf("failed to read request body: %w", err) + } + if len(content) == 0 { + return fmt.Errorf("empty request body") + } + + // text content type supports string and []byte messages + if err := q.Send(content, queues.WithContentType(queues.QueueContentTypeText)); err != nil { + return fmt.Errorf("failed to send message: %w", err) + } + + return nil +} + +func produceJson(q *queues.Producer, req *http.Request) error { + var data any + if err := json.NewDecoder(req.Body).Decode(&data); err != nil { + return fmt.Errorf("failed to read request body: %w", err) + } + + // json content type is default and therefore can be omitted + // json content type supports messages of types that can be serialized to json + if err := q.Send(data); err != nil { + return fmt.Errorf("failed to send message: %w", err) + } + + return nil +} + +func produceBytes(q *queues.Producer, req *http.Request) error { + // bytes content type support messages of type []byte, string, and io.Reader + if err := q.Send(req.Body, queues.WithContentType(queues.QueueContentTypeBytes)); err != nil { + return fmt.Errorf("failed to send message: %w", err) + } + + return nil +} diff --git a/_examples/queues/wrangler.toml b/_examples/queues/wrangler.toml new file mode 100644 index 0000000..0965832 --- /dev/null +++ b/_examples/queues/wrangler.toml @@ -0,0 +1,13 @@ +name = "queues-producer" +main = "./build/worker.mjs" +compatibility_date = "2022-05-13" +compatibility_flags = [ + "streams_enable_constructors" +] + +[[queues.producers]] +queue = "my-queue" +binding = "QUEUE" + +[build] +command = "make build" diff --git a/cloudflare/queues/content_type.go b/cloudflare/queues/content_type.go new file mode 100644 index 0000000..63cc82b --- /dev/null +++ b/cloudflare/queues/content_type.go @@ -0,0 +1,58 @@ +package queues + +import ( + "fmt" + "io" + "syscall/js" + + "github.com/syumai/workers/internal/jsutil" +) + +type QueueContentType string + +const ( + QueueContentTypeJSON QueueContentType = "json" + QueueContentTypeText QueueContentType = "text" + QueueContentTypeBytes QueueContentType = "bytes" + QueueContentTypeV8 QueueContentType = "v8" +) + +func (o QueueContentType) mapValue(val any) (js.Value, error) { + switch o { + case QueueContentTypeText: + switch v := val.(type) { + case string: + return js.ValueOf(v), nil + case []byte: + return js.ValueOf(string(v)), nil + default: + return js.Undefined(), fmt.Errorf("invalid value type for text content type: %T", val) + } + + case QueueContentTypeBytes: + var b []byte + switch v := val.(type) { + case string: + b = []byte(v) + case []byte: + b = v + case io.Reader: + var err error + b, err = io.ReadAll(v) + if err != nil { + return js.Undefined(), fmt.Errorf("failed to read bytes from reader: %w", err) + } + default: + return js.Undefined(), fmt.Errorf("invalid value type for bytes content type: %T", val) + } + + ua := jsutil.NewUint8Array(len(b)) + js.CopyBytesToJS(ua, b) + return ua.Get("buffer"), nil + + case QueueContentTypeJSON, QueueContentTypeV8: + return js.ValueOf(val), nil + } + + return js.Undefined(), fmt.Errorf("unknown content type: %s", o) +} diff --git a/cloudflare/queues/producer.go b/cloudflare/queues/producer.go new file mode 100644 index 0000000..972408c --- /dev/null +++ b/cloudflare/queues/producer.go @@ -0,0 +1,105 @@ +package queues + +import ( + "errors" + "fmt" + "syscall/js" + + "github.com/syumai/workers/cloudflare/internal/cfruntimecontext" + "github.com/syumai/workers/internal/jsutil" +) + +type BatchMessage struct { + body any + options *sendOptions +} + +func NewBatchMessage(body any, opts ...SendOption) *BatchMessage { + options := defaultSendOptions() + for _, opt := range opts { + opt(options) + } + return &BatchMessage{body: body, options: options} +} + +func (m *BatchMessage) toJS() (js.Value, error) { + if m == nil { + return js.Undefined(), errors.New("message is nil") + } + + jsValue, err := m.options.ContentType.mapValue(m.body) + if err != nil { + return js.Undefined(), err + } + + obj := jsutil.NewObject() + obj.Set("body", jsValue) + obj.Set("options", m.options.toJS()) + + return obj, nil +} + +type Producer struct { + // queue - Objects that Queue API belongs to. Default is Global + queue js.Value +} + +func NewProducer(queueName string) (*Producer, error) { + inst := cfruntimecontext.MustGetRuntimeContextEnv().Get(queueName) + if inst.IsUndefined() { + return nil, fmt.Errorf("%s is undefined", queueName) + } + return &Producer{queue: inst}, nil +} + +func (p *Producer) Send(content any, opts ...SendOption) error { + if p.queue.IsUndefined() { + return errors.New("queue object not found") + } + + options := defaultSendOptions() + for _, opt := range opts { + opt(options) + } + + jsValue, err := options.ContentType.mapValue(content) + if err != nil { + return err + } + + prom := p.queue.Call("send", jsValue, options.toJS()) + _, err = jsutil.AwaitPromise(prom) + return err +} + +func (p *Producer) SendBatch(messages []*BatchMessage) error { + if p.queue.IsUndefined() { + return errors.New("queue object not found") + } + + if len(messages) == 0 { + return nil + } + + jsArray := jsutil.NewArray(len(messages)) + for i, message := range messages { + jsValue, err := message.toJS() + if err != nil { + return fmt.Errorf("failed to convert message %d to JS: %w", i, err) + } + jsArray.SetIndex(i, jsValue) + } + + prom := p.queue.Call("sendBatch", jsArray) + _, err := jsutil.AwaitPromise(prom) + return err +} + +func (p *Producer) SendJsonBatch(messages ...any) error { + batch := make([]*BatchMessage, len(messages)) + for i, message := range messages { + batch[i] = NewBatchMessage(message) + } + + return p.SendBatch(batch) +} diff --git a/cloudflare/queues/producer_opts.go b/cloudflare/queues/producer_opts.go new file mode 100644 index 0000000..8ef07f9 --- /dev/null +++ b/cloudflare/queues/producer_opts.go @@ -0,0 +1,51 @@ +package queues + +import ( + "syscall/js" + "time" + + "github.com/syumai/workers/internal/jsutil" +) + +type sendOptions struct { + // ContentType - Content type of the message + // Default is "json" + ContentType QueueContentType + + // DelaySeconds - The number of seconds to delay the message. + // Default is 0 + DelaySeconds int +} + +func defaultSendOptions() *sendOptions { + return &sendOptions{ + ContentType: QueueContentTypeJSON, + } +} + +func (o *sendOptions) toJS() js.Value { + obj := jsutil.NewObject() + obj.Set("contentType", string(QueueContentTypeJSON)) + + if o.DelaySeconds != 0 { + obj.Set("delaySeconds", o.DelaySeconds) + } + + return obj +} + +type SendOption func(*sendOptions) + +// WithContentType changes the content type of the message. +func WithContentType(contentType QueueContentType) SendOption { + return func(o *sendOptions) { + o.ContentType = contentType + } +} + +// WithDelay changes the number of seconds to delay the message. +func (q *Producer) WithDelay(d time.Duration) SendOption { + return func(o *sendOptions) { + o.DelaySeconds = int(d.Seconds()) + } +} diff --git a/internal/jsutil/jsutil.go b/internal/jsutil/jsutil.go index ececd04..5210d86 100644 --- a/internal/jsutil/jsutil.go +++ b/internal/jsutil/jsutil.go @@ -26,6 +26,10 @@ func NewObject() js.Value { return ObjectClass.New() } +func NewArray(size int) js.Value { + return ArrayClass.New(size) +} + func NewUint8Array(size int) js.Value { return Uint8ArrayClass.New(size) } @@ -89,7 +93,7 @@ func StrRecordToMap(v js.Value) map[string]string { return result } -// MaybeString returns string value of given JavaScript value or returns nil if the value is undefined. +// MaybeString returns string value of given JavaScript value or returns "" if the value is undefined. func MaybeString(v js.Value) string { if v.IsUndefined() { return ""