diff --git a/.gitignore b/.gitignore index 3d5bdff..13dc1d7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ *.json *.zst +database +bin diff --git a/cmd/converter/components/di.go b/cmd/converter/components/di.go new file mode 100644 index 0000000..08b29fb --- /dev/null +++ b/cmd/converter/components/di.go @@ -0,0 +1,129 @@ +package components + +import ( + "context" + "fmt" + "io" + "time" + + "git.loyso.art/frx/eway/internal/config" + "git.loyso.art/frx/eway/internal/interconnect/eway" + "git.loyso.art/frx/eway/internal/storage" + xbadger "git.loyso.art/frx/eway/internal/storage/badger" + + "github.com/BurntSushi/toml" + "github.com/dgraph-io/badger/v4" + "github.com/rs/zerolog" + "github.com/samber/do" +) + +// Yeah, singleton is not good UNLESS you're really lazy +var diInjector *do.Injector + +func GetRepository() (storage.Repository, error) { + adapter, err := do.Invoke[*storageRepositoryAdapter](diInjector) + if err != nil { + return nil, err + } + + return adapter.entity, nil +} + +func GetLogger() (zerolog.Logger, error) { + return do.Invoke[zerolog.Logger](diInjector) +} + +func SetupDI(ctx context.Context, cfgpath string) error { + cfg, err := parseSettings(cfgpath) + if err != nil { + return err + } + + diInjector = do.New() + + do.Provide(diInjector, func(i *do.Injector) (zerolog.Logger, error) { + tsSet := func(wr *zerolog.ConsoleWriter) { + wr.TimeFormat = time.RFC3339 + } + + log := zerolog.New(zerolog.NewConsoleWriter(tsSet)).With().Timestamp().Str("app", "converter").Logger() + + return log, nil + }) + + do.Provide[eway.Client](diInjector, func(i *do.Injector) (eway.Client, error) { + log, err := GetLogger() + if err != nil { + return nil, fmt.Errorf("getting logger: %w", err) + } + + client := eway.New(eway.Config(cfg.Eway), log) + return client, nil + }) + + do.Provide[*badgerDBAdapter](diInjector, func(i *do.Injector) (*badgerDBAdapter, error) { + db, err := xbadger.Open(ctx, cfg.Badger.Dir, cfg.Badger.Debug, zerolog.Nop()) + if err != nil { + return nil, fmt.Errorf("getting db: %w", err) + } + + out := &badgerDBAdapter{entity: db} + return out, nil + }) + + do.Provide[*storageRepositoryAdapter](diInjector, func(i *do.Injector) (*storageRepositoryAdapter, error) { + db, err := getDB() + if err != nil { + return nil, err + } + + client, err := xbadger.NewClient(db) + if err != nil { + return nil, fmt.Errorf("getting badger client: %w", err) + } + + out := &storageRepositoryAdapter{entity: client} + return out, nil + }) + + return nil +} + +func Shutdown() error { + return diInjector.Shutdown() +} + +func getDB() (*badger.DB, error) { + adapter, err := do.Invoke[*badgerDBAdapter](diInjector) + if err != nil { + return nil, err + } + + return adapter.entity, nil +} + +type settings struct { + Badger config.Badger + Log config.Log + Eway config.Eway +} + +func parseSettings(cfgpath string) (cfg settings, err error) { + _, err = toml.DecodeFile(cfgpath, &cfg) + if err != nil { + return cfg, fmt.Errorf("parsing file: %w", err) + } + + return cfg, nil +} + +type entityCloserAdapter[T io.Closer] struct { + entity T +} + +func (a entityCloserAdapter[T]) Shutdown() error { + return a.entity.Close() +} + +type storageRepositoryAdapter entityCloserAdapter[storage.Repository] +type badgerDBAdapter entityCloserAdapter[*badger.DB] diff --git a/cmd/converter/main.go b/cmd/converter/main.go index 06ca5b3..0683418 100644 --- a/cmd/converter/main.go +++ b/cmd/converter/main.go @@ -11,206 +11,196 @@ import ( "io" "os" "os/signal" - "sync" - "sync/atomic" "time" - "git.loyso.art/frx/eway/internal/config" + "git.loyso.art/frx/eway/cmd/converter/components" + "git.loyso.art/frx/eway/internal/encoding/fbs" "git.loyso.art/frx/eway/internal/entity" - "git.loyso.art/frx/eway/internal/storage" - xbadger "git.loyso.art/frx/eway/internal/storage/badger" - badger "github.com/dgraph-io/badger/v4" - "github.com/dgraph-io/badger/v4/pb" "github.com/rodaine/table" "github.com/rs/zerolog" "github.com/urfave/cli" ) -type appSettings struct { - Badger config.Badger -} - func main() { ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) defer func() { cancel() }() - var corewg sync.WaitGroup - err := runcli(ctx, &corewg) + err := runcli(ctx) if err != nil { fmt.Fprintf(os.Stderr, "unable to handle app: %v", err) - - corewg.Wait() os.Exit(1) } - corewg.Wait() os.Exit(0) } -func runcli(ctx context.Context, wg *sync.WaitGroup) (err error) { - tsSet := func(wr *zerolog.ConsoleWriter) { - wr.TimeFormat = time.RFC3339 - } - log := zerolog.New(zerolog.NewConsoleWriter(tsSet)).Level(zerolog.DebugLevel).With().Timestamp().Str("app", "converter").Logger() - - defer func() { - log.Info().Err(err).Msg("app finished") - }() - - ctx = log.WithContext(ctx) - - log.Info().Msg("making badger") - - db, err := xbadger.Open(ctx, "badger/", log) - if err != nil { - return fmt.Errorf("opening badger: %w", err) - } - - go func() { - var item atomic.Uint64 - err = db.Subscribe( - ctx, - func(kvlist *badger.KVList) error { - kvs := kvlist.GetKv() - for _, kv := range kvs { - count := item.Add(1) - log.Debug().Bytes("key", kv.GetKey()).Uint64("count", count).Msg("inspecting") - } - return nil - }, - []pb.Match{ - { - Prefix: []byte("!!category!!"), - }, - { - Prefix: []byte("!!goodsitem!!"), - }, - }, - ) - log.Err(err).Msg("subscribing") - - }() - - maxBatch := db.MaxBatchCount() - log.Info().Int("max_batch", int(maxBatch)).Msg("max batch settings") - client, err := xbadger.NewClient(db) - if err != nil { - return fmt.Errorf("making new client: %w", err) - } - - wg.Add(1) - defer func() { - defer wg.Done() - - println("closing client") - errClose := client.Close() - if errClose != nil { - log.Warn().Err(errClose).Msg("unable to close client") - } - - println("flushing db") - errSync := db.Sync() - if errSync != nil { - log.Warn().Err(errSync).Msg("unable to sync db") - } - - // time.Sleep(time.Second * 5) - - println("closing db") - errClose = db.Close() - if errClose != nil { - log.Warn().Err(errClose).Msg("unable to close db") - } - }() - - app := setupCLI(ctx, client, maxBatch) - +func runcli(ctx context.Context) (err error) { + app := setupCLI(ctx) return app.Run(os.Args) } -func setupCLI(ctx context.Context, r storage.Repository, maxBatch int64) *cli.App { - app := cli.NewApp() +func setupDI(ctx context.Context) cli.BeforeFunc { + return func(c *cli.Context) error { + cfgpath := c.String("config") + err := components.SetupDI(ctx, cfgpath) + if err != nil { + return fmt.Errorf("setting up di: %w", err) + } + + return nil + } +} + +func releaseDI(c *cli.Context) error { + log, err := components.GetLogger() + if err != nil { + return fmt.Errorf("getting logger: %w", err) + } + + log.Info().Msg("shutting down env") + start := time.Now() + defer func() { + since := time.Since(start) + log.Err(err).Dur("elapsed", since).Msg("shutdown finished") + }() + + return components.Shutdown() +} + +func setupCLI(ctx context.Context) *cli.App { + app := cli.NewApp() + app.Flags = append( + app.Flags, + cli.StringFlag{ + Name: "config", + Usage: "path to config in TOML format", + Value: "config.toml", + TakesFile: true, + }, + ) + + app.Before = setupDI(ctx) + app.After = releaseDI app.Commands = cli.Commands{ - newImportCmd(ctx, r, maxBatch), - newViewCmd(ctx, r), + newImportCmd(ctx), + newViewCmd(ctx), + cli.Command{ + Name: "test-fbs", + Usage: "a simple check for tbs", + Action: cli.ActionFunc(func(c *cli.Context) error { + gooditem := entity.GoodsItem{ + Articul: "some-sku", + Photo: "/photo/path.jpg", + Name: "some-name", + Description: "bad-desc", + Category: "", + Type: "some-type", + Producer: "my-producer", + Pack: 123, + Step: 10, + Price: 12.34, + TariffPrice: 43.21, + Cart: 1998, + Stock: 444, + } + + data := fbs.MakeDomainGoodItemFinished(gooditem) + datahexed := hex.EncodeToString(data) + println(datahexed) + + got, err := fbs.ParseGoodsItem(data) + if err != nil { + return fmt.Errorf("parsing: %w", err) + } + + if got != gooditem { + gotStr := fmt.Sprintf("%v", got) + hasStr := fmt.Sprintf("%v", gooditem) + println(gotStr, "\n", hasStr) + } + + return nil + }), + }, } return app } -func newImportCmd(ctx context.Context, r storage.Repository, maxBatch int64) cli.Command { +func newImportCmd(ctx context.Context) cli.Command { return cli.Command{ Name: "import", Usage: "category for importing data from sources", - Flags: []cli.Flag{ - // &cli.StringFlag{ - // Name: "config", - // Usage: "path to config", - // Value: "config.json", - // TakesFile: true, - // }, - &cli.BoolFlag{ - Name: "verbose", - Usage: "set logger to debug mode", - }, - }, - Before: func(c *cli.Context) error { - if c.Bool("verbose") { - zerolog.SetGlobalLevel(zerolog.DebugLevel) - } - return nil - }, Subcommands: cli.Commands{ - newImportFromFileCmd(ctx, r, maxBatch), + newImportFromFileCmd(ctx), }, } } -func newViewCmd(ctx context.Context, r storage.Repository) cli.Command { +func newViewCmd(ctx context.Context) cli.Command { return cli.Command{ Name: "view", Usage: "Set of commands to view the data inside db", Subcommands: []cli.Command{ - newViewCategoriesCmd(ctx, r), - newViewItemsCmd(ctx, r), + newViewCategoriesCmd(ctx), + newViewItemsCmd(ctx), }, } } -func newViewCategoriesCmd(ctx context.Context, r storage.Repository) cli.Command { +func newViewCategoriesCmd(ctx context.Context) cli.Command { return cli.Command{ Name: "categories", Usage: "Set of commands to work with categories", Subcommands: []cli.Command{ - newViewCategoriesListCmd(ctx, r), + newViewCategoriesListCmd(ctx), }, } } -func newViewItemsCmd(ctx context.Context, r storage.Repository) cli.Command { +func newViewItemsCmd(ctx context.Context) cli.Command { return cli.Command{ Name: "items", Usage: "Set of command to work with items", Subcommands: cli.Commands{ - newViewItemsCountCmd(ctx, r), + newViewItemsGetCmd(ctx), + newViewItemsCountCmd(ctx), }, } } -func newViewItemsCountCmd(ctx context.Context, r storage.Repository) cli.Command { +func newViewItemsCountCmd(ctx context.Context) cli.Command { return cli.Command{ Name: "count", Usage: "iterates over collection and counts number of items", - Action: viewItemsCount(ctx, r), + Action: decorateAction(ctx, viewItemsCountAction), } } -func newViewCategoriesListCmd(ctx context.Context, r storage.Repository) cli.Command { +func newViewItemsGetCmd(ctx context.Context) cli.Command { + return cli.Command{ + Name: "get", + Usage: "gets goods item by its id", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "id", + Usage: "id of the goods item. Either id or cart-id should be set", + }, + &cli.Int64Flag{ + Name: "cart-id", + Usage: "cart-id of the item. Either cart-id or id should be set", + }, + }, + Action: decorateAction(ctx, viewItemsGetAction), + } +} + +func newViewCategoriesListCmd(ctx context.Context) cli.Command { return cli.Command{ Name: "list", Usage: "lists stored categories stored in database", @@ -230,84 +220,11 @@ func newViewCategoriesListCmd(ctx context.Context, r storage.Repository) cli.Com Usage: "prints total count of categories", }, }, - Action: viewCategoriesListAction(ctx, r), + Action: decorateAction(ctx, viewCategoriesListAction), } } -func viewItemsCount(ctx context.Context, r storage.Repository) cli.ActionFunc { - f := func(c *cli.Context) error { - // itemChan, err := r.GoodsItem().ListIter(ctx, 10) - // if err != nil { - // return fmt.Errorf("getting list iter: %w", err) - // } - // - var count int - // for range itemChan { - // count++ - // } - // - items, err := r.GoodsItem().List(ctx) - if err != nil { - return fmt.Errorf("getting list: %w", err) - } - - zerolog.Ctx(ctx).Info().Int("count", count).Int("list_count", len(items)).Msg("read all items") - return nil - } - - return wrapActionFunc(ctx, f) -} - -func viewCategoriesListAction(ctx context.Context, r storage.Repository) cli.ActionFunc { - f := func(c *cli.Context) error { - categories, err := r.Category().List(ctx) - if err != nil { - return fmt.Errorf("listing categories: %w", err) - } - - limit := c.Int("limit") - page := c.Int("page") - total := len(categories) - - if page == 0 { - page = 1 - } - - if limit > 0 { - offset := (page - 1) * limit - if offset > len(categories) { - offset = len(categories) - 1 - } - - limit = offset + limit - if limit > len(categories) { - limit = len(categories) - } - - categories = categories[offset:limit] - } - - tbl := table.New("ID", "Name") - for _, category := range categories { - if category.ID == 0 && category.Name == "" { - continue - } - tbl.AddRow(category.ID, category.Name) - } - - tbl.Print() - - if c.Bool("with-total") { - zerolog.Ctx(ctx).Info().Int("count", total).Msg("total categories stats") - } - - return nil - } - - return wrapActionFunc(ctx, f) -} - -func newImportFromFileCmd(ctx context.Context, r storage.Repository, maxBatch int64) cli.Command { +func newImportFromFileCmd(ctx context.Context) cli.Command { return cli.Command{ Name: "fromfile", Usage: "imports from file into db", @@ -319,126 +236,250 @@ func newImportFromFileCmd(ctx context.Context, r storage.Repository, maxBatch in Required: true, }, }, - Action: handleConvert(ctx, r, maxBatch), + Action: decorateAction(ctx, importFromFileAction), } } -func handleConvert(ctx context.Context, r storage.Repository, maxBatch int64) cli.ActionFunc { - f := func(c *cli.Context) error { - filesrc := c.String("src") - log := zerolog.Ctx(ctx) +func viewItemsGetAction(ctx context.Context, c *cli.Context) error { + r, err := components.GetRepository() + if err != nil { + return fmt.Errorf("getting repository: %w", err) + } - log.Debug().Str("filepath", filesrc).Msg("importing data from file") + id := c.String("id") + cartID := c.Int64("cart-id") + if id == "" && cartID == 0 { + return cli.NewExitError("oneof: id or cart-id should be set", 1) + } else if id != "" && cartID != 0 { + return cli.NewExitError("oneof: id or cart-id should be set", 1) + } - productsFile, err := os.Open(filesrc) - if err != nil { - return fmt.Errorf("opening file: %w", err) + var item entity.GoodsItem + if id != "" { + item, err = r.GoodsItem().Get(ctx, id) + } else { + item, err = r.GoodsItem().GetByCart(ctx, cartID) + } + if err != nil { + return fmt.Errorf("getting item: %w", err) + } + + enc := json.NewEncoder(os.Stdout) + enc.SetIndent("", " ") + + err = enc.Encode(item) + if err != nil { + return fmt.Errorf("encoding item: %w", err) + } + + return nil +} + +func viewItemsCountAction(ctx context.Context, c *cli.Context) error { + r, err := components.GetRepository() + if err != nil { + return fmt.Errorf("getting repository: %w", err) + } + + itemChan, err := r.GoodsItem().ListIter(ctx, 10) + if err != nil { + if !errors.Is(err, entity.ErrNotImplemented) { + return fmt.Errorf("getting list iter: %w", err) } + } - defer func() { - errClose := productsFile.Close() - if errClose != nil { - log.Warn().Err(errClose).Msg("unable to close file") - } - }() - - var ( - goodsItem entity.GoodsItem - // goodsItems []entity.GoodsItem - failedToInsert int - ) - - seenCategories := make(map[string]struct{}) - categories, err := r.Category().List(ctx) - if err != nil { - return fmt.Errorf("listing categories: %w", err) - } - - for _, category := range categories { - seenCategories[category.Name] = struct{}{} - } - - bfile := bufio.NewReader(productsFile) - for { - line, _, err := bfile.ReadLine() - if err != nil { - if errors.Is(err, io.EOF) { - break - + var count int + if err == nil { + var done bool + for !done { + select { + case _, ok := <-itemChan: + if !ok { + done = true + continue } - return fmt.Errorf("reading line: %w", err) + count++ + case <-ctx.Done(): + return ctx.Err() } - - err = json.Unmarshal(line, &goodsItem) - if err != nil { - // log.Warn().Err(err).Str("line", string(line)).Msg("unable to unmarshal line into item") - failedToInsert++ - continue - } - - _, err = r.GoodsItem().UpsertMany(ctx, goodsItem) - if err != nil { - return fmt.Errorf("unable to upsert new item: %w", err) - } - - // goodsItems = append(goodsItems, goodsItem) - - if _, ok := seenCategories[goodsItem.Type]; ok { - continue - } - - _, err = r.Category().Create(ctx, goodsItem.Type) - if err != nil { - return fmt.Errorf("unable to create new category: %w", err) - } - log.Debug().Any("category", goodsItem.Type).Msg("inserted new category") - seenCategories[goodsItem.Type] = struct{}{} } - - // log.Debug().Int("count", len(goodsItems)).Int("failed", failedToInsert).Msg("preparing to upload") - // - // start := time.Now() - // batchSize := int(maxBatch) - // for i := 0; i < len(goodsItems); i += batchSize { - // to := i + batchSize - // if to > len(goodsItems) { - // to = len(goodsItems) - // } - // - // _, err = r.GoodsItem().UpsertMany(ctx, goodsItems[i:to]...) - // if err != nil { - // return fmt.Errorf("upserting items: %w", err) - // } - // log.Debug().Int("count", to-i).Msg("inserted batch") - // time.Sleep(time.Second) - // } - // log.Debug().Dur("elapsed", time.Since(start)).Msg("upload finished") - // - - time.Sleep(time.Second * 30) - - return nil } - time.Sleep(time.Second * 10) + items, err := r.GoodsItem().List(ctx) + if err != nil { + return fmt.Errorf("getting list: %w", err) + } - return wrapActionFunc(ctx, f) + zerolog.Ctx(ctx).Info().Int("count", count).Int("list_count", len(items)).Msg("read all items") + return nil } -func wrapActionFunc(ctx context.Context, next cli.ActionFunc) cli.ActionFunc { +func viewCategoriesListAction(ctx context.Context, c *cli.Context) error { + r, err := components.GetRepository() + if err != nil { + return fmt.Errorf("getting repository: %w", err) + } + + categories, err := r.Category().List(ctx) + if err != nil { + return fmt.Errorf("listing categories: %w", err) + } + + limit := c.Int("limit") + page := c.Int("page") + total := len(categories) + + if page == 0 { + page = 1 + } + + if limit > 0 { + offset := (page - 1) * limit + if offset > len(categories) { + offset = len(categories) - 1 + } + + limit = offset + limit + if limit > len(categories) { + limit = len(categories) + } + + categories = categories[offset:limit] + } + + tbl := table.New("ID", "Name") + for _, category := range categories { + if category.ID == 0 && category.Name == "" { + continue + } + tbl.AddRow(category.ID, category.Name) + } + + tbl.Print() + + if c.Bool("with-total") { + zerolog.Ctx(ctx).Info().Int("count", total).Msg("total categories stats") + } + + return nil +} + +func importFromFileAction(ctx context.Context, c *cli.Context) error { + const maxBatch = 2000 + + r, err := components.GetRepository() + if err != nil { + return fmt.Errorf("getting repository: %w", err) + } + + filesrc := c.String("src") + log := zerolog.Ctx(ctx) + + log.Debug().Str("filepath", filesrc).Msg("importing data from file") + + productsFile, err := os.Open(filesrc) + if err != nil { + return fmt.Errorf("opening file: %w", err) + } + + defer func() { + errClose := productsFile.Close() + if errClose != nil { + log.Warn().Err(errClose).Msg("unable to close file") + } + }() + + var ( + goodsItem entity.GoodsItem + goodsItems []entity.GoodsItem + failedToInsert int + ) + + seenCategories := make(map[string]struct{}) + categories, err := r.Category().List(ctx) + if err != nil { + return fmt.Errorf("listing categories: %w", err) + } + + for _, category := range categories { + seenCategories[category.Name] = struct{}{} + } + + bfile := bufio.NewReader(productsFile) + for { + line, _, err := bfile.ReadLine() + if err != nil { + if errors.Is(err, io.EOF) { + break + + } + return fmt.Errorf("reading line: %w", err) + } + + err = json.Unmarshal(line, &goodsItem) + if err != nil { + log.Warn().Err(err).Str("line", string(line)).Msg("unable to unmarshal line into item") + failedToInsert++ + continue + } + + goodsItems = append(goodsItems, goodsItem) + + if _, ok := seenCategories[goodsItem.Type]; ok { + continue + } + + _, err = r.Category().Create(ctx, goodsItem.Type) + if err != nil { + return fmt.Errorf("unable to create new category: %w", err) + } + log.Debug().Any("category", goodsItem.Type).Msg("inserted new category") + seenCategories[goodsItem.Type] = struct{}{} + } + + log.Debug().Int("count", len(goodsItems)).Int("failed", failedToInsert).Msg("preparing to upload") + + start := time.Now() + batchSize := int(maxBatch) + for i := 0; i < len(goodsItems); i += batchSize { + to := i + batchSize + if to > len(goodsItems) { + to = len(goodsItems) + } + + _, err = r.GoodsItem().UpsertMany(ctx, goodsItems[i:to]...) + if err != nil { + return fmt.Errorf("upserting items: %w", err) + } + log.Debug().Int("count", to-i).Msg("inserted batch") + } + log.Debug().Dur("elapsed", time.Since(start)).Msg("upload finished") + + return nil +} + +type action func(ctx context.Context, c *cli.Context) error + +func decorateAction(ctx context.Context, a action) cli.ActionFunc { return func(c *cli.Context) error { var data [3]byte _, _ = rand.Read(data[:]) reqid := hex.EncodeToString(data[:]) - log := zerolog.Ctx(ctx).With().Str("reqid", reqid).Logger() + log, err := components.GetLogger() + if err != nil { + return fmt.Errorf("getting logger: %w", err) + } + + log = log.With().Str("reqid", reqid).Logger() ctx = log.WithContext(ctx) + start := time.Now() defer func() { log.Info().Dur("elapsed", time.Since(start)).Msg("command completed") }() log.Info().Msg("command execution started") - return next(c) + return a(ctx, c) } } diff --git a/go.mod b/go.mod index 5b6e3b9..140b150 100644 --- a/go.mod +++ b/go.mod @@ -3,30 +3,31 @@ module git.loyso.art/frx/eway go 1.21.4 require ( + github.com/dgraph-io/badger/v4 v4.2.0 + github.com/dgraph-io/ristretto v0.1.1 + github.com/go-resty/resty/v2 v2.10.0 + github.com/google/flatbuffers v23.5.26+incompatible + github.com/rodaine/table v1.1.1 + github.com/rs/zerolog v1.31.0 + github.com/urfave/cli v1.22.14 +) + +require ( + github.com/BurntSushi/toml v1.3.2 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect - github.com/dgraph-io/badger/v4 v4.2.0 // indirect - github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dustin/go-humanize v1.0.0 // indirect - github.com/go-resty/resty/v2 v2.10.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/glog v1.0.0 // indirect github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.3 // indirect - github.com/google/flatbuffers v23.5.26+incompatible // indirect - github.com/karlseguin/ccache/v3 v3.0.5 // indirect - github.com/kataras/tablewriter v0.0.0-20180708051242-e063d29b7c23 // indirect github.com/klauspost/compress v1.12.3 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect - github.com/mattn/go-runewidth v0.0.15 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/rivo/uniseg v0.2.0 // indirect - github.com/rodaine/table v1.1.1 // indirect - github.com/rs/zerolog v1.31.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect - github.com/urfave/cli v1.22.14 // indirect + github.com/samber/do v1.6.0 // indirect go.opencensus.io v0.22.5 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/sys v0.13.0 // indirect diff --git a/go.sum b/go.sum index 81af957..0cbea73 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= @@ -9,11 +10,13 @@ github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgraph-io/badger/v4 v4.2.0 h1:kJrlajbXXL9DFTNuhhu9yCx7JJa4qpYWxtE8BzuWsEs= github.com/dgraph-io/badger/v4 v4.2.0/go.mod h1:qfCqhPoWDFJRx1gp5QwwyGo8xk1lbHUxvK9nK0OGAak= github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -39,11 +42,8 @@ github.com/google/flatbuffers v23.5.26+incompatible h1:M9dgRyhJemaM4Sw8+66GHBu8i github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/karlseguin/ccache/v3 v3.0.5 h1:hFX25+fxzNjsRlREYsoGNa2LoVEw5mPF8wkWq/UnevQ= -github.com/karlseguin/ccache/v3 v3.0.5/go.mod h1:qxC372+Qn+IBj8Pe3KvGjHPj0sWwEF7AeZVhsNPZ6uY= -github.com/kataras/tablewriter v0.0.0-20180708051242-e063d29b7c23 h1:M8exrBzuhWcU6aoHJlHWPe4qFjVKzkMGRal78f5jRRU= -github.com/kataras/tablewriter v0.0.0-20180708051242-e063d29b7c23/go.mod h1:kBSna6b0/RzsOcOZf515vAXwSsXYusl2U7SA0XP09yI= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.12.3 h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU= @@ -57,6 +57,7 @@ github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZ github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= @@ -67,12 +68,15 @@ github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A= github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/samber/do v1.6.0 h1:Jy/N++BXINDB6lAx5wBlbpHlUdl0FKpLWgGEV9YWqaU= +github.com/samber/do v1.6.0/go.mod h1:DWqBvumy8dyb2vEnYZE7D7zaVEB64J45B0NjTlY/M4k= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/urfave/cli v1.22.14 h1:ebbhrRiGK2i4naQJr+1Xj92HXZCrK7MsyTS/ob3HnAk= github.com/urfave/cli v1.22.14/go.mod h1:X0eDS6pD6Exaclxm99NJ3FiCDRED7vIHpx2mDOHLvkA= @@ -145,6 +149,7 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -173,5 +178,6 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/internal/config/badger.go b/internal/config/badger.go index 586c2da..7299381 100644 --- a/internal/config/badger.go +++ b/internal/config/badger.go @@ -1,5 +1,7 @@ package config type Badger struct { - Path string `json:"path"` + Debug bool + Dir string + ValueDir *string } diff --git a/internal/config/eway.go b/internal/config/eway.go new file mode 100644 index 0000000..98d6daa --- /dev/null +++ b/internal/config/eway.go @@ -0,0 +1,8 @@ +package config + +type Eway struct { + SessionID string + SessionUser string + Contract string + Debug bool +} diff --git a/internal/config/log.go b/internal/config/log.go new file mode 100644 index 0000000..d2826fc --- /dev/null +++ b/internal/config/log.go @@ -0,0 +1,55 @@ +package config + +import ( + "strings" + + "git.loyso.art/frx/eway/internal/entity" +) + +type LogLevel uint8 + +const ( + LogLevelDebug LogLevel = iota + LogLevelInfo + LogLevelWarn +) + +func (l *LogLevel) UnmarshalText(data []byte) (err error) { + switch strings.ToLower(string(data)) { + case "debug": + *l = LogLevelDebug + case "info": + *l = LogLevelInfo + case "warn": + *l = LogLevelWarn + default: + return entity.SimpleError("unsupported level " + string(data)) + } + + return nil +} + +type LogFormat uint8 + +const ( + LogFormatText LogFormat = iota + LogFormatJSON +) + +func (l *LogFormat) UnmarshalText(data []byte) (err error) { + switch strings.ToLower(string(data)) { + case "text": + *l = LogFormatText + case "info": + *l = LogFormatJSON + default: + return entity.SimpleError("unsupported format " + string(data)) + } + + return nil +} + +type Log struct { + Level string `json:"level"` + Format string `json:"format"` +} diff --git a/internal/encoding/fbs/helpers.go b/internal/encoding/fbs/helpers.go index 94bc05f..ed77f6e 100644 --- a/internal/encoding/fbs/helpers.go +++ b/internal/encoding/fbs/helpers.go @@ -1,6 +1,8 @@ package fbs import ( + "encoding/base64" + "fmt" "sync" "git.loyso.art/frx/eway/internal/entity" @@ -21,8 +23,8 @@ func getBuilder() *flatbuffers.Builder { } func putBuilder(builder *flatbuffers.Builder) { - builder.Reset() - builderPool.Put(builder) + // builder.Reset() + // builderPool.Put(builder) } func MakeDomainGoodItems(in ...entity.GoodsItem) []byte { @@ -62,7 +64,10 @@ func makeDomainGoodItem(builder *flatbuffers.Builder, in entity.GoodsItem) flatb sku := builder.CreateString(in.Articul) photo := builder.CreateString(in.Photo) name := builder.CreateString(in.Name) - desc := builder.CreateString(in.Description) + + descBase64 := base64.RawStdEncoding.EncodeToString([]byte(in.Description)) + desc := builder.CreateString(descBase64) + var cat flatbuffers.UOffsetT if in.Category != "" { cat = builder.CreateString(in.Category) @@ -90,12 +95,18 @@ func makeDomainGoodItem(builder *flatbuffers.Builder, in entity.GoodsItem) flatb return GoodItemEnd(builder) } -func ParseGoodsItem(data []byte) (item entity.GoodsItem) { +func ParseGoodsItem(data []byte) (item entity.GoodsItem, err error) { itemFBS := GetRootAsGoodItem(data, 0) item.Articul = string(itemFBS.Sku()) item.Photo = string(itemFBS.Photo()) item.Name = string(itemFBS.Name()) - item.Description = string(itemFBS.Description()) + + description, err := base64.RawStdEncoding.DecodeString(string(itemFBS.Description())) + if err != nil { + return item, fmt.Errorf("decoding description from base64: %w", err) + } + + item.Description = string(description) if value := itemFBS.Category(); value != nil { item.Category = string(value) } @@ -108,7 +119,7 @@ func ParseGoodsItem(data []byte) (item entity.GoodsItem) { item.Cart = itemFBS.Cart() item.Stock = int(itemFBS.Stock()) - return item + return item, nil } func ParseCategory(data []byte) (category entity.Category) { diff --git a/internal/entity/error.go b/internal/entity/error.go index 186ca55..dc0b627 100644 --- a/internal/entity/error.go +++ b/internal/entity/error.go @@ -7,5 +7,6 @@ func (err SimpleError) Error() string { } const ( - ErrNotFound SimpleError = "not found" + ErrNotFound SimpleError = "not found" + ErrNotImplemented SimpleError = "not implemented" ) diff --git a/internal/interconnect/eway/client.go b/internal/interconnect/eway/client.go index ab217e2..ac8e1be 100644 --- a/internal/interconnect/eway/client.go +++ b/internal/interconnect/eway/client.go @@ -12,41 +12,51 @@ import ( "strconv" "strings" + "git.loyso.art/frx/eway/internal/config" "git.loyso.art/frx/eway/internal/entity" "github.com/go-resty/resty/v2" "github.com/rs/zerolog" ) +type Client interface { +} + type client struct { http *resty.Client log zerolog.Logger } -func NewClientWithSession(sessionid, sessionuser string, log zerolog.Logger) client { +type Config config.Eway + +func New(cfg Config, log zerolog.Logger) client { + if cfg.Contract == "" { + cfg.Contract = "6101" + } + cookies := []*http.Cookie{ { Name: "session_id", - Value: sessionid, + Value: cfg.SessionID, Domain: "eway.elevel.ru", HttpOnly: true, }, { Name: "session_user", - Value: sessionuser, + Value: cfg.SessionUser, Domain: "eway.elevel.ru", HttpOnly: true, }, { Name: "contract", - Value: "6101", + Value: cfg.Contract, Domain: "eway.elevel.ru", HttpOnly: true, }, } httpclient := resty.New(). - SetDebug(false). + SetDebug(cfg.Debug). SetCookies(cookies). SetBaseURL("https://eway.elevel.ru/api") diff --git a/internal/storage/badger/category.go b/internal/storage/badger/category.go index 1441c62..909f1c1 100644 --- a/internal/storage/badger/category.go +++ b/internal/storage/badger/category.go @@ -10,7 +10,6 @@ import ( "git.loyso.art/frx/eway/internal/entity" badger "github.com/dgraph-io/badger/v4" - "github.com/rs/zerolog" ) type categoryClient struct { @@ -105,24 +104,14 @@ func (c categoryClient) Get(ctx context.Context, id int64) (out entity.Category, // Create new category inside DB. It also applies new id to it. func (c categoryClient) Create(ctx context.Context, name string) (out entity.Category, err error) { - seqGen, err := c.db.GetSequence(categorySequenceIDKey, 1) - if err != nil { - return out, fmt.Errorf("getting sequence for categories: %w", err) - } - defer func() { - errRelese := seqGen.Release() - if errRelese != nil { - zerolog.Ctx(ctx).Warn().Err(err).Msg("unable to release seq") - } - }() - - nextid, err := seqGen.Next() + nextid, err := c.seqGen.Next() if err != nil { return out, fmt.Errorf("getting next id: %w", err) } out = entity.Category{ - ID: int64(nextid), + // Because first value from sequence generator is 0 + ID: int64(nextid + 1), Name: name, } diff --git a/internal/storage/badger/client.go b/internal/storage/badger/client.go index 6091f89..e12d8d6 100644 --- a/internal/storage/badger/client.go +++ b/internal/storage/badger/client.go @@ -1,45 +1,46 @@ package badger import ( + "fmt" + "git.loyso.art/frx/eway/internal/entity" badger "github.com/dgraph-io/badger/v4" ) var ( - categorySequenceIDKey = []byte("cat:") + categorySequenceIDKey = []byte("!!cat_seq!!") ) type client struct { - db *badger.DB - - // nextCategoryIDSeq *badger.Sequence + db *badger.DB + nextCategoryIDSeq *badger.Sequence } func NewClient(db *badger.DB) (*client, error) { - // categorySeqGen, err := db.GetSequence(categorySequenceIDKey, 10) - // if err != nil { - // return nil, fmt.Errorf("getting sequence for categories: %w", err) - // } - // + categorySeqGen, err := db.GetSequence(categorySequenceIDKey, 10) + if err != nil { + return nil, fmt.Errorf("getting sequence for categories: %w", err) + } + return &client{ - db: db, - // nextCategoryIDSeq: categorySeqGen, + db: db, + nextCategoryIDSeq: categorySeqGen, }, nil } // Close closes the underlying sequences in the client. Should be called right before // underlying *badger.DB closed. func (c *client) Close() error { - // err := c.nextCategoryIDSeq.Release() - // if err != nil { - // return fmt.Errorf("releasing next_category_sequence: %w", err) - // } + err := c.nextCategoryIDSeq.Release() + if err != nil { + return fmt.Errorf("releasing next_category_sequence: %w", err) + } return nil } func (c *client) Category() entity.CategoryRepository { - return newCategoryClient(c.db, nil) + return newCategoryClient(c.db, c.nextCategoryIDSeq) } func (c *client) GoodsItem() entity.GoodsItemRepository { diff --git a/internal/storage/badger/db.go b/internal/storage/badger/db.go index 0c0fe16..b79f10e 100644 --- a/internal/storage/badger/db.go +++ b/internal/storage/badger/db.go @@ -30,21 +30,21 @@ func (za zerologAdapter) fmt(event *zerolog.Event, format string, args ...any) { event.Msgf(strings.TrimSuffix(format, "\n"), args...) } -func Open(ctx context.Context, path string, log zerolog.Logger) (*badger.DB, error) { +func Open(ctx context.Context, path string, debug bool, log zerolog.Logger) (*badger.DB, error) { bl := zerologAdapter{ log: log.With().Str("db", "badger").Logger(), } + + level := badger.INFO + if debug { + level = badger.DEBUG + } opts := badger.DefaultOptions(path). WithLogger(bl). - WithLoggingLevel(badger.INFO). + WithLoggingLevel(level). WithValueLogFileSize(4 << 20). WithDir(path). WithValueDir(path) - // WithMaxLevels(4). - // WithMemTableSize(8 << 20). - // WithMetricsEnabled(true). - // WithCompactL0OnClose(true). - // WithBlockCacheSize(8 << 20) db, err := badger.Open(opts) if err != nil { diff --git a/internal/storage/badger/goodsitem.go b/internal/storage/badger/goodsitem.go index 6b39e86..8e3a380 100644 --- a/internal/storage/badger/goodsitem.go +++ b/internal/storage/badger/goodsitem.go @@ -1,22 +1,24 @@ package badger import ( + "bytes" "context" "encoding/binary" + "encoding/json" "errors" "fmt" - "runtime" "unsafe" "git.loyso.art/frx/eway/internal/encoding/fbs" "git.loyso.art/frx/eway/internal/entity" badger "github.com/dgraph-io/badger/v4" - "github.com/dgraph-io/badger/v4/pb" "github.com/dgraph-io/ristretto/z" "github.com/rs/zerolog" ) +const useJSON = false + type goodsItemClient struct { db *badger.DB } @@ -65,7 +67,21 @@ func (c *goodsItemClient) ListIter( } for _, kv := range list.GetKv() { - bus <- fbs.ParseGoodsItem(kv.GetValue()) + var gooditem entity.GoodsItem + + if useJSON { + err = json.Unmarshal(kv.GetValue(), &gooditem) + if err != nil { + return err + } + } else { + gooditem, err = fbs.ParseGoodsItem(kv.GetValue()) + if err != nil { + return err + } + } + + bus <- gooditem } return nil @@ -74,10 +90,11 @@ func (c *goodsItemClient) ListIter( go func(ctx context.Context) { defer close(bus) - err := stream.Orchestrate(context.Background()) + err := stream.Orchestrate(ctx) if err != nil { zerolog.Ctx(ctx).Warn().Err(err).Msg("unable to orchestrate") } + println("finished") }(ctx) return bus, nil @@ -94,10 +111,23 @@ func (c *goodsItemClient) List( defer iter.Close() prefix := c.prefix() + var cursor int for iter.Seek(prefix); iter.ValidForPrefix(prefix); iter.Next() { + cursor++ current := iter.Item() err = current.Value(func(val []byte) error { - goodsItem := fbs.ParseGoodsItem(val) + var goodsItem entity.GoodsItem + if useJSON { + err := json.Unmarshal(val, &goodsItem) + if err != nil { + return err + } + } else { + goodsItem, err = fbs.ParseGoodsItem(val) + if err != nil { + return err + } + } out = append(out, goodsItem) return nil @@ -152,6 +182,8 @@ func (c *goodsItemClient) GetByCart(ctx context.Context, id int64) (out entity.G return fmt.Errorf("getting value of idx: %w", err) } + sku = bytes.TrimPrefix(sku, c.prefix()) + out, err = c.getBySKU(sku, txn) return err }) @@ -167,75 +199,15 @@ func (c *goodsItemClient) GetByCart(ctx context.Context, id int64) (out entity.G } func (c *goodsItemClient) UpsertMany(ctx context.Context, items ...entity.GoodsItem) ([]entity.GoodsItem, error) { - return items, c.upsertByOne(ctx, items) -} - -func (c *goodsItemClient) upsertByOne(ctx context.Context, items []entity.GoodsItem) error { - return c.db.Update(func(txn *badger.Txn) error { - for _, item := range items { - key := c.prefixedStr(item.Articul) - value := fbs.MakeDomainGoodItemFinished(item) - valueIdx := make([]byte, len(key)) - copy(valueIdx, key) - - err := txn.Set(key, value) - if err != nil { - return err - } - - err = txn.Set(c.prefixedIDByCartStr(item.Cart), valueIdx) - if err != nil { - return err - } - } - - return nil - }) -} - -func (c *goodsItemClient) upsertByStream(ctx context.Context, items []entity.GoodsItem) error { - stream := c.db.NewStreamWriter() - defer stream.Cancel() - - err := stream.Prepare() - if err != nil { - return fmt.Errorf("preparing stream: %w", err) - } - - buf := z.NewBuffer(len(items), "sometag") - for _, item := range items { - key := c.prefixedStr(item.Articul) - keyIdx := c.prefixedIDByCartStr(item.Cart) - value := fbs.MakeDomainGoodItemFinished(item) - - itemKV := &pb.KV{Key: key, Value: value} - itemKVIdx := &pb.KV{Key: keyIdx, Value: key} - - badger.KVToBuffer(itemKV, buf) - badger.KVToBuffer(itemKVIdx, buf) - } - - err = stream.Write(buf) - if err != nil { - return fmt.Errorf("writing buf: %w", err) - } - - err = stream.Flush() - if err != nil { - return fmt.Errorf("flushing changes: %w", err) - } - - return nil + return items, c.upsertByBatch(ctx, items) } func (c *goodsItemClient) upsertByBatch(ctx context.Context, items []entity.GoodsItem) error { batch := c.db.NewWriteBatch() - defer func() { - println("closing batch") - batch.Cancel() - }() + defer batch.Cancel() log := zerolog.Ctx(ctx) + for _, item := range items { select { case <-ctx.Done(): @@ -243,9 +215,16 @@ func (c *goodsItemClient) upsertByBatch(ctx context.Context, items []entity.Good default: } key := c.prefixedStr(item.Articul) - value := fbs.MakeDomainGoodItemFinished(item) + var value []byte + if useJSON { + value, _ = json.Marshal(item) + } else { + value = fbs.MakeDomainGoodItemFinished(item) + } + idxValue := make([]byte, len(key)) copy(idxValue, key) + coreEntry := badger.NewEntry(key, value) if err := batch.SetEntry(coreEntry); err != nil { log.Warn().Err(err).Msg("unable to set item, breaking") @@ -258,14 +237,10 @@ func (c *goodsItemClient) upsertByBatch(ctx context.Context, items []entity.Good log.Warn().Err(err).Msg("unable to set idx, breaking") break } - runtime.Gosched() } - println("flushing") err := batch.Flush() - runtime.Gosched() if err != nil { - println("flush err", err.Error()) return fmt.Errorf("flushing changes: %w", err) } @@ -279,9 +254,14 @@ func (c *goodsItemClient) getBySKU(sku []byte, txn *badger.Txn) (out entity.Good } err = item.Value(func(val []byte) error { - out = fbs.ParseGoodsItem(val) - return nil + if useJSON { + return json.Unmarshal(val, &out) + } + + out, err = fbs.ParseGoodsItem(val) + return err }) + if err != nil { return out, fmt.Errorf("reading value: %w", err) } diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 1144e49..3bdfd72 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -1,8 +1,14 @@ package storage -import "git.loyso.art/frx/eway/internal/entity" +import ( + "io" + + "git.loyso.art/frx/eway/internal/entity" +) type Repository interface { + io.Closer + Category() entity.CategoryRepository GoodsItem() entity.GoodsItemRepository }