queue api

This commit is contained in:
Aleksandr Trushkin
2024-02-03 18:15:26 +03:00
parent 6acb3dc199
commit 6044e116f8
3 changed files with 220 additions and 0 deletions

23
internal/entity/task.go Normal file
View File

@ -0,0 +1,23 @@
package entity
import (
"context"
"time"
)
type PublishParams struct {
Body []byte
ExpiresAt time.Time
}
type MessageQueue interface {
Publish(context.Context, PublishParams) (Task, error)
Consume(context.Context) (Task, error)
}
type Task struct {
ID uint64
CreatedAt time.Time
ExpiresAt *time.Time
Body []byte
}

View File

@ -2,6 +2,8 @@ package badger
import (
"fmt"
"time"
"unsafe"
"git.loyso.art/frx/eway/internal/entity"
badger "github.com/dgraph-io/badger/v4"
@ -9,11 +11,13 @@ import (
var (
categorySequenceIDKey = []byte("!!cat_seq!!")
queueSequenceIDKey = []byte("!!que_seq!!")
)
type client struct {
db *badger.DB
nextCategoryIDSeq *badger.Sequence
nextQueueIDSeq *badger.Sequence
}
func NewClient(db *badger.DB) (*client, error) {
@ -21,10 +25,15 @@ func NewClient(db *badger.DB) (*client, error) {
if err != nil {
return nil, fmt.Errorf("getting sequence for categories: %w", err)
}
queueSeqGen, err := db.GetSequence(queueSequenceIDKey, 10)
if err != nil {
return nil, fmt.Errorf("getting sequence for queues: %w", err)
}
return &client{
db: db,
nextCategoryIDSeq: categorySeqGen,
nextQueueIDSeq: queueSeqGen,
}, nil
}
@ -46,3 +55,64 @@ func (c *client) Category() entity.CategoryRepository {
func (c *client) GoodsItem() entity.GoodsItemRepository {
return newGoodsItemClient(c.db)
}
func (c *client) QueueClient() entity.MessageQueue {
nc := c.Table("queues")
return newQueueClient(nc, c.nextQueueIDSeq)
}
func (c *client) Table(name string) namedClient {
tableBytes := unsafe.Slice(unsafe.StringData("!!"+name+"!!"), len(name)+4)
return namedClient{
table: tableBytes,
db: c.db,
}
}
type namedClient struct {
table []byte
db *badger.DB
}
type putOpt func(*badger.Entry)
func withTTL(duration time.Duration) putOpt {
return func(e *badger.Entry) {
e.WithTTL(duration)
}
}
func (c *namedClient) Put(key, value []byte, opts ...putOpt) error {
return c.db.Update(func(txn *badger.Txn) error {
tableKey := c.makeKey(key)
entry := badger.NewEntry(tableKey, value)
for _, opt := range opts {
opt(entry)
}
return txn.SetEntry(entry)
})
}
func (c *namedClient) Get(key []byte) ([]byte, error) {
var out []byte
err := c.db.View(func(txn *badger.Txn) error {
item, err := txn.Get(c.makeKey(key))
if err != nil {
return err
}
out = make([]byte, item.ValueSize())
out, err = item.ValueCopy(out)
return err
})
if err != nil {
return nil, err
}
return out, nil
}
func (c *namedClient) makeKey(key []byte) (out []byte) {
return append(c.table, key...)
}

View File

@ -0,0 +1,127 @@
package badger
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"time"
"git.loyso.art/frx/eway/internal/entity"
"github.com/dgraph-io/badger/v4"
)
type queueClient struct {
namedClient
seqGen *badger.Sequence
}
func newQueueClient(nc namedClient, seqGen *badger.Sequence) queueClient {
return queueClient{
namedClient: nc,
seqGen: seqGen,
}
}
type taskDB struct {
createdAt int64
body []byte
}
func (t taskDB) asBinary() []byte {
buf := make([]byte, 8+len(t.body))
binary.BigEndian.PutUint64(buf[:8], uint64(t.createdAt))
copy(buf[8:], t.body)
return buf
}
func (t *taskDB) fromBinary(data []byte) error {
if len(data) < 8 {
return errors.New("bad data")
}
t.createdAt = int64(binary.BigEndian.Uint64(data[:8]))
if len(data) == 8 {
return nil
}
t.body = make([]byte, len(t.body)-8)
copy(t.body, data[8:])
return nil
}
func (c queueClient) Publish(ctx context.Context, params entity.PublishParams) (task entity.Task, err error) {
task.ID, err = c.seqGen.Next()
if err != nil {
return task, fmt.Errorf("generating id: %w", err)
}
task.CreatedAt = time.Now()
tdb := taskDB{
createdAt: task.CreatedAt.Unix(),
body: params.Body,
}
var keyData [8]byte
binary.BigEndian.PutUint64(keyData[:], task.ID)
opts := make([]putOpt, 0, 1)
if !params.ExpiresAt.IsZero() {
duration := params.ExpiresAt.Sub(time.Now())
opts = append(opts, withTTL(duration))
}
err = c.namedClient.Put(keyData[:], tdb.asBinary(), opts...)
if err != nil {
return entity.Task{}, fmt.Errorf("saving data: %w", err)
}
return task, nil
}
func (c queueClient) Consume(ctx context.Context) (task entity.Task, err error) {
err = c.db.View(func(txn *badger.Txn) error {
iterOpts := badger.DefaultIteratorOptions
iterOpts.PrefetchSize = 1
iterOpts.PrefetchValues = true
iterOpts.Prefix = c.table
iterOpts.Reverse = false
iter := txn.NewIterator(iterOpts)
defer iter.Close()
iter.Seek(c.table)
if !iter.ValidForPrefix(c.table) {
return entity.ErrNotFound
}
item := iter.Item()
keyData := item.KeyCopy(nil)
valueData, err := item.ValueCopy(nil)
if err != nil {
return fmt.Errorf("getting value: %w", err)
}
var tdb taskDB
err = tdb.fromBinary(valueData)
if err != nil {
return err
}
task.ID = binary.BigEndian.Uint64(bytes.TrimPrefix(keyData, c.table))
task.CreatedAt = time.Unix(tdb.createdAt, 0)
task.Body = tdb.body
if expiresAt := item.ExpiresAt(); expiresAt > 0 {
t := time.Now().Add(time.Second * time.Duration(expiresAt))
task.ExpiresAt = &t
}
return nil
})
if err != nil {
return entity.Task{}, err
}
return task, nil
}