fix saving and project improvments

This commit is contained in:
Aleksandr Trushkin
2024-01-25 16:42:08 +03:00
parent f94d39b124
commit 6fe250896c
17 changed files with 692 additions and 438 deletions

View File

@ -1,22 +1,24 @@
package badger
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"runtime"
"unsafe"
"git.loyso.art/frx/eway/internal/encoding/fbs"
"git.loyso.art/frx/eway/internal/entity"
badger "github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/badger/v4/pb"
"github.com/dgraph-io/ristretto/z"
"github.com/rs/zerolog"
)
const useJSON = false
type goodsItemClient struct {
db *badger.DB
}
@ -65,7 +67,21 @@ func (c *goodsItemClient) ListIter(
}
for _, kv := range list.GetKv() {
bus <- fbs.ParseGoodsItem(kv.GetValue())
var gooditem entity.GoodsItem
if useJSON {
err = json.Unmarshal(kv.GetValue(), &gooditem)
if err != nil {
return err
}
} else {
gooditem, err = fbs.ParseGoodsItem(kv.GetValue())
if err != nil {
return err
}
}
bus <- gooditem
}
return nil
@ -74,10 +90,11 @@ func (c *goodsItemClient) ListIter(
go func(ctx context.Context) {
defer close(bus)
err := stream.Orchestrate(context.Background())
err := stream.Orchestrate(ctx)
if err != nil {
zerolog.Ctx(ctx).Warn().Err(err).Msg("unable to orchestrate")
}
println("finished")
}(ctx)
return bus, nil
@ -94,10 +111,23 @@ func (c *goodsItemClient) List(
defer iter.Close()
prefix := c.prefix()
var cursor int
for iter.Seek(prefix); iter.ValidForPrefix(prefix); iter.Next() {
cursor++
current := iter.Item()
err = current.Value(func(val []byte) error {
goodsItem := fbs.ParseGoodsItem(val)
var goodsItem entity.GoodsItem
if useJSON {
err := json.Unmarshal(val, &goodsItem)
if err != nil {
return err
}
} else {
goodsItem, err = fbs.ParseGoodsItem(val)
if err != nil {
return err
}
}
out = append(out, goodsItem)
return nil
@ -152,6 +182,8 @@ func (c *goodsItemClient) GetByCart(ctx context.Context, id int64) (out entity.G
return fmt.Errorf("getting value of idx: %w", err)
}
sku = bytes.TrimPrefix(sku, c.prefix())
out, err = c.getBySKU(sku, txn)
return err
})
@ -167,75 +199,15 @@ func (c *goodsItemClient) GetByCart(ctx context.Context, id int64) (out entity.G
}
func (c *goodsItemClient) UpsertMany(ctx context.Context, items ...entity.GoodsItem) ([]entity.GoodsItem, error) {
return items, c.upsertByOne(ctx, items)
}
func (c *goodsItemClient) upsertByOne(ctx context.Context, items []entity.GoodsItem) error {
return c.db.Update(func(txn *badger.Txn) error {
for _, item := range items {
key := c.prefixedStr(item.Articul)
value := fbs.MakeDomainGoodItemFinished(item)
valueIdx := make([]byte, len(key))
copy(valueIdx, key)
err := txn.Set(key, value)
if err != nil {
return err
}
err = txn.Set(c.prefixedIDByCartStr(item.Cart), valueIdx)
if err != nil {
return err
}
}
return nil
})
}
func (c *goodsItemClient) upsertByStream(ctx context.Context, items []entity.GoodsItem) error {
stream := c.db.NewStreamWriter()
defer stream.Cancel()
err := stream.Prepare()
if err != nil {
return fmt.Errorf("preparing stream: %w", err)
}
buf := z.NewBuffer(len(items), "sometag")
for _, item := range items {
key := c.prefixedStr(item.Articul)
keyIdx := c.prefixedIDByCartStr(item.Cart)
value := fbs.MakeDomainGoodItemFinished(item)
itemKV := &pb.KV{Key: key, Value: value}
itemKVIdx := &pb.KV{Key: keyIdx, Value: key}
badger.KVToBuffer(itemKV, buf)
badger.KVToBuffer(itemKVIdx, buf)
}
err = stream.Write(buf)
if err != nil {
return fmt.Errorf("writing buf: %w", err)
}
err = stream.Flush()
if err != nil {
return fmt.Errorf("flushing changes: %w", err)
}
return nil
return items, c.upsertByBatch(ctx, items)
}
func (c *goodsItemClient) upsertByBatch(ctx context.Context, items []entity.GoodsItem) error {
batch := c.db.NewWriteBatch()
defer func() {
println("closing batch")
batch.Cancel()
}()
defer batch.Cancel()
log := zerolog.Ctx(ctx)
for _, item := range items {
select {
case <-ctx.Done():
@ -243,9 +215,16 @@ func (c *goodsItemClient) upsertByBatch(ctx context.Context, items []entity.Good
default:
}
key := c.prefixedStr(item.Articul)
value := fbs.MakeDomainGoodItemFinished(item)
var value []byte
if useJSON {
value, _ = json.Marshal(item)
} else {
value = fbs.MakeDomainGoodItemFinished(item)
}
idxValue := make([]byte, len(key))
copy(idxValue, key)
coreEntry := badger.NewEntry(key, value)
if err := batch.SetEntry(coreEntry); err != nil {
log.Warn().Err(err).Msg("unable to set item, breaking")
@ -258,14 +237,10 @@ func (c *goodsItemClient) upsertByBatch(ctx context.Context, items []entity.Good
log.Warn().Err(err).Msg("unable to set idx, breaking")
break
}
runtime.Gosched()
}
println("flushing")
err := batch.Flush()
runtime.Gosched()
if err != nil {
println("flush err", err.Error())
return fmt.Errorf("flushing changes: %w", err)
}
@ -279,9 +254,14 @@ func (c *goodsItemClient) getBySKU(sku []byte, txn *badger.Txn) (out entity.Good
}
err = item.Value(func(val []byte) error {
out = fbs.ParseGoodsItem(val)
return nil
if useJSON {
return json.Unmarshal(val, &out)
}
out, err = fbs.ParseGoodsItem(val)
return err
})
if err != nil {
return out, fmt.Errorf("reading value: %w", err)
}