Skip to content

Commit

Permalink
Merge pull request #1 from josebalius/jg/initial-functionality
Browse files Browse the repository at this point in the history
Initial implementation of azqlite
  • Loading branch information
josebalius authored Mar 12, 2022
2 parents 5c847fa + 8df61c6 commit bb93708
Show file tree
Hide file tree
Showing 7 changed files with 636 additions and 0 deletions.
58 changes: 58 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1 +1,59 @@
# azqlite

azqlite is lightweight wrapper around github.com/Azure/azure-storage-queue-go/azqueue to interact with the Azure Storage Queue service in a simpler and more idiomatic way.

## Install

```
go get github.com/josebalius/azqlite
```

## How to use

### Instantiate a service
```
storageService, err := azqlite.NewService(azqlite.Config{
AccountName: "YOUR_AZURE_STORAGE_ACCOUNT_NAME_HERE",
AccountKey: "YOUR_AZURE_STORAGE_ACCOUNT_KEY_HERE",
})
```

### Create a queue
```
q, err := storageService.CreateQueue(ctx, "test")
```

### Delete a queue
```
err = s.DeleteQueue(ctx, "test")
```

### Instantiate an existing queue
```
q := s.NewQueue("test")
```

### Get message account
```
c, err := q.MessageCount(ctx)
```

### Enqueue a message
```
m, err := q.Enqueue(ctx, "my message", 1*time.Second, -time.Second)
```

### Dequeue messages
```
messages, err := q.Dequeue(ctx, 30, 1*time.Second)
```

### Peek messages
```
messages, err := q.Peek(ctx, 30)
```

### Delete a message
```
err := q.Delete(ctx, &Message{ID: "1"})
```
13 changes: 13 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
module github.com/josebalius/azqlite

go 1.17

require github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd

require (
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/mattn/go-ieproxy v0.0.1 // indirect
golang.org/x/net v0.0.0-20191112182307-2180aed22343 // indirect
golang.org/x/sys v0.0.0-20191112214154-59a1497f0cea // indirect
golang.org/x/text v0.3.2 // indirect
)
21 changes: 21 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd h1:b3wyxBl3vvr15tUAziPBPK354y+LSdfPCpex5oBttHo=
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd/go.mod h1:K6am8mT+5iFXgingS9LUc7TmbsW6XBw3nxaRyaMyWc8=
github.com/mattn/go-ieproxy v0.0.1 h1:qiyop7gCflfhwCzGyeT0gro3sF9AIg9HU98JORTkqfI=
github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20191112182307-2180aed22343 h1:00ohfJ4K98s3m6BGUoBd8nyfp4Yl0GoIKvw5abItTjI=
golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20191112214154-59a1497f0cea h1:Mz1TMnfJDRJLk8S8OPCoJYgrsp/Se/2TBre2+vwX128=
golang.org/x/sys v0.0.0-20191112214154-59a1497f0cea/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405 h1:829vOVxxusYHC+IqBtkX5mbKtsY9fheQiQn0MZRVLfQ=
gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
106 changes: 106 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package azqlite

import (
"context"
"fmt"
"time"

"github.com/Azure/azure-storage-queue-go/azqueue"
)

var _ Queue = &queue{}

type queue struct {
queueURL azqueue.QueueURL
messagesURL azqueue.MessagesURL
}

func newQueue(queueURL azqueue.QueueURL) *queue {
messagesURL := queueURL.NewMessagesURL()
return &queue{queueURL, messagesURL}
}

func (q *queue) MessageCount(ctx context.Context) (int, error) {
props, err := q.queueURL.GetProperties(ctx)
if err != nil {
return 0, fmt.Errorf("failed to get queue properties: %w", err)
}
return int(props.ApproximateMessagesCount()), nil
}

type Message struct {
ID string
PopReceipt string
DequeueCount int
Body string
}

func (q *queue) Dequeue(ctx context.Context, count int, timeout time.Duration) ([]*Message, error) {
dequeue, err := q.messagesURL.Dequeue(ctx, int32(count), timeout)
if err != nil {
return nil, fmt.Errorf("failed to dequeue messages: %w", err)
}

numMessages := dequeue.NumMessages()
if numMessages == 0 {
return nil, nil
}

messages := make([]*Message, numMessages)
for i := int32(0); i < numMessages; i++ {
msg := dequeue.Message(i)
messages[i] = &Message{
ID: string(msg.ID),
PopReceipt: string(msg.PopReceipt),
DequeueCount: int(msg.DequeueCount),
Body: msg.Text,
}
}

return messages, nil
}

func (q *queue) Enqueue(ctx context.Context, message string, timeout, ttl time.Duration) (*Message, error) {
msg, err := q.messagesURL.Enqueue(ctx, message, timeout, ttl)
if err != nil {
return nil, fmt.Errorf("failed to enqueue message: %w", err)
}

return &Message{
ID: string(msg.MessageID),
PopReceipt: string(msg.PopReceipt),
DequeueCount: 0,
Body: message,
}, nil
}

func (q *queue) Peek(ctx context.Context, count int) ([]*Message, error) {
peek, err := q.messagesURL.Peek(ctx, int32(count))
if err != nil {
return nil, fmt.Errorf("failed to peek messages: %w", err)
}

numMessages := peek.NumMessages()
if numMessages == 0 {
return nil, nil
}

messages := make([]*Message, numMessages)
for i := int32(0); i < numMessages; i++ {
msg := peek.Message(i)
messages[i] = &Message{
ID: string(msg.ID),
PopReceipt: "",
DequeueCount: int(msg.DequeueCount),
Body: msg.Text,
}
}

return messages, nil
}

func (q *queue) Delete(ctx context.Context, m *Message) error {
msgIDURL := q.messagesURL.NewMessageIDURL(azqueue.MessageID(m.ID))
_, err := msgIDURL.Delete(ctx, azqueue.PopReceipt(m.PopReceipt))
return err
}
Loading

0 comments on commit bb93708

Please sign in to comment.