add badger

This commit is contained in:
Gitea
2024-01-24 16:17:28 +03:00
parent 5b5dc2165c
commit f94d39b124
5 changed files with 535 additions and 1 deletions

1
.gitignore vendored
View File

@ -1,3 +1,2 @@
*.json
*.zst
badger/

View File

@ -0,0 +1,143 @@
package badger
import (
"context"
"encoding/binary"
"errors"
"fmt"
"git.loyso.art/frx/eway/internal/encoding/fbs"
"git.loyso.art/frx/eway/internal/entity"
badger "github.com/dgraph-io/badger/v4"
"github.com/rs/zerolog"
)
type categoryClient struct {
db *badger.DB
seqGen *badger.Sequence
}
func newCategoryClient(db *badger.DB, seqGen *badger.Sequence) categoryClient {
return categoryClient{
db: db,
seqGen: seqGen,
}
}
func (categoryClient) prefix() []byte {
return []byte("!!category!!")
}
func (c categoryClient) prefixed(key []byte) []byte {
return append(c.prefix(), key...)
}
func (c categoryClient) prefixedInt(key int64) []byte {
var keyBytes [8]byte
binary.BigEndian.PutUint64(keyBytes[:], uint64(key))
return c.prefixed(keyBytes[:])
}
func (c categoryClient) List(
ctx context.Context,
) (out []entity.Category, err error) {
err = c.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchSize = 10
opts.PrefetchValues = true
iter := txn.NewIterator(opts)
defer iter.Close()
prefix := c.prefix()
for iter.Seek(prefix); iter.ValidForPrefix(prefix); iter.Next() {
current := iter.Item()
err = current.Value(func(val []byte) error {
category := fbs.ParseCategory(val)
out = append(out, category)
return nil
})
if err != nil {
return fmt.Errorf("getting value: %w", err)
}
}
return nil
})
if err != nil {
return nil, fmt.Errorf("viewing: %w", err)
}
return out, nil
}
func (c categoryClient) Get(ctx context.Context, id int64) (out entity.Category, err error) {
err = c.db.View(func(txn *badger.Txn) error {
key := c.prefixedInt(id)
item, err := txn.Get(key)
if err != nil {
return fmt.Errorf("getting key: %w", err)
}
err = item.Value(func(val []byte) error {
out = fbs.ParseCategory(val)
return nil
})
if err != nil {
return fmt.Errorf("reading value: %w", err)
}
return nil
})
if err != nil {
if errors.Is(err, badger.ErrKeyNotFound) {
return out, entity.ErrNotFound
}
return out, fmt.Errorf("viewing: %w", err)
}
return out, nil
}
// Create new category inside DB. It also applies new id to it.
func (c categoryClient) Create(ctx context.Context, name string) (out entity.Category, err error) {
seqGen, err := c.db.GetSequence(categorySequenceIDKey, 1)
if err != nil {
return out, fmt.Errorf("getting sequence for categories: %w", err)
}
defer func() {
errRelese := seqGen.Release()
if errRelese != nil {
zerolog.Ctx(ctx).Warn().Err(err).Msg("unable to release seq")
}
}()
nextid, err := seqGen.Next()
if err != nil {
return out, fmt.Errorf("getting next id: %w", err)
}
out = entity.Category{
ID: int64(nextid),
Name: name,
}
err = c.db.Update(func(txn *badger.Txn) error {
key := c.prefixedInt(out.ID)
err = txn.Set(key, fbs.MakeCategoryFinished(out))
if err != nil {
return fmt.Errorf("setting: %w", err)
}
return nil
})
if err != nil {
return out, fmt.Errorf("updating: %w", err)
}
return out, nil
}

View File

@ -0,0 +1,47 @@
package badger
import (
"git.loyso.art/frx/eway/internal/entity"
badger "github.com/dgraph-io/badger/v4"
)
var (
categorySequenceIDKey = []byte("cat:")
)
type client struct {
db *badger.DB
// nextCategoryIDSeq *badger.Sequence
}
func NewClient(db *badger.DB) (*client, error) {
// categorySeqGen, err := db.GetSequence(categorySequenceIDKey, 10)
// if err != nil {
// return nil, fmt.Errorf("getting sequence for categories: %w", err)
// }
//
return &client{
db: db,
// nextCategoryIDSeq: categorySeqGen,
}, nil
}
// Close closes the underlying sequences in the client. Should be called right before
// underlying *badger.DB closed.
func (c *client) Close() error {
// err := c.nextCategoryIDSeq.Release()
// if err != nil {
// return fmt.Errorf("releasing next_category_sequence: %w", err)
// }
return nil
}
func (c *client) Category() entity.CategoryRepository {
return newCategoryClient(c.db, nil)
}
func (c *client) GoodsItem() entity.GoodsItemRepository {
return newGoodsItemClient(c.db)
}

View File

@ -0,0 +1,55 @@
package badger
import (
"context"
"fmt"
"strings"
badger "github.com/dgraph-io/badger/v4"
"github.com/rs/zerolog"
)
type zerologAdapter struct {
log zerolog.Logger
}
func (za zerologAdapter) Debugf(format string, args ...any) {
za.fmt(za.log.Debug(), format, args...)
}
func (za zerologAdapter) Infof(format string, args ...any) {
za.fmt(za.log.Info(), format, args...)
}
func (za zerologAdapter) Warningf(format string, args ...any) {
za.fmt(za.log.Warn(), format, args...)
}
func (za zerologAdapter) Errorf(format string, args ...any) {
za.fmt(za.log.Error(), format, args...)
}
func (za zerologAdapter) fmt(event *zerolog.Event, format string, args ...any) {
event.Msgf(strings.TrimSuffix(format, "\n"), args...)
}
func Open(ctx context.Context, path string, log zerolog.Logger) (*badger.DB, error) {
bl := zerologAdapter{
log: log.With().Str("db", "badger").Logger(),
}
opts := badger.DefaultOptions(path).
WithLogger(bl).
WithLoggingLevel(badger.INFO).
WithValueLogFileSize(4 << 20).
WithDir(path).
WithValueDir(path)
// WithMaxLevels(4).
// WithMemTableSize(8 << 20).
// WithMetricsEnabled(true).
// WithCompactL0OnClose(true).
// WithBlockCacheSize(8 << 20)
db, err := badger.Open(opts)
if err != nil {
return nil, fmt.Errorf("opening badger: %w", err)
}
return db, nil
}

View File

@ -0,0 +1,290 @@
package badger
import (
"context"
"encoding/binary"
"errors"
"fmt"
"runtime"
"unsafe"
"git.loyso.art/frx/eway/internal/encoding/fbs"
"git.loyso.art/frx/eway/internal/entity"
badger "github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/badger/v4/pb"
"github.com/dgraph-io/ristretto/z"
"github.com/rs/zerolog"
)
type goodsItemClient struct {
db *badger.DB
}
func newGoodsItemClient(db *badger.DB) *goodsItemClient {
return &goodsItemClient{
db: db,
}
}
func (*goodsItemClient) prefix() []byte {
return []byte("!!goodsitem!!")
}
func (c *goodsItemClient) prefixed(key []byte) []byte {
return append(c.prefix(), key...)
}
func (c *goodsItemClient) prefixedStr(key string) []byte {
keyBytes := unsafe.Slice(unsafe.StringData(key), len(key))
return c.prefixed(keyBytes)
}
func (c *goodsItemClient) prefixedIDByCartStr(key int64) []byte {
var keyBytes [8]byte
binary.BigEndian.PutUint64(keyBytes[:], uint64(key))
return c.prefixedIDByCart(keyBytes[:])
}
func (*goodsItemClient) prefixedIDByCart(key []byte) []byte {
return append([]byte("!!goodsitem_card_idx!!"), key...)
}
func (c *goodsItemClient) ListIter(
ctx context.Context, prefetchCount int,
) (out <-chan entity.GoodsItem, err error) {
stream := c.db.NewStream()
stream.Prefix = c.prefix()
stream.LogPrefix = "list_iter"
bus := make(chan entity.GoodsItem, prefetchCount)
stream.Send = func(buf *z.Buffer) error {
list, err := badger.BufferToKVList(buf)
if err != nil {
return err
}
for _, kv := range list.GetKv() {
bus <- fbs.ParseGoodsItem(kv.GetValue())
}
return nil
}
go func(ctx context.Context) {
defer close(bus)
err := stream.Orchestrate(context.Background())
if err != nil {
zerolog.Ctx(ctx).Warn().Err(err).Msg("unable to orchestrate")
}
}(ctx)
return bus, nil
}
func (c *goodsItemClient) List(
ctx context.Context,
) (out []entity.GoodsItem, err error) {
err = c.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = true
opts.PrefetchSize = 20
iter := txn.NewIterator(opts)
defer iter.Close()
prefix := c.prefix()
for iter.Seek(prefix); iter.ValidForPrefix(prefix); iter.Next() {
current := iter.Item()
err = current.Value(func(val []byte) error {
goodsItem := fbs.ParseGoodsItem(val)
out = append(out, goodsItem)
return nil
})
if err != nil {
return fmt.Errorf("getting value: %w", err)
}
}
return nil
})
if err != nil {
return nil, fmt.Errorf("viewing: %w", err)
}
return out, nil
}
func (c *goodsItemClient) Get(
ctx context.Context,
sku string,
) (out entity.GoodsItem, err error) {
err = c.db.View(func(txn *badger.Txn) error {
key := unsafe.Slice(unsafe.StringData(sku), len(sku))
out, err = c.getBySKU(key, txn)
return err
})
if err != nil {
if errors.Is(err, badger.ErrKeyNotFound) {
return out, entity.ErrNotFound
}
return out, fmt.Errorf("viewing: %w", err)
}
return out, nil
}
func (c *goodsItemClient) GetByCart(ctx context.Context, id int64) (out entity.GoodsItem, err error) {
err = c.db.View(func(txn *badger.Txn) error {
var idByte [8]byte
binary.BigEndian.PutUint64(idByte[:], uint64(id))
item, err := txn.Get(c.prefixedIDByCart(idByte[:]))
if err != nil {
return fmt.Errorf("getting key: %w", err)
}
sku := make([]byte, item.ValueSize())
sku, err = item.ValueCopy(sku)
if err != nil {
return fmt.Errorf("getting value of idx: %w", err)
}
out, err = c.getBySKU(sku, txn)
return err
})
if err != nil {
if errors.Is(err, badger.ErrKeyNotFound) {
return out, entity.ErrNotFound
}
return out, fmt.Errorf("viewing: %w", err)
}
return out, nil
}
func (c *goodsItemClient) UpsertMany(ctx context.Context, items ...entity.GoodsItem) ([]entity.GoodsItem, error) {
return items, c.upsertByOne(ctx, items)
}
func (c *goodsItemClient) upsertByOne(ctx context.Context, items []entity.GoodsItem) error {
return c.db.Update(func(txn *badger.Txn) error {
for _, item := range items {
key := c.prefixedStr(item.Articul)
value := fbs.MakeDomainGoodItemFinished(item)
valueIdx := make([]byte, len(key))
copy(valueIdx, key)
err := txn.Set(key, value)
if err != nil {
return err
}
err = txn.Set(c.prefixedIDByCartStr(item.Cart), valueIdx)
if err != nil {
return err
}
}
return nil
})
}
func (c *goodsItemClient) upsertByStream(ctx context.Context, items []entity.GoodsItem) error {
stream := c.db.NewStreamWriter()
defer stream.Cancel()
err := stream.Prepare()
if err != nil {
return fmt.Errorf("preparing stream: %w", err)
}
buf := z.NewBuffer(len(items), "sometag")
for _, item := range items {
key := c.prefixedStr(item.Articul)
keyIdx := c.prefixedIDByCartStr(item.Cart)
value := fbs.MakeDomainGoodItemFinished(item)
itemKV := &pb.KV{Key: key, Value: value}
itemKVIdx := &pb.KV{Key: keyIdx, Value: key}
badger.KVToBuffer(itemKV, buf)
badger.KVToBuffer(itemKVIdx, buf)
}
err = stream.Write(buf)
if err != nil {
return fmt.Errorf("writing buf: %w", err)
}
err = stream.Flush()
if err != nil {
return fmt.Errorf("flushing changes: %w", err)
}
return nil
}
func (c *goodsItemClient) upsertByBatch(ctx context.Context, items []entity.GoodsItem) error {
batch := c.db.NewWriteBatch()
defer func() {
println("closing batch")
batch.Cancel()
}()
log := zerolog.Ctx(ctx)
for _, item := range items {
select {
case <-ctx.Done():
break
default:
}
key := c.prefixedStr(item.Articul)
value := fbs.MakeDomainGoodItemFinished(item)
idxValue := make([]byte, len(key))
copy(idxValue, key)
coreEntry := badger.NewEntry(key, value)
if err := batch.SetEntry(coreEntry); err != nil {
log.Warn().Err(err).Msg("unable to set item, breaking")
break
}
idxKey := c.prefixedIDByCartStr(item.Cart)
idxEntry := badger.NewEntry(idxKey, idxValue)
if err := batch.SetEntry(idxEntry); err != nil {
log.Warn().Err(err).Msg("unable to set idx, breaking")
break
}
runtime.Gosched()
}
println("flushing")
err := batch.Flush()
runtime.Gosched()
if err != nil {
println("flush err", err.Error())
return fmt.Errorf("flushing changes: %w", err)
}
return nil
}
func (c *goodsItemClient) getBySKU(sku []byte, txn *badger.Txn) (out entity.GoodsItem, err error) {
item, err := txn.Get(c.prefixed(sku))
if err != nil {
return out, fmt.Errorf("getting key: %w", err)
}
err = item.Value(func(val []byte) error {
out = fbs.ParseGoodsItem(val)
return nil
})
if err != nil {
return out, fmt.Errorf("reading value: %w", err)
}
return out, nil
}