diff --git a/.gitignore b/.gitignore index 3714b60..3d5bdff 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,2 @@ *.json *.zst -badger/ diff --git a/internal/storage/badger/category.go b/internal/storage/badger/category.go new file mode 100644 index 0000000..1441c62 --- /dev/null +++ b/internal/storage/badger/category.go @@ -0,0 +1,143 @@ +package badger + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + + "git.loyso.art/frx/eway/internal/encoding/fbs" + "git.loyso.art/frx/eway/internal/entity" + + badger "github.com/dgraph-io/badger/v4" + "github.com/rs/zerolog" +) + +type categoryClient struct { + db *badger.DB + seqGen *badger.Sequence +} + +func newCategoryClient(db *badger.DB, seqGen *badger.Sequence) categoryClient { + return categoryClient{ + db: db, + seqGen: seqGen, + } +} + +func (categoryClient) prefix() []byte { + return []byte("!!category!!") +} + +func (c categoryClient) prefixed(key []byte) []byte { + return append(c.prefix(), key...) +} + +func (c categoryClient) prefixedInt(key int64) []byte { + var keyBytes [8]byte + binary.BigEndian.PutUint64(keyBytes[:], uint64(key)) + + return c.prefixed(keyBytes[:]) +} + +func (c categoryClient) List( + ctx context.Context, +) (out []entity.Category, err error) { + err = c.db.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.PrefetchSize = 10 + opts.PrefetchValues = true + + iter := txn.NewIterator(opts) + defer iter.Close() + + prefix := c.prefix() + for iter.Seek(prefix); iter.ValidForPrefix(prefix); iter.Next() { + current := iter.Item() + err = current.Value(func(val []byte) error { + category := fbs.ParseCategory(val) + out = append(out, category) + + return nil + }) + if err != nil { + return fmt.Errorf("getting value: %w", err) + } + } + + return nil + }) + if err != nil { + return nil, fmt.Errorf("viewing: %w", err) + } + + return out, nil +} + +func (c categoryClient) Get(ctx context.Context, id int64) (out entity.Category, err error) { + err = c.db.View(func(txn *badger.Txn) error { + key := c.prefixedInt(id) + item, err := txn.Get(key) + if err != nil { + return fmt.Errorf("getting key: %w", err) + } + + err = item.Value(func(val []byte) error { + out = fbs.ParseCategory(val) + return nil + }) + if err != nil { + return fmt.Errorf("reading value: %w", err) + } + + return nil + }) + if err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + return out, entity.ErrNotFound + } + + return out, fmt.Errorf("viewing: %w", err) + } + + return out, nil +} + +// Create new category inside DB. It also applies new id to it. +func (c categoryClient) Create(ctx context.Context, name string) (out entity.Category, err error) { + seqGen, err := c.db.GetSequence(categorySequenceIDKey, 1) + if err != nil { + return out, fmt.Errorf("getting sequence for categories: %w", err) + } + defer func() { + errRelese := seqGen.Release() + if errRelese != nil { + zerolog.Ctx(ctx).Warn().Err(err).Msg("unable to release seq") + } + }() + + nextid, err := seqGen.Next() + if err != nil { + return out, fmt.Errorf("getting next id: %w", err) + } + + out = entity.Category{ + ID: int64(nextid), + Name: name, + } + + err = c.db.Update(func(txn *badger.Txn) error { + key := c.prefixedInt(out.ID) + err = txn.Set(key, fbs.MakeCategoryFinished(out)) + if err != nil { + return fmt.Errorf("setting: %w", err) + } + + return nil + }) + if err != nil { + return out, fmt.Errorf("updating: %w", err) + } + + return out, nil +} diff --git a/internal/storage/badger/client.go b/internal/storage/badger/client.go new file mode 100644 index 0000000..6091f89 --- /dev/null +++ b/internal/storage/badger/client.go @@ -0,0 +1,47 @@ +package badger + +import ( + "git.loyso.art/frx/eway/internal/entity" + badger "github.com/dgraph-io/badger/v4" +) + +var ( + categorySequenceIDKey = []byte("cat:") +) + +type client struct { + db *badger.DB + + // nextCategoryIDSeq *badger.Sequence +} + +func NewClient(db *badger.DB) (*client, error) { + // categorySeqGen, err := db.GetSequence(categorySequenceIDKey, 10) + // if err != nil { + // return nil, fmt.Errorf("getting sequence for categories: %w", err) + // } + // + return &client{ + db: db, + // nextCategoryIDSeq: categorySeqGen, + }, nil +} + +// Close closes the underlying sequences in the client. Should be called right before +// underlying *badger.DB closed. +func (c *client) Close() error { + // err := c.nextCategoryIDSeq.Release() + // if err != nil { + // return fmt.Errorf("releasing next_category_sequence: %w", err) + // } + + return nil +} + +func (c *client) Category() entity.CategoryRepository { + return newCategoryClient(c.db, nil) +} + +func (c *client) GoodsItem() entity.GoodsItemRepository { + return newGoodsItemClient(c.db) +} diff --git a/internal/storage/badger/db.go b/internal/storage/badger/db.go new file mode 100644 index 0000000..0c0fe16 --- /dev/null +++ b/internal/storage/badger/db.go @@ -0,0 +1,55 @@ +package badger + +import ( + "context" + "fmt" + "strings" + + badger "github.com/dgraph-io/badger/v4" + "github.com/rs/zerolog" +) + +type zerologAdapter struct { + log zerolog.Logger +} + +func (za zerologAdapter) Debugf(format string, args ...any) { + za.fmt(za.log.Debug(), format, args...) +} +func (za zerologAdapter) Infof(format string, args ...any) { + za.fmt(za.log.Info(), format, args...) +} +func (za zerologAdapter) Warningf(format string, args ...any) { + za.fmt(za.log.Warn(), format, args...) +} +func (za zerologAdapter) Errorf(format string, args ...any) { + za.fmt(za.log.Error(), format, args...) +} + +func (za zerologAdapter) fmt(event *zerolog.Event, format string, args ...any) { + event.Msgf(strings.TrimSuffix(format, "\n"), args...) +} + +func Open(ctx context.Context, path string, log zerolog.Logger) (*badger.DB, error) { + bl := zerologAdapter{ + log: log.With().Str("db", "badger").Logger(), + } + opts := badger.DefaultOptions(path). + WithLogger(bl). + WithLoggingLevel(badger.INFO). + WithValueLogFileSize(4 << 20). + WithDir(path). + WithValueDir(path) + // WithMaxLevels(4). + // WithMemTableSize(8 << 20). + // WithMetricsEnabled(true). + // WithCompactL0OnClose(true). + // WithBlockCacheSize(8 << 20) + + db, err := badger.Open(opts) + if err != nil { + return nil, fmt.Errorf("opening badger: %w", err) + } + + return db, nil +} diff --git a/internal/storage/badger/goodsitem.go b/internal/storage/badger/goodsitem.go new file mode 100644 index 0000000..6b39e86 --- /dev/null +++ b/internal/storage/badger/goodsitem.go @@ -0,0 +1,290 @@ +package badger + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "runtime" + "unsafe" + + "git.loyso.art/frx/eway/internal/encoding/fbs" + "git.loyso.art/frx/eway/internal/entity" + + badger "github.com/dgraph-io/badger/v4" + "github.com/dgraph-io/badger/v4/pb" + "github.com/dgraph-io/ristretto/z" + "github.com/rs/zerolog" +) + +type goodsItemClient struct { + db *badger.DB +} + +func newGoodsItemClient(db *badger.DB) *goodsItemClient { + return &goodsItemClient{ + db: db, + } +} + +func (*goodsItemClient) prefix() []byte { + return []byte("!!goodsitem!!") +} + +func (c *goodsItemClient) prefixed(key []byte) []byte { + return append(c.prefix(), key...) +} + +func (c *goodsItemClient) prefixedStr(key string) []byte { + keyBytes := unsafe.Slice(unsafe.StringData(key), len(key)) + return c.prefixed(keyBytes) +} + +func (c *goodsItemClient) prefixedIDByCartStr(key int64) []byte { + var keyBytes [8]byte + binary.BigEndian.PutUint64(keyBytes[:], uint64(key)) + return c.prefixedIDByCart(keyBytes[:]) +} + +func (*goodsItemClient) prefixedIDByCart(key []byte) []byte { + return append([]byte("!!goodsitem_card_idx!!"), key...) +} + +func (c *goodsItemClient) ListIter( + ctx context.Context, prefetchCount int, +) (out <-chan entity.GoodsItem, err error) { + stream := c.db.NewStream() + stream.Prefix = c.prefix() + stream.LogPrefix = "list_iter" + + bus := make(chan entity.GoodsItem, prefetchCount) + stream.Send = func(buf *z.Buffer) error { + list, err := badger.BufferToKVList(buf) + if err != nil { + return err + } + + for _, kv := range list.GetKv() { + bus <- fbs.ParseGoodsItem(kv.GetValue()) + } + + return nil + } + + go func(ctx context.Context) { + defer close(bus) + + err := stream.Orchestrate(context.Background()) + if err != nil { + zerolog.Ctx(ctx).Warn().Err(err).Msg("unable to orchestrate") + } + }(ctx) + + return bus, nil +} + +func (c *goodsItemClient) List( + ctx context.Context, +) (out []entity.GoodsItem, err error) { + err = c.db.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = true + opts.PrefetchSize = 20 + iter := txn.NewIterator(opts) + defer iter.Close() + + prefix := c.prefix() + for iter.Seek(prefix); iter.ValidForPrefix(prefix); iter.Next() { + current := iter.Item() + err = current.Value(func(val []byte) error { + goodsItem := fbs.ParseGoodsItem(val) + out = append(out, goodsItem) + + return nil + }) + if err != nil { + return fmt.Errorf("getting value: %w", err) + } + } + + return nil + }) + if err != nil { + return nil, fmt.Errorf("viewing: %w", err) + } + + return out, nil +} + +func (c *goodsItemClient) Get( + ctx context.Context, + sku string, +) (out entity.GoodsItem, err error) { + err = c.db.View(func(txn *badger.Txn) error { + key := unsafe.Slice(unsafe.StringData(sku), len(sku)) + out, err = c.getBySKU(key, txn) + return err + }) + if err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + return out, entity.ErrNotFound + } + + return out, fmt.Errorf("viewing: %w", err) + } + + return out, nil +} + +func (c *goodsItemClient) GetByCart(ctx context.Context, id int64) (out entity.GoodsItem, err error) { + err = c.db.View(func(txn *badger.Txn) error { + var idByte [8]byte + binary.BigEndian.PutUint64(idByte[:], uint64(id)) + + item, err := txn.Get(c.prefixedIDByCart(idByte[:])) + if err != nil { + return fmt.Errorf("getting key: %w", err) + } + + sku := make([]byte, item.ValueSize()) + sku, err = item.ValueCopy(sku) + if err != nil { + return fmt.Errorf("getting value of idx: %w", err) + } + + out, err = c.getBySKU(sku, txn) + return err + }) + if err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + return out, entity.ErrNotFound + } + + return out, fmt.Errorf("viewing: %w", err) + } + + return out, nil +} + +func (c *goodsItemClient) UpsertMany(ctx context.Context, items ...entity.GoodsItem) ([]entity.GoodsItem, error) { + return items, c.upsertByOne(ctx, items) +} + +func (c *goodsItemClient) upsertByOne(ctx context.Context, items []entity.GoodsItem) error { + return c.db.Update(func(txn *badger.Txn) error { + for _, item := range items { + key := c.prefixedStr(item.Articul) + value := fbs.MakeDomainGoodItemFinished(item) + valueIdx := make([]byte, len(key)) + copy(valueIdx, key) + + err := txn.Set(key, value) + if err != nil { + return err + } + + err = txn.Set(c.prefixedIDByCartStr(item.Cart), valueIdx) + if err != nil { + return err + } + } + + return nil + }) +} + +func (c *goodsItemClient) upsertByStream(ctx context.Context, items []entity.GoodsItem) error { + stream := c.db.NewStreamWriter() + defer stream.Cancel() + + err := stream.Prepare() + if err != nil { + return fmt.Errorf("preparing stream: %w", err) + } + + buf := z.NewBuffer(len(items), "sometag") + for _, item := range items { + key := c.prefixedStr(item.Articul) + keyIdx := c.prefixedIDByCartStr(item.Cart) + value := fbs.MakeDomainGoodItemFinished(item) + + itemKV := &pb.KV{Key: key, Value: value} + itemKVIdx := &pb.KV{Key: keyIdx, Value: key} + + badger.KVToBuffer(itemKV, buf) + badger.KVToBuffer(itemKVIdx, buf) + } + + err = stream.Write(buf) + if err != nil { + return fmt.Errorf("writing buf: %w", err) + } + + err = stream.Flush() + if err != nil { + return fmt.Errorf("flushing changes: %w", err) + } + + return nil +} + +func (c *goodsItemClient) upsertByBatch(ctx context.Context, items []entity.GoodsItem) error { + batch := c.db.NewWriteBatch() + defer func() { + println("closing batch") + batch.Cancel() + }() + + log := zerolog.Ctx(ctx) + for _, item := range items { + select { + case <-ctx.Done(): + break + default: + } + key := c.prefixedStr(item.Articul) + value := fbs.MakeDomainGoodItemFinished(item) + idxValue := make([]byte, len(key)) + copy(idxValue, key) + coreEntry := badger.NewEntry(key, value) + if err := batch.SetEntry(coreEntry); err != nil { + log.Warn().Err(err).Msg("unable to set item, breaking") + break + } + + idxKey := c.prefixedIDByCartStr(item.Cart) + idxEntry := badger.NewEntry(idxKey, idxValue) + if err := batch.SetEntry(idxEntry); err != nil { + log.Warn().Err(err).Msg("unable to set idx, breaking") + break + } + runtime.Gosched() + } + + println("flushing") + err := batch.Flush() + runtime.Gosched() + if err != nil { + println("flush err", err.Error()) + return fmt.Errorf("flushing changes: %w", err) + } + + return nil +} + +func (c *goodsItemClient) getBySKU(sku []byte, txn *badger.Txn) (out entity.GoodsItem, err error) { + item, err := txn.Get(c.prefixed(sku)) + if err != nil { + return out, fmt.Errorf("getting key: %w", err) + } + + err = item.Value(func(val []byte) error { + out = fbs.ParseGoodsItem(val) + return nil + }) + if err != nil { + return out, fmt.Errorf("reading value: %w", err) + } + + return out, nil +}