// eway/internal/storage/badger/queue.go
// Aleksandr Trushkin b0a185561d queue api
// 2024-02-03 18:15:26 +03:00
// 128 lines, 2.6 KiB, Go

package badger
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"time"
"git.loyso.art/frx/eway/internal/entity"
"github.com/dgraph-io/badger/v4"
)
// queueClient is a task queue layered on top of namedClient: Publish stores
// tasks through the embedded client's Put, and Consume reads them back by
// iterating over the client's table prefix.
type queueClient struct {
// namedClient is embedded; its Put method and its db and table fields are
// used by the queue methods.
namedClient
// seqGen produces the numeric ID assigned to each published task.
seqGen *badger.Sequence
}
// newQueueClient bundles the given named client and ID sequence into a
// queueClient value.
func newQueueClient(nc namedClient, seqGen *badger.Sequence) queueClient {
	var qc queueClient
	qc.namedClient = nc
	qc.seqGen = seqGen

	return qc
}
// taskDBHeaderSize is the length in bytes of the fixed-size prefix of an
// encoded taskDB record (the big-endian createdAt value).
const taskDBHeaderSize = 8

// taskDB is the stored representation of a queued task.
//
// Binary layout produced by asBinary and expected by fromBinary:
//
//	bytes [0, 8)  createdAt as a big-endian uint64
//	bytes [8, n)  body, verbatim (may be empty)
type taskDB struct {
	createdAt int64
	body      []byte
}

// asBinary encodes t into a freshly allocated buffer using the layout
// described on taskDB.
func (t taskDB) asBinary() []byte {
	buf := make([]byte, taskDBHeaderSize+len(t.body))
	binary.BigEndian.PutUint64(buf[:taskDBHeaderSize], uint64(t.createdAt))
	copy(buf[taskDBHeaderSize:], t.body)

	return buf
}

// fromBinary decodes data, as produced by asBinary, into t.
//
// It returns an error when data is shorter than the 8-byte header. The body
// is copied, so t does not retain a reference to data. A record that carries
// only the header decodes to a nil body.
func (t *taskDB) fromBinary(data []byte) error {
	if len(data) < taskDBHeaderSize {
		return errors.New("bad data")
	}

	t.createdAt = int64(binary.BigEndian.Uint64(data[:taskDBHeaderSize]))

	// Always reset the body so that a reused receiver does not keep a stale
	// payload when the record has none.
	t.body = nil

	// The body length is derived from the input, not from the receiver's
	// current body: sizing by len(t.body) yields a negative length on a
	// fresh receiver and panics.
	if payload := data[taskDBHeaderSize:]; len(payload) > 0 {
		t.body = make([]byte, len(payload))
		copy(t.body, payload)
	}

	return nil
}
// Publish stores a new task carrying params.Body and returns it.
//
// The task ID comes from the client's sequence and is used, encoded as 8
// big-endian bytes, as the storage key. CreatedAt is set to the current time
// and persisted with second precision. When params.ExpiresAt is non-zero,
// the entry is written with a TTL equal to the time remaining until then.
//
// The returned task has only ID and CreatedAt populated. ctx is currently
// unused.
func (c queueClient) Publish(ctx context.Context, params entity.PublishParams) (task entity.Task, err error) {
	if task.ID, err = c.seqGen.Next(); err != nil {
		return task, fmt.Errorf("generating id: %w", err)
	}

	task.CreatedAt = time.Now()

	key := make([]byte, 8)
	binary.BigEndian.PutUint64(key, task.ID)

	record := taskDB{
		createdAt: task.CreatedAt.Unix(),
		body:      params.Body,
	}

	putOpts := make([]putOpt, 0, 1)
	if !params.ExpiresAt.IsZero() {
		ttl := params.ExpiresAt.Sub(time.Now())
		putOpts = append(putOpts, withTTL(ttl))
	}

	if err = c.namedClient.Put(key, record.asBinary(), putOpts...); err != nil {
		return entity.Task{}, fmt.Errorf("saving data: %w", err)
	}

	return task, nil
}
// Consume returns the first task found under the client's table prefix,
// iterating keys in forward order.
//
// The lookup runs inside a View transaction and this method does not delete
// the entry it returns. It returns entity.ErrNotFound when no key with the
// table prefix exists, and an error when the stored key or value cannot be
// decoded. On any error the returned task is the zero value.
//
// ctx is currently unused.
func (c queueClient) Consume(ctx context.Context) (task entity.Task, err error) {
	err = c.db.View(func(txn *badger.Txn) error {
		iterOpts := badger.DefaultIteratorOptions
		iterOpts.PrefetchSize = 1
		iterOpts.PrefetchValues = true
		iterOpts.Prefix = c.table
		iterOpts.Reverse = false

		iter := txn.NewIterator(iterOpts)
		defer iter.Close()

		iter.Seek(c.table)
		if !iter.ValidForPrefix(c.table) {
			return entity.ErrNotFound
		}

		item := iter.Item()

		// The task ID is the 8 big-endian bytes that follow the table prefix.
		// Guard the length so a foreign or truncated key yields an error
		// instead of an out-of-range panic in Uint64.
		rawID := bytes.TrimPrefix(item.KeyCopy(nil), c.table)
		if len(rawID) < 8 {
			return fmt.Errorf("decoding task id: key has %d bytes after prefix, want 8", len(rawID))
		}

		valueData, err := item.ValueCopy(nil)
		if err != nil {
			return fmt.Errorf("getting value: %w", err)
		}

		var tdb taskDB
		if err := tdb.fromBinary(valueData); err != nil {
			return err
		}

		task.ID = binary.BigEndian.Uint64(rawID)
		task.CreatedAt = time.Unix(tdb.createdAt, 0)
		task.Body = tdb.body

		// Item.ExpiresAt is an absolute Unix timestamp in seconds (0 means
		// the entry never expires), not a duration relative to now.
		if expiresAt := item.ExpiresAt(); expiresAt > 0 {
			t := time.Unix(int64(expiresAt), 0)
			task.ExpiresAt = &t
		}

		return nil
	})
	if err != nil {
		return entity.Task{}, err
	}

	return task, nil
}