fix saving and project improvments
This commit is contained in:
129
cmd/converter/components/di.go
Normal file
129
cmd/converter/components/di.go
Normal file
@ -0,0 +1,129 @@
|
||||
package components
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"git.loyso.art/frx/eway/internal/config"
|
||||
"git.loyso.art/frx/eway/internal/interconnect/eway"
|
||||
"git.loyso.art/frx/eway/internal/storage"
|
||||
xbadger "git.loyso.art/frx/eway/internal/storage/badger"
|
||||
|
||||
"github.com/BurntSushi/toml"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/samber/do"
|
||||
)
|
||||
|
||||
// Yeah, singleton is not good UNLESS you're really lazy
|
||||
var diInjector *do.Injector
|
||||
|
||||
func GetRepository() (storage.Repository, error) {
|
||||
adapter, err := do.Invoke[*storageRepositoryAdapter](diInjector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return adapter.entity, nil
|
||||
}
|
||||
|
||||
func GetLogger() (zerolog.Logger, error) {
|
||||
return do.Invoke[zerolog.Logger](diInjector)
|
||||
}
|
||||
|
||||
func SetupDI(ctx context.Context, cfgpath string) error {
|
||||
cfg, err := parseSettings(cfgpath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
diInjector = do.New()
|
||||
|
||||
do.Provide(diInjector, func(i *do.Injector) (zerolog.Logger, error) {
|
||||
tsSet := func(wr *zerolog.ConsoleWriter) {
|
||||
wr.TimeFormat = time.RFC3339
|
||||
}
|
||||
|
||||
log := zerolog.New(zerolog.NewConsoleWriter(tsSet)).With().Timestamp().Str("app", "converter").Logger()
|
||||
|
||||
return log, nil
|
||||
})
|
||||
|
||||
do.Provide[eway.Client](diInjector, func(i *do.Injector) (eway.Client, error) {
|
||||
log, err := GetLogger()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting logger: %w", err)
|
||||
}
|
||||
|
||||
client := eway.New(eway.Config(cfg.Eway), log)
|
||||
return client, nil
|
||||
})
|
||||
|
||||
do.Provide[*badgerDBAdapter](diInjector, func(i *do.Injector) (*badgerDBAdapter, error) {
|
||||
db, err := xbadger.Open(ctx, cfg.Badger.Dir, cfg.Badger.Debug, zerolog.Nop())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting db: %w", err)
|
||||
}
|
||||
|
||||
out := &badgerDBAdapter{entity: db}
|
||||
return out, nil
|
||||
})
|
||||
|
||||
do.Provide[*storageRepositoryAdapter](diInjector, func(i *do.Injector) (*storageRepositoryAdapter, error) {
|
||||
db, err := getDB()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client, err := xbadger.NewClient(db)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting badger client: %w", err)
|
||||
}
|
||||
|
||||
out := &storageRepositoryAdapter{entity: client}
|
||||
return out, nil
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func Shutdown() error {
|
||||
return diInjector.Shutdown()
|
||||
}
|
||||
|
||||
func getDB() (*badger.DB, error) {
|
||||
adapter, err := do.Invoke[*badgerDBAdapter](diInjector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return adapter.entity, nil
|
||||
}
|
||||
|
||||
type settings struct {
|
||||
Badger config.Badger
|
||||
Log config.Log
|
||||
Eway config.Eway
|
||||
}
|
||||
|
||||
func parseSettings(cfgpath string) (cfg settings, err error) {
|
||||
_, err = toml.DecodeFile(cfgpath, &cfg)
|
||||
if err != nil {
|
||||
return cfg, fmt.Errorf("parsing file: %w", err)
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
type entityCloserAdapter[T io.Closer] struct {
|
||||
entity T
|
||||
}
|
||||
|
||||
func (a entityCloserAdapter[T]) Shutdown() error {
|
||||
return a.entity.Close()
|
||||
}
|
||||
|
||||
type storageRepositoryAdapter entityCloserAdapter[storage.Repository]
|
||||
type badgerDBAdapter entityCloserAdapter[*badger.DB]
|
||||
@ -11,206 +11,196 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"git.loyso.art/frx/eway/internal/config"
|
||||
"git.loyso.art/frx/eway/cmd/converter/components"
|
||||
"git.loyso.art/frx/eway/internal/encoding/fbs"
|
||||
"git.loyso.art/frx/eway/internal/entity"
|
||||
"git.loyso.art/frx/eway/internal/storage"
|
||||
xbadger "git.loyso.art/frx/eway/internal/storage/badger"
|
||||
|
||||
badger "github.com/dgraph-io/badger/v4"
|
||||
"github.com/dgraph-io/badger/v4/pb"
|
||||
"github.com/rodaine/table"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/urfave/cli"
|
||||
)
|
||||
|
||||
type appSettings struct {
|
||||
Badger config.Badger
|
||||
}
|
||||
|
||||
func main() {
|
||||
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
|
||||
defer func() {
|
||||
cancel()
|
||||
}()
|
||||
|
||||
var corewg sync.WaitGroup
|
||||
err := runcli(ctx, &corewg)
|
||||
err := runcli(ctx)
|
||||
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "unable to handle app: %v", err)
|
||||
|
||||
corewg.Wait()
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
corewg.Wait()
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
func runcli(ctx context.Context, wg *sync.WaitGroup) (err error) {
|
||||
tsSet := func(wr *zerolog.ConsoleWriter) {
|
||||
wr.TimeFormat = time.RFC3339
|
||||
}
|
||||
log := zerolog.New(zerolog.NewConsoleWriter(tsSet)).Level(zerolog.DebugLevel).With().Timestamp().Str("app", "converter").Logger()
|
||||
|
||||
defer func() {
|
||||
log.Info().Err(err).Msg("app finished")
|
||||
}()
|
||||
|
||||
ctx = log.WithContext(ctx)
|
||||
|
||||
log.Info().Msg("making badger")
|
||||
|
||||
db, err := xbadger.Open(ctx, "badger/", log)
|
||||
if err != nil {
|
||||
return fmt.Errorf("opening badger: %w", err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
var item atomic.Uint64
|
||||
err = db.Subscribe(
|
||||
ctx,
|
||||
func(kvlist *badger.KVList) error {
|
||||
kvs := kvlist.GetKv()
|
||||
for _, kv := range kvs {
|
||||
count := item.Add(1)
|
||||
log.Debug().Bytes("key", kv.GetKey()).Uint64("count", count).Msg("inspecting")
|
||||
}
|
||||
return nil
|
||||
},
|
||||
[]pb.Match{
|
||||
{
|
||||
Prefix: []byte("!!category!!"),
|
||||
},
|
||||
{
|
||||
Prefix: []byte("!!goodsitem!!"),
|
||||
},
|
||||
},
|
||||
)
|
||||
log.Err(err).Msg("subscribing")
|
||||
|
||||
}()
|
||||
|
||||
maxBatch := db.MaxBatchCount()
|
||||
log.Info().Int("max_batch", int(maxBatch)).Msg("max batch settings")
|
||||
client, err := xbadger.NewClient(db)
|
||||
if err != nil {
|
||||
return fmt.Errorf("making new client: %w", err)
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
defer func() {
|
||||
defer wg.Done()
|
||||
|
||||
println("closing client")
|
||||
errClose := client.Close()
|
||||
if errClose != nil {
|
||||
log.Warn().Err(errClose).Msg("unable to close client")
|
||||
}
|
||||
|
||||
println("flushing db")
|
||||
errSync := db.Sync()
|
||||
if errSync != nil {
|
||||
log.Warn().Err(errSync).Msg("unable to sync db")
|
||||
}
|
||||
|
||||
// time.Sleep(time.Second * 5)
|
||||
|
||||
println("closing db")
|
||||
errClose = db.Close()
|
||||
if errClose != nil {
|
||||
log.Warn().Err(errClose).Msg("unable to close db")
|
||||
}
|
||||
}()
|
||||
|
||||
app := setupCLI(ctx, client, maxBatch)
|
||||
|
||||
func runcli(ctx context.Context) (err error) {
|
||||
app := setupCLI(ctx)
|
||||
return app.Run(os.Args)
|
||||
}
|
||||
|
||||
func setupCLI(ctx context.Context, r storage.Repository, maxBatch int64) *cli.App {
|
||||
app := cli.NewApp()
|
||||
func setupDI(ctx context.Context) cli.BeforeFunc {
|
||||
return func(c *cli.Context) error {
|
||||
cfgpath := c.String("config")
|
||||
|
||||
err := components.SetupDI(ctx, cfgpath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("setting up di: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func releaseDI(c *cli.Context) error {
|
||||
log, err := components.GetLogger()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting logger: %w", err)
|
||||
}
|
||||
|
||||
log.Info().Msg("shutting down env")
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
since := time.Since(start)
|
||||
log.Err(err).Dur("elapsed", since).Msg("shutdown finished")
|
||||
}()
|
||||
|
||||
return components.Shutdown()
|
||||
}
|
||||
|
||||
func setupCLI(ctx context.Context) *cli.App {
|
||||
app := cli.NewApp()
|
||||
app.Flags = append(
|
||||
app.Flags,
|
||||
cli.StringFlag{
|
||||
Name: "config",
|
||||
Usage: "path to config in TOML format",
|
||||
Value: "config.toml",
|
||||
TakesFile: true,
|
||||
},
|
||||
)
|
||||
|
||||
app.Before = setupDI(ctx)
|
||||
app.After = releaseDI
|
||||
app.Commands = cli.Commands{
|
||||
newImportCmd(ctx, r, maxBatch),
|
||||
newViewCmd(ctx, r),
|
||||
newImportCmd(ctx),
|
||||
newViewCmd(ctx),
|
||||
cli.Command{
|
||||
Name: "test-fbs",
|
||||
Usage: "a simple check for tbs",
|
||||
Action: cli.ActionFunc(func(c *cli.Context) error {
|
||||
gooditem := entity.GoodsItem{
|
||||
Articul: "some-sku",
|
||||
Photo: "/photo/path.jpg",
|
||||
Name: "some-name",
|
||||
Description: "bad-desc",
|
||||
Category: "",
|
||||
Type: "some-type",
|
||||
Producer: "my-producer",
|
||||
Pack: 123,
|
||||
Step: 10,
|
||||
Price: 12.34,
|
||||
TariffPrice: 43.21,
|
||||
Cart: 1998,
|
||||
Stock: 444,
|
||||
}
|
||||
|
||||
data := fbs.MakeDomainGoodItemFinished(gooditem)
|
||||
datahexed := hex.EncodeToString(data)
|
||||
println(datahexed)
|
||||
|
||||
got, err := fbs.ParseGoodsItem(data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parsing: %w", err)
|
||||
}
|
||||
|
||||
if got != gooditem {
|
||||
gotStr := fmt.Sprintf("%v", got)
|
||||
hasStr := fmt.Sprintf("%v", gooditem)
|
||||
println(gotStr, "\n", hasStr)
|
||||
}
|
||||
|
||||
return nil
|
||||
}),
|
||||
},
|
||||
}
|
||||
|
||||
return app
|
||||
}
|
||||
|
||||
func newImportCmd(ctx context.Context, r storage.Repository, maxBatch int64) cli.Command {
|
||||
func newImportCmd(ctx context.Context) cli.Command {
|
||||
return cli.Command{
|
||||
Name: "import",
|
||||
Usage: "category for importing data from sources",
|
||||
Flags: []cli.Flag{
|
||||
// &cli.StringFlag{
|
||||
// Name: "config",
|
||||
// Usage: "path to config",
|
||||
// Value: "config.json",
|
||||
// TakesFile: true,
|
||||
// },
|
||||
&cli.BoolFlag{
|
||||
Name: "verbose",
|
||||
Usage: "set logger to debug mode",
|
||||
},
|
||||
},
|
||||
Before: func(c *cli.Context) error {
|
||||
if c.Bool("verbose") {
|
||||
zerolog.SetGlobalLevel(zerolog.DebugLevel)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
Subcommands: cli.Commands{
|
||||
newImportFromFileCmd(ctx, r, maxBatch),
|
||||
newImportFromFileCmd(ctx),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newViewCmd(ctx context.Context, r storage.Repository) cli.Command {
|
||||
func newViewCmd(ctx context.Context) cli.Command {
|
||||
return cli.Command{
|
||||
Name: "view",
|
||||
Usage: "Set of commands to view the data inside db",
|
||||
Subcommands: []cli.Command{
|
||||
newViewCategoriesCmd(ctx, r),
|
||||
newViewItemsCmd(ctx, r),
|
||||
newViewCategoriesCmd(ctx),
|
||||
newViewItemsCmd(ctx),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newViewCategoriesCmd(ctx context.Context, r storage.Repository) cli.Command {
|
||||
func newViewCategoriesCmd(ctx context.Context) cli.Command {
|
||||
return cli.Command{
|
||||
Name: "categories",
|
||||
Usage: "Set of commands to work with categories",
|
||||
Subcommands: []cli.Command{
|
||||
newViewCategoriesListCmd(ctx, r),
|
||||
newViewCategoriesListCmd(ctx),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newViewItemsCmd(ctx context.Context, r storage.Repository) cli.Command {
|
||||
func newViewItemsCmd(ctx context.Context) cli.Command {
|
||||
return cli.Command{
|
||||
Name: "items",
|
||||
Usage: "Set of command to work with items",
|
||||
Subcommands: cli.Commands{
|
||||
newViewItemsCountCmd(ctx, r),
|
||||
newViewItemsGetCmd(ctx),
|
||||
newViewItemsCountCmd(ctx),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newViewItemsCountCmd(ctx context.Context, r storage.Repository) cli.Command {
|
||||
func newViewItemsCountCmd(ctx context.Context) cli.Command {
|
||||
return cli.Command{
|
||||
Name: "count",
|
||||
Usage: "iterates over collection and counts number of items",
|
||||
Action: viewItemsCount(ctx, r),
|
||||
Action: decorateAction(ctx, viewItemsCountAction),
|
||||
}
|
||||
}
|
||||
|
||||
func newViewCategoriesListCmd(ctx context.Context, r storage.Repository) cli.Command {
|
||||
func newViewItemsGetCmd(ctx context.Context) cli.Command {
|
||||
return cli.Command{
|
||||
Name: "get",
|
||||
Usage: "gets goods item by its id",
|
||||
Flags: []cli.Flag{
|
||||
&cli.StringFlag{
|
||||
Name: "id",
|
||||
Usage: "id of the goods item. Either id or cart-id should be set",
|
||||
},
|
||||
&cli.Int64Flag{
|
||||
Name: "cart-id",
|
||||
Usage: "cart-id of the item. Either cart-id or id should be set",
|
||||
},
|
||||
},
|
||||
Action: decorateAction(ctx, viewItemsGetAction),
|
||||
}
|
||||
}
|
||||
|
||||
func newViewCategoriesListCmd(ctx context.Context) cli.Command {
|
||||
return cli.Command{
|
||||
Name: "list",
|
||||
Usage: "lists stored categories stored in database",
|
||||
@ -230,84 +220,11 @@ func newViewCategoriesListCmd(ctx context.Context, r storage.Repository) cli.Com
|
||||
Usage: "prints total count of categories",
|
||||
},
|
||||
},
|
||||
Action: viewCategoriesListAction(ctx, r),
|
||||
Action: decorateAction(ctx, viewCategoriesListAction),
|
||||
}
|
||||
}
|
||||
|
||||
func viewItemsCount(ctx context.Context, r storage.Repository) cli.ActionFunc {
|
||||
f := func(c *cli.Context) error {
|
||||
// itemChan, err := r.GoodsItem().ListIter(ctx, 10)
|
||||
// if err != nil {
|
||||
// return fmt.Errorf("getting list iter: %w", err)
|
||||
// }
|
||||
//
|
||||
var count int
|
||||
// for range itemChan {
|
||||
// count++
|
||||
// }
|
||||
//
|
||||
items, err := r.GoodsItem().List(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting list: %w", err)
|
||||
}
|
||||
|
||||
zerolog.Ctx(ctx).Info().Int("count", count).Int("list_count", len(items)).Msg("read all items")
|
||||
return nil
|
||||
}
|
||||
|
||||
return wrapActionFunc(ctx, f)
|
||||
}
|
||||
|
||||
func viewCategoriesListAction(ctx context.Context, r storage.Repository) cli.ActionFunc {
|
||||
f := func(c *cli.Context) error {
|
||||
categories, err := r.Category().List(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("listing categories: %w", err)
|
||||
}
|
||||
|
||||
limit := c.Int("limit")
|
||||
page := c.Int("page")
|
||||
total := len(categories)
|
||||
|
||||
if page == 0 {
|
||||
page = 1
|
||||
}
|
||||
|
||||
if limit > 0 {
|
||||
offset := (page - 1) * limit
|
||||
if offset > len(categories) {
|
||||
offset = len(categories) - 1
|
||||
}
|
||||
|
||||
limit = offset + limit
|
||||
if limit > len(categories) {
|
||||
limit = len(categories)
|
||||
}
|
||||
|
||||
categories = categories[offset:limit]
|
||||
}
|
||||
|
||||
tbl := table.New("ID", "Name")
|
||||
for _, category := range categories {
|
||||
if category.ID == 0 && category.Name == "" {
|
||||
continue
|
||||
}
|
||||
tbl.AddRow(category.ID, category.Name)
|
||||
}
|
||||
|
||||
tbl.Print()
|
||||
|
||||
if c.Bool("with-total") {
|
||||
zerolog.Ctx(ctx).Info().Int("count", total).Msg("total categories stats")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return wrapActionFunc(ctx, f)
|
||||
}
|
||||
|
||||
func newImportFromFileCmd(ctx context.Context, r storage.Repository, maxBatch int64) cli.Command {
|
||||
func newImportFromFileCmd(ctx context.Context) cli.Command {
|
||||
return cli.Command{
|
||||
Name: "fromfile",
|
||||
Usage: "imports from file into db",
|
||||
@ -319,126 +236,250 @@ func newImportFromFileCmd(ctx context.Context, r storage.Repository, maxBatch in
|
||||
Required: true,
|
||||
},
|
||||
},
|
||||
Action: handleConvert(ctx, r, maxBatch),
|
||||
Action: decorateAction(ctx, importFromFileAction),
|
||||
}
|
||||
}
|
||||
|
||||
func handleConvert(ctx context.Context, r storage.Repository, maxBatch int64) cli.ActionFunc {
|
||||
f := func(c *cli.Context) error {
|
||||
filesrc := c.String("src")
|
||||
log := zerolog.Ctx(ctx)
|
||||
func viewItemsGetAction(ctx context.Context, c *cli.Context) error {
|
||||
r, err := components.GetRepository()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting repository: %w", err)
|
||||
}
|
||||
|
||||
log.Debug().Str("filepath", filesrc).Msg("importing data from file")
|
||||
id := c.String("id")
|
||||
cartID := c.Int64("cart-id")
|
||||
if id == "" && cartID == 0 {
|
||||
return cli.NewExitError("oneof: id or cart-id should be set", 1)
|
||||
} else if id != "" && cartID != 0 {
|
||||
return cli.NewExitError("oneof: id or cart-id should be set", 1)
|
||||
}
|
||||
|
||||
productsFile, err := os.Open(filesrc)
|
||||
if err != nil {
|
||||
return fmt.Errorf("opening file: %w", err)
|
||||
var item entity.GoodsItem
|
||||
if id != "" {
|
||||
item, err = r.GoodsItem().Get(ctx, id)
|
||||
} else {
|
||||
item, err = r.GoodsItem().GetByCart(ctx, cartID)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting item: %w", err)
|
||||
}
|
||||
|
||||
enc := json.NewEncoder(os.Stdout)
|
||||
enc.SetIndent("", " ")
|
||||
|
||||
err = enc.Encode(item)
|
||||
if err != nil {
|
||||
return fmt.Errorf("encoding item: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func viewItemsCountAction(ctx context.Context, c *cli.Context) error {
|
||||
r, err := components.GetRepository()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting repository: %w", err)
|
||||
}
|
||||
|
||||
itemChan, err := r.GoodsItem().ListIter(ctx, 10)
|
||||
if err != nil {
|
||||
if !errors.Is(err, entity.ErrNotImplemented) {
|
||||
return fmt.Errorf("getting list iter: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
defer func() {
|
||||
errClose := productsFile.Close()
|
||||
if errClose != nil {
|
||||
log.Warn().Err(errClose).Msg("unable to close file")
|
||||
}
|
||||
}()
|
||||
|
||||
var (
|
||||
goodsItem entity.GoodsItem
|
||||
// goodsItems []entity.GoodsItem
|
||||
failedToInsert int
|
||||
)
|
||||
|
||||
seenCategories := make(map[string]struct{})
|
||||
categories, err := r.Category().List(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("listing categories: %w", err)
|
||||
}
|
||||
|
||||
for _, category := range categories {
|
||||
seenCategories[category.Name] = struct{}{}
|
||||
}
|
||||
|
||||
bfile := bufio.NewReader(productsFile)
|
||||
for {
|
||||
line, _, err := bfile.ReadLine()
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
|
||||
var count int
|
||||
if err == nil {
|
||||
var done bool
|
||||
for !done {
|
||||
select {
|
||||
case _, ok := <-itemChan:
|
||||
if !ok {
|
||||
done = true
|
||||
continue
|
||||
}
|
||||
return fmt.Errorf("reading line: %w", err)
|
||||
count++
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
err = json.Unmarshal(line, &goodsItem)
|
||||
if err != nil {
|
||||
// log.Warn().Err(err).Str("line", string(line)).Msg("unable to unmarshal line into item")
|
||||
failedToInsert++
|
||||
continue
|
||||
}
|
||||
|
||||
_, err = r.GoodsItem().UpsertMany(ctx, goodsItem)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to upsert new item: %w", err)
|
||||
}
|
||||
|
||||
// goodsItems = append(goodsItems, goodsItem)
|
||||
|
||||
if _, ok := seenCategories[goodsItem.Type]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
_, err = r.Category().Create(ctx, goodsItem.Type)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create new category: %w", err)
|
||||
}
|
||||
log.Debug().Any("category", goodsItem.Type).Msg("inserted new category")
|
||||
seenCategories[goodsItem.Type] = struct{}{}
|
||||
}
|
||||
|
||||
// log.Debug().Int("count", len(goodsItems)).Int("failed", failedToInsert).Msg("preparing to upload")
|
||||
//
|
||||
// start := time.Now()
|
||||
// batchSize := int(maxBatch)
|
||||
// for i := 0; i < len(goodsItems); i += batchSize {
|
||||
// to := i + batchSize
|
||||
// if to > len(goodsItems) {
|
||||
// to = len(goodsItems)
|
||||
// }
|
||||
//
|
||||
// _, err = r.GoodsItem().UpsertMany(ctx, goodsItems[i:to]...)
|
||||
// if err != nil {
|
||||
// return fmt.Errorf("upserting items: %w", err)
|
||||
// }
|
||||
// log.Debug().Int("count", to-i).Msg("inserted batch")
|
||||
// time.Sleep(time.Second)
|
||||
// }
|
||||
// log.Debug().Dur("elapsed", time.Since(start)).Msg("upload finished")
|
||||
//
|
||||
|
||||
time.Sleep(time.Second * 30)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
time.Sleep(time.Second * 10)
|
||||
items, err := r.GoodsItem().List(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting list: %w", err)
|
||||
}
|
||||
|
||||
return wrapActionFunc(ctx, f)
|
||||
zerolog.Ctx(ctx).Info().Int("count", count).Int("list_count", len(items)).Msg("read all items")
|
||||
return nil
|
||||
}
|
||||
|
||||
func wrapActionFunc(ctx context.Context, next cli.ActionFunc) cli.ActionFunc {
|
||||
func viewCategoriesListAction(ctx context.Context, c *cli.Context) error {
|
||||
r, err := components.GetRepository()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting repository: %w", err)
|
||||
}
|
||||
|
||||
categories, err := r.Category().List(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("listing categories: %w", err)
|
||||
}
|
||||
|
||||
limit := c.Int("limit")
|
||||
page := c.Int("page")
|
||||
total := len(categories)
|
||||
|
||||
if page == 0 {
|
||||
page = 1
|
||||
}
|
||||
|
||||
if limit > 0 {
|
||||
offset := (page - 1) * limit
|
||||
if offset > len(categories) {
|
||||
offset = len(categories) - 1
|
||||
}
|
||||
|
||||
limit = offset + limit
|
||||
if limit > len(categories) {
|
||||
limit = len(categories)
|
||||
}
|
||||
|
||||
categories = categories[offset:limit]
|
||||
}
|
||||
|
||||
tbl := table.New("ID", "Name")
|
||||
for _, category := range categories {
|
||||
if category.ID == 0 && category.Name == "" {
|
||||
continue
|
||||
}
|
||||
tbl.AddRow(category.ID, category.Name)
|
||||
}
|
||||
|
||||
tbl.Print()
|
||||
|
||||
if c.Bool("with-total") {
|
||||
zerolog.Ctx(ctx).Info().Int("count", total).Msg("total categories stats")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func importFromFileAction(ctx context.Context, c *cli.Context) error {
|
||||
const maxBatch = 2000
|
||||
|
||||
r, err := components.GetRepository()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting repository: %w", err)
|
||||
}
|
||||
|
||||
filesrc := c.String("src")
|
||||
log := zerolog.Ctx(ctx)
|
||||
|
||||
log.Debug().Str("filepath", filesrc).Msg("importing data from file")
|
||||
|
||||
productsFile, err := os.Open(filesrc)
|
||||
if err != nil {
|
||||
return fmt.Errorf("opening file: %w", err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
errClose := productsFile.Close()
|
||||
if errClose != nil {
|
||||
log.Warn().Err(errClose).Msg("unable to close file")
|
||||
}
|
||||
}()
|
||||
|
||||
var (
|
||||
goodsItem entity.GoodsItem
|
||||
goodsItems []entity.GoodsItem
|
||||
failedToInsert int
|
||||
)
|
||||
|
||||
seenCategories := make(map[string]struct{})
|
||||
categories, err := r.Category().List(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("listing categories: %w", err)
|
||||
}
|
||||
|
||||
for _, category := range categories {
|
||||
seenCategories[category.Name] = struct{}{}
|
||||
}
|
||||
|
||||
bfile := bufio.NewReader(productsFile)
|
||||
for {
|
||||
line, _, err := bfile.ReadLine()
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
|
||||
}
|
||||
return fmt.Errorf("reading line: %w", err)
|
||||
}
|
||||
|
||||
err = json.Unmarshal(line, &goodsItem)
|
||||
if err != nil {
|
||||
log.Warn().Err(err).Str("line", string(line)).Msg("unable to unmarshal line into item")
|
||||
failedToInsert++
|
||||
continue
|
||||
}
|
||||
|
||||
goodsItems = append(goodsItems, goodsItem)
|
||||
|
||||
if _, ok := seenCategories[goodsItem.Type]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
_, err = r.Category().Create(ctx, goodsItem.Type)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create new category: %w", err)
|
||||
}
|
||||
log.Debug().Any("category", goodsItem.Type).Msg("inserted new category")
|
||||
seenCategories[goodsItem.Type] = struct{}{}
|
||||
}
|
||||
|
||||
log.Debug().Int("count", len(goodsItems)).Int("failed", failedToInsert).Msg("preparing to upload")
|
||||
|
||||
start := time.Now()
|
||||
batchSize := int(maxBatch)
|
||||
for i := 0; i < len(goodsItems); i += batchSize {
|
||||
to := i + batchSize
|
||||
if to > len(goodsItems) {
|
||||
to = len(goodsItems)
|
||||
}
|
||||
|
||||
_, err = r.GoodsItem().UpsertMany(ctx, goodsItems[i:to]...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("upserting items: %w", err)
|
||||
}
|
||||
log.Debug().Int("count", to-i).Msg("inserted batch")
|
||||
}
|
||||
log.Debug().Dur("elapsed", time.Since(start)).Msg("upload finished")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type action func(ctx context.Context, c *cli.Context) error
|
||||
|
||||
func decorateAction(ctx context.Context, a action) cli.ActionFunc {
|
||||
return func(c *cli.Context) error {
|
||||
var data [3]byte
|
||||
_, _ = rand.Read(data[:])
|
||||
reqid := hex.EncodeToString(data[:])
|
||||
|
||||
log := zerolog.Ctx(ctx).With().Str("reqid", reqid).Logger()
|
||||
log, err := components.GetLogger()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting logger: %w", err)
|
||||
}
|
||||
|
||||
log = log.With().Str("reqid", reqid).Logger()
|
||||
ctx = log.WithContext(ctx)
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
log.Info().Dur("elapsed", time.Since(start)).Msg("command completed")
|
||||
}()
|
||||
|
||||
log.Info().Msg("command execution started")
|
||||
return next(c)
|
||||
return a(ctx, c)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user