314 lines
6.9 KiB
Go
314 lines
6.9 KiB
Go
package badger
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/binary"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"unsafe"
|
|
|
|
"git.loyso.art/frx/eway/internal/encoding/fbs"
|
|
"git.loyso.art/frx/eway/internal/entity"
|
|
|
|
badger "github.com/dgraph-io/badger/v4"
|
|
"github.com/dgraph-io/ristretto/z"
|
|
"github.com/rs/zerolog"
|
|
)
|
|
|
|
// useJSON is a package-level switch for the JSON serializer. Nothing in the
// visible part of this file reads it; presumably it is passed to
// newGoodsItemClient as serializeAsJSON — TODO confirm against the caller.
const useJSON = false
|
|
|
|
type goodsItemClient struct {
|
|
db *badger.DB
|
|
|
|
s itemSerializer[entity.GoodsItem]
|
|
}
|
|
|
|
func newGoodsItemClient(db *badger.DB, serializeAsJSON bool) *goodsItemClient {
|
|
var s itemSerializer[entity.GoodsItem]
|
|
if serializeAsJSON {
|
|
s = goodsItemJSONSerializer{}
|
|
} else {
|
|
s = goodsItemFlatbufSerializer{}
|
|
}
|
|
return &goodsItemClient{
|
|
db: db,
|
|
s: s,
|
|
}
|
|
}
|
|
|
|
func (*goodsItemClient) prefix() []byte {
|
|
return []byte("!!goodsitem!!")
|
|
}
|
|
|
|
func (c *goodsItemClient) prefixed(key []byte) []byte {
|
|
return append(c.prefix(), key...)
|
|
}
|
|
|
|
func (c *goodsItemClient) prefixedStr(key string) []byte {
|
|
keyBytes := unsafe.Slice(unsafe.StringData(key), len(key))
|
|
return c.prefixed(keyBytes)
|
|
}
|
|
|
|
func (c *goodsItemClient) prefixedIDByCartInt64(key int64) []byte {
|
|
var keyBytes [8]byte
|
|
binary.BigEndian.PutUint64(keyBytes[:], uint64(key))
|
|
return c.prefixedIDByCart(keyBytes[:])
|
|
}
|
|
|
|
func (*goodsItemClient) prefixedIDByCart(key []byte) []byte {
|
|
return append([]byte("!!goodsitem_card_idx!!"), key...)
|
|
}
|
|
|
|
func (c *goodsItemClient) ListIter(
|
|
ctx context.Context, prefetchCount int,
|
|
) (out <-chan entity.GoodsItem, err error) {
|
|
stream := c.db.NewStream()
|
|
stream.Prefix = c.prefix()
|
|
stream.LogPrefix = "list_iter"
|
|
|
|
bus := make(chan entity.GoodsItem, prefetchCount)
|
|
stream.Send = func(buf *z.Buffer) error {
|
|
list, err := badger.BufferToKVList(buf)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, kv := range list.GetKv() {
|
|
var gooditem entity.GoodsItem
|
|
gooditem, err = c.s.Deserialize(kv.GetValue())
|
|
if err != nil {
|
|
return fmt.Errorf("deserializing item: %w", err)
|
|
}
|
|
|
|
bus <- gooditem
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
go func(ctx context.Context) {
|
|
defer close(bus)
|
|
|
|
err := stream.Orchestrate(ctx)
|
|
if err != nil {
|
|
zerolog.Ctx(ctx).Warn().Err(err).Msg("unable to orchestrate")
|
|
}
|
|
}(ctx)
|
|
|
|
return bus, nil
|
|
}
|
|
|
|
func (c *goodsItemClient) List(
|
|
ctx context.Context,
|
|
) (out []entity.GoodsItem, err error) {
|
|
err = c.db.View(func(txn *badger.Txn) error {
|
|
opts := badger.DefaultIteratorOptions
|
|
opts.PrefetchValues = true
|
|
opts.PrefetchSize = 20
|
|
iter := txn.NewIterator(opts)
|
|
defer iter.Close()
|
|
|
|
prefix := c.prefix()
|
|
var cursor int
|
|
for iter.Seek(prefix); iter.ValidForPrefix(prefix); iter.Next() {
|
|
cursor++
|
|
current := iter.Item()
|
|
err = current.Value(func(val []byte) error {
|
|
var goodsItem entity.GoodsItem
|
|
goodsItem, err = c.s.Deserialize(val)
|
|
if err != nil {
|
|
return fmt.Errorf("deserializing: %w", err)
|
|
}
|
|
|
|
out = append(out, goodsItem)
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("getting value: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("viewing: %w", err)
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
func (c *goodsItemClient) Get(
|
|
ctx context.Context,
|
|
sku string,
|
|
) (out entity.GoodsItem, err error) {
|
|
err = c.db.View(func(txn *badger.Txn) error {
|
|
key := unsafe.Slice(unsafe.StringData(sku), len(sku))
|
|
out, err = c.getBySKU(key, txn)
|
|
return err
|
|
})
|
|
if err != nil {
|
|
if errors.Is(err, badger.ErrKeyNotFound) {
|
|
return out, entity.ErrNotFound
|
|
}
|
|
|
|
return out, fmt.Errorf("viewing: %w", err)
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
func (c *goodsItemClient) GetByCart(ctx context.Context, id int64) (out entity.GoodsItem, err error) {
|
|
err = c.db.View(func(txn *badger.Txn) error {
|
|
idxKey := c.prefixedIDByCartInt64(id)
|
|
skuByCartIDItem, err := txn.Get(idxKey)
|
|
if err != nil {
|
|
return fmt.Errorf("getting key: %w", err)
|
|
}
|
|
|
|
sku := make([]byte, skuByCartIDItem.ValueSize())
|
|
sku, err = skuByCartIDItem.ValueCopy(sku)
|
|
if err != nil {
|
|
return fmt.Errorf("getting value of idx: %w", err)
|
|
}
|
|
|
|
// well, yeah, that's kind of dumb to trim prefix here and
|
|
// and prefix later, but who cares.
|
|
sku = bytes.TrimPrefix(sku, c.prefix())
|
|
out, err = c.getBySKU(sku, txn)
|
|
return err
|
|
})
|
|
if err != nil {
|
|
if errors.Is(err, badger.ErrKeyNotFound) {
|
|
return out, entity.ErrNotFound
|
|
}
|
|
|
|
return out, fmt.Errorf("viewing: %w", err)
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
func (c *goodsItemClient) UpsertMany(ctx context.Context, items ...entity.GoodsItem) ([]entity.GoodsItem, error) {
|
|
return items, c.upsertByBatch(ctx, items)
|
|
}
|
|
|
|
func (c *goodsItemClient) Delete(ctx context.Context, sku string) (out entity.GoodsItem, err error) {
|
|
err = c.db.Update(func(txn *badger.Txn) error {
|
|
skuKey := c.prefixedStr(sku)
|
|
out, err = c.getBySKU(skuKey, txn)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = txn.Delete(skuKey)
|
|
if err != nil {
|
|
return fmt.Errorf("deleting key: %w", err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return entity.GoodsItem{}, err
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
func (c *goodsItemClient) upsertByBatch(ctx context.Context, items []entity.GoodsItem) error {
|
|
batch := c.db.NewWriteBatch()
|
|
defer batch.Cancel()
|
|
|
|
err := func() error {
|
|
for _, item := range items {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
key := c.prefixedStr(item.Articul)
|
|
value, err := c.s.Serialize(item)
|
|
if err != nil {
|
|
return fmt.Errorf("serializing item: %w", err)
|
|
}
|
|
|
|
idxValue := make([]byte, len(key))
|
|
copy(idxValue, key)
|
|
|
|
coreEntry := badger.NewEntry(key, value)
|
|
if err := batch.SetEntry(coreEntry); err != nil {
|
|
return fmt.Errorf("setting core entry: %w", err)
|
|
}
|
|
|
|
idxKey := c.prefixedIDByCartInt64(item.Cart)
|
|
idxEntry := badger.NewEntry(idxKey, idxValue)
|
|
if err := batch.SetEntry(idxEntry); err != nil {
|
|
return fmt.Errorf("setting index entry: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}()
|
|
if err != nil && !errors.Is(err, context.Canceled) {
|
|
return err
|
|
}
|
|
|
|
err = batch.Flush()
|
|
if err != nil {
|
|
return fmt.Errorf("flushing changes: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *goodsItemClient) getBySKU(sku []byte, txn *badger.Txn) (out entity.GoodsItem, err error) {
|
|
item, err := txn.Get(c.prefixed(sku))
|
|
if err != nil {
|
|
return out, fmt.Errorf("getting key: %w", err)
|
|
}
|
|
|
|
err = item.Value(func(val []byte) error {
|
|
out, err = c.s.Deserialize(val)
|
|
return err
|
|
})
|
|
if err != nil {
|
|
return out, fmt.Errorf("reading value: %w", err)
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
// itemSerializer converts values of type T to and from the byte form that is
// stored in the database.
type itemSerializer[T any] interface {
	// Serialize encodes item into bytes.
	Serialize(item T) ([]byte, error)

	// Deserialize decodes data back into a value of type T.
	Deserialize(data []byte) (T, error)
}
|
|
|
|
// goodsItemJSONSerializer encodes goods items with encoding/json.
type goodsItemJSONSerializer struct{}
|
|
|
|
func (goodsItemJSONSerializer) Serialize(in entity.GoodsItem) ([]byte, error) {
|
|
return json.Marshal(in)
|
|
}
|
|
|
|
func (goodsItemJSONSerializer) Deserialize(data []byte) (in entity.GoodsItem, err error) {
|
|
err = json.Unmarshal(data, &in)
|
|
return in, err
|
|
}
|
|
|
|
// goodsItemFlatbufSerializer encodes goods items through the project's fbs
// (flatbuffers) package.
type goodsItemFlatbufSerializer struct{}
|
|
|
|
func (goodsItemFlatbufSerializer) Serialize(in entity.GoodsItem) ([]byte, error) {
|
|
out := fbs.MakeDomainGoodItemFinished(in)
|
|
return out, nil
|
|
}
|
|
|
|
func (goodsItemFlatbufSerializer) Deserialize(data []byte) (out entity.GoodsItem, err error) {
|
|
out, err = fbs.ParseGoodsItem(data)
|
|
if err != nil {
|
|
return entity.GoodsItem{}, err
|
|
}
|
|
return out, nil
|
|
}
|