From b0a185561d5a134bd837f6ce4a12ffaf3d6e6807 Mon Sep 17 00:00:00 2001 From: Aleksandr Trushkin Date: Sat, 3 Feb 2024 18:15:26 +0300 Subject: [PATCH] queue api --- internal/entity/task.go | 23 ++++++ internal/storage/badger/client.go | 70 ++++++++++++++++ internal/storage/badger/queue.go | 127 ++++++++++++++++++++++++++++++ 3 files changed, 220 insertions(+) create mode 100644 internal/entity/task.go create mode 100644 internal/storage/badger/queue.go diff --git a/internal/entity/task.go b/internal/entity/task.go new file mode 100644 index 0000000..c6745c5 --- /dev/null +++ b/internal/entity/task.go @@ -0,0 +1,23 @@ +package entity + +import ( + "context" + "time" +) + +type PublishParams struct { + Body []byte + ExpiresAt time.Time +} + +type MessageQueue interface { + Publish(context.Context, PublishParams) (Task, error) + Consume(context.Context) (Task, error) +} + +type Task struct { + ID uint64 + CreatedAt time.Time + ExpiresAt *time.Time + Body []byte +} diff --git a/internal/storage/badger/client.go b/internal/storage/badger/client.go index e12d8d6..0b92d90 100644 --- a/internal/storage/badger/client.go +++ b/internal/storage/badger/client.go @@ -2,6 +2,8 @@ package badger import ( "fmt" + "time" + "unsafe" "git.loyso.art/frx/eway/internal/entity" badger "github.com/dgraph-io/badger/v4" @@ -9,11 +11,13 @@ import ( var ( categorySequenceIDKey = []byte("!!cat_seq!!") + queueSequenceIDKey = []byte("!!que_seq!!") ) type client struct { db *badger.DB nextCategoryIDSeq *badger.Sequence + nextQueueIDSeq *badger.Sequence } func NewClient(db *badger.DB) (*client, error) { @@ -21,10 +25,15 @@ func NewClient(db *badger.DB) (*client, error) { if err != nil { return nil, fmt.Errorf("getting sequence for categories: %w", err) } + queueSeqGen, err := db.GetSequence(queueSequenceIDKey, 10) + if err != nil { + return nil, fmt.Errorf("getting sequence for queues: %w", err) + } return &client{ db: db, nextCategoryIDSeq: categorySeqGen, + nextQueueIDSeq: queueSeqGen, }, nil } @@ -46,3 +55,64 @@ func (c *client) Category() entity.CategoryRepository { func (c *client) GoodsItem() entity.GoodsItemRepository { return newGoodsItemClient(c.db) } + +func (c *client) QueueClient() entity.MessageQueue { + nc := c.Table("queues") + return newQueueClient(nc, c.nextQueueIDSeq) +} + +func (c *client) Table(name string) namedClient { + tableBytes := unsafe.Slice(unsafe.StringData("!!"+name+"!!"), len(name)+4) + return namedClient{ + table: tableBytes, + db: c.db, + } +} + +type namedClient struct { + table []byte + db *badger.DB +} + +type putOpt func(*badger.Entry) + +func withTTL(duration time.Duration) putOpt { + return func(e *badger.Entry) { + e.WithTTL(duration) + } +} + +func (c *namedClient) Put(key, value []byte, opts ...putOpt) error { + return c.db.Update(func(txn *badger.Txn) error { + tableKey := c.makeKey(key) + entry := badger.NewEntry(tableKey, value) + for _, opt := range opts { + opt(entry) + } + + return txn.SetEntry(entry) + }) +} + +func (c *namedClient) Get(key []byte) ([]byte, error) { + var out []byte + err := c.db.View(func(txn *badger.Txn) error { + item, err := txn.Get(c.makeKey(key)) + if err != nil { + return err + } + + out = make([]byte, item.ValueSize()) + out, err = item.ValueCopy(out) + return err + }) + if err != nil { + return nil, err + } + + return out, nil +} + +func (c *namedClient) makeKey(key []byte) (out []byte) { + return append(c.table, key...) +} diff --git a/internal/storage/badger/queue.go b/internal/storage/badger/queue.go new file mode 100644 index 0000000..9482ce0 --- /dev/null +++ b/internal/storage/badger/queue.go @@ -0,0 +1,127 @@ +package badger + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "fmt" + "time" + + "git.loyso.art/frx/eway/internal/entity" + "github.com/dgraph-io/badger/v4" +) + +type queueClient struct { + namedClient + + seqGen *badger.Sequence +} + +func newQueueClient(nc namedClient, seqGen *badger.Sequence) queueClient { + return queueClient{ + namedClient: nc, + seqGen: seqGen, + } +} + +type taskDB struct { + createdAt int64 + body []byte +} + +func (t taskDB) asBinary() []byte { + buf := make([]byte, 8+len(t.body)) + binary.BigEndian.PutUint64(buf[:8], uint64(t.createdAt)) + copy(buf[8:], t.body) + + return buf +} + +func (t *taskDB) fromBinary(data []byte) error { + if len(data) < 8 { + return errors.New("bad data") + } + + t.createdAt = int64(binary.BigEndian.Uint64(data[:8])) + if len(data) == 8 { + return nil + } + + t.body = make([]byte, len(t.body)-8) + copy(t.body, data[8:]) + + return nil +} + +func (c queueClient) Publish(ctx context.Context, params entity.PublishParams) (task entity.Task, err error) { + task.ID, err = c.seqGen.Next() + if err != nil { + return task, fmt.Errorf("generating id: %w", err) + } + + task.CreatedAt = time.Now() + tdb := taskDB{ + createdAt: task.CreatedAt.Unix(), + body: params.Body, + } + + var keyData [8]byte + binary.BigEndian.PutUint64(keyData[:], task.ID) + opts := make([]putOpt, 0, 1) + if !params.ExpiresAt.IsZero() { + duration := params.ExpiresAt.Sub(time.Now()) + opts = append(opts, withTTL(duration)) + } + err = c.namedClient.Put(keyData[:], tdb.asBinary(), opts...) + if err != nil { + return entity.Task{}, fmt.Errorf("saving data: %w", err) + } + + return task, nil +} + +func (c queueClient) Consume(ctx context.Context) (task entity.Task, err error) { + err = c.db.View(func(txn *badger.Txn) error { + iterOpts := badger.DefaultIteratorOptions + iterOpts.PrefetchSize = 1 + iterOpts.PrefetchValues = true + iterOpts.Prefix = c.table + iterOpts.Reverse = false + iter := txn.NewIterator(iterOpts) + defer iter.Close() + + iter.Seek(c.table) + if !iter.ValidForPrefix(c.table) { + return entity.ErrNotFound + } + + item := iter.Item() + keyData := item.KeyCopy(nil) + valueData, err := item.ValueCopy(nil) + if err != nil { + return fmt.Errorf("getting value: %w", err) + } + + var tdb taskDB + err = tdb.fromBinary(valueData) + if err != nil { + return err + } + + task.ID = binary.BigEndian.Uint64(bytes.TrimPrefix(keyData, c.table)) + task.CreatedAt = time.Unix(tdb.createdAt, 0) + task.Body = tdb.body + if expiresAt := item.ExpiresAt(); expiresAt > 0 { + t := time.Now().Add(time.Second * time.Duration(expiresAt)) + task.ExpiresAt = &t + } + + return nil + }) + if err != nil { + return entity.Task{}, err + } + + return task, nil +}