131 lines
2.6 KiB
Go
131 lines
2.6 KiB
Go
package badger
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"git.loyso.art/frx/eway/internal/entity"
|
|
|
|
"github.com/dgraph-io/badger/v4"
|
|
)
|
|
|
|
type queueClient struct {
|
|
namedClient
|
|
|
|
seqGen *badger.Sequence
|
|
}
|
|
|
|
func newQueueClient(nc namedClient, seqGen *badger.Sequence) queueClient {
|
|
return queueClient{
|
|
namedClient: nc,
|
|
seqGen: seqGen,
|
|
}
|
|
}
|
|
|
|
// taskDBHeaderSize is the number of bytes occupied by the fixed-size header
// (the big-endian createdAt value) at the start of an encoded taskDB record.
const taskDBHeaderSize = 8

// taskDB is the stored form of a queued task.
//
// Binary layout:
//
//	bytes [0, 8)  createdAt as a big-endian uint64
//	bytes [8, n)  body, verbatim (may be empty)
type taskDB struct {
	createdAt int64
	body      []byte
}

// asBinary encodes t into a freshly allocated buffer using the layout
// described on taskDB. The result does not alias t.body.
func (t taskDB) asBinary() []byte {
	buf := make([]byte, taskDBHeaderSize+len(t.body))
	binary.BigEndian.PutUint64(buf[:taskDBHeaderSize], uint64(t.createdAt))
	copy(buf[taskDBHeaderSize:], t.body)

	return buf
}

// fromBinary decodes data, as produced by asBinary, into t.
//
// Both fields are overwritten: a record that consists of the header only
// leaves t.body nil, so a reused receiver never keeps a stale payload. The
// decoded body is a copy and does not alias data.
//
// An error is returned when data is too short to contain the header.
func (t *taskDB) fromBinary(data []byte) error {
	if len(data) < taskDBHeaderSize {
		return errors.New("bad data")
	}

	t.createdAt = int64(binary.BigEndian.Uint64(data[:taskDBHeaderSize]))
	t.body = nil

	// The payload length comes from the input, not from the receiver's
	// previous body.
	payload := data[taskDBHeaderSize:]
	if len(payload) == 0 {
		return nil
	}

	t.body = make([]byte, len(payload))
	copy(t.body, payload)

	return nil
}
|
|
|
|
func (c queueClient) Publish(ctx context.Context, params entity.PublishParams) (task entity.Task, err error) {
|
|
task.ID, err = c.seqGen.Next()
|
|
if err != nil {
|
|
return task, fmt.Errorf("generating id: %w", err)
|
|
}
|
|
|
|
task.CreatedAt = time.Now()
|
|
tdb := taskDB{
|
|
createdAt: task.CreatedAt.Unix(),
|
|
body: params.Body,
|
|
}
|
|
|
|
var keyData [8]byte
|
|
binary.BigEndian.PutUint64(keyData[:], task.ID)
|
|
opts := make([]putOpt, 0, 1)
|
|
if !params.ExpiresAt.IsZero() {
|
|
duration := time.Until(params.ExpiresAt)
|
|
opts = append(opts, withTTL(duration))
|
|
}
|
|
err = c.Put(keyData[:], tdb.asBinary(), opts...)
|
|
if err != nil {
|
|
return entity.Task{}, fmt.Errorf("saving data: %w", err)
|
|
}
|
|
|
|
return task, nil
|
|
}
|
|
|
|
func (c queueClient) Consume(ctx context.Context) (task entity.Task, err error) {
|
|
err = c.db.View(func(txn *badger.Txn) error {
|
|
iterOpts := badger.IteratorOptions{
|
|
PrefetchSize: 1,
|
|
PrefetchValues: true,
|
|
Reverse: false,
|
|
AllVersions: false,
|
|
Prefix: c.table,
|
|
}
|
|
iter := txn.NewIterator(iterOpts)
|
|
defer iter.Close()
|
|
|
|
iter.Seek(c.table)
|
|
if !iter.ValidForPrefix(c.table) {
|
|
return entity.ErrNotFound
|
|
}
|
|
|
|
item := iter.Item()
|
|
keyData := item.KeyCopy(nil)
|
|
valueData, err := item.ValueCopy(nil)
|
|
if err != nil {
|
|
return fmt.Errorf("getting value: %w", err)
|
|
}
|
|
|
|
var tdb taskDB
|
|
err = tdb.fromBinary(valueData)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
task.ID = binary.BigEndian.Uint64(bytes.TrimPrefix(keyData, c.table))
|
|
task.CreatedAt = time.Unix(tdb.createdAt, 0)
|
|
task.Body = tdb.body
|
|
if expiresAt := item.ExpiresAt(); expiresAt > 0 {
|
|
t := time.Now().Add(time.Second * time.Duration(expiresAt))
|
|
task.ExpiresAt = &t
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return entity.Task{}, err
|
|
}
|
|
|
|
return task, nil
|
|
}
|