package main import ( "bufio" "context" "crypto/rand" "encoding/hex" "encoding/json" "errors" "fmt" "io" "os" "os/signal" "sync" "sync/atomic" "time" "git.loyso.art/frx/eway/internal/config" "git.loyso.art/frx/eway/internal/entity" "git.loyso.art/frx/eway/internal/storage" xbadger "git.loyso.art/frx/eway/internal/storage/badger" badger "github.com/dgraph-io/badger/v4" "github.com/dgraph-io/badger/v4/pb" "github.com/rodaine/table" "github.com/rs/zerolog" "github.com/urfave/cli" ) type appSettings struct { Badger config.Badger } func main() { ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) defer func() { cancel() }() var corewg sync.WaitGroup err := runcli(ctx, &corewg) if err != nil { fmt.Fprintf(os.Stderr, "unable to handle app: %v", err) corewg.Wait() os.Exit(1) } corewg.Wait() os.Exit(0) } func runcli(ctx context.Context, wg *sync.WaitGroup) (err error) { tsSet := func(wr *zerolog.ConsoleWriter) { wr.TimeFormat = time.RFC3339 } log := zerolog.New(zerolog.NewConsoleWriter(tsSet)).Level(zerolog.DebugLevel).With().Timestamp().Str("app", "converter").Logger() defer func() { log.Info().Err(err).Msg("app finished") }() ctx = log.WithContext(ctx) log.Info().Msg("making badger") db, err := xbadger.Open(ctx, "badger/", log) if err != nil { return fmt.Errorf("opening badger: %w", err) } go func() { var item atomic.Uint64 err = db.Subscribe( ctx, func(kvlist *badger.KVList) error { kvs := kvlist.GetKv() for _, kv := range kvs { count := item.Add(1) log.Debug().Bytes("key", kv.GetKey()).Uint64("count", count).Msg("inspecting") } return nil }, []pb.Match{ { Prefix: []byte("!!category!!"), }, { Prefix: []byte("!!goodsitem!!"), }, }, ) log.Err(err).Msg("subscribing") }() maxBatch := db.MaxBatchCount() log.Info().Int("max_batch", int(maxBatch)).Msg("max batch settings") client, err := xbadger.NewClient(db) if err != nil { return fmt.Errorf("making new client: %w", err) } wg.Add(1) defer func() { defer wg.Done() println("closing client") errClose := client.Close() if errClose != nil { log.Warn().Err(errClose).Msg("unable to close client") } println("flushing db") errSync := db.Sync() if errSync != nil { log.Warn().Err(errSync).Msg("unable to sync db") } // time.Sleep(time.Second * 5) println("closing db") errClose = db.Close() if errClose != nil { log.Warn().Err(errClose).Msg("unable to close db") } }() app := setupCLI(ctx, client, maxBatch) return app.Run(os.Args) } func setupCLI(ctx context.Context, r storage.Repository, maxBatch int64) *cli.App { app := cli.NewApp() app.Commands = cli.Commands{ newImportCmd(ctx, r, maxBatch), newViewCmd(ctx, r), } return app } func newImportCmd(ctx context.Context, r storage.Repository, maxBatch int64) cli.Command { return cli.Command{ Name: "import", Usage: "category for importing data from sources", Flags: []cli.Flag{ // &cli.StringFlag{ // Name: "config", // Usage: "path to config", // Value: "config.json", // TakesFile: true, // }, &cli.BoolFlag{ Name: "verbose", Usage: "set logger to debug mode", }, }, Before: func(c *cli.Context) error { if c.Bool("verbose") { zerolog.SetGlobalLevel(zerolog.DebugLevel) } return nil }, Subcommands: cli.Commands{ newImportFromFileCmd(ctx, r, maxBatch), }, } } func newViewCmd(ctx context.Context, r storage.Repository) cli.Command { return cli.Command{ Name: "view", Usage: "Set of commands to view the data inside db", Subcommands: []cli.Command{ newViewCategoriesCmd(ctx, r), newViewItemsCmd(ctx, r), }, } } func newViewCategoriesCmd(ctx context.Context, r storage.Repository) cli.Command { return cli.Command{ Name: "categories", Usage: "Set of commands to work with categories", Subcommands: []cli.Command{ newViewCategoriesListCmd(ctx, r), }, } } func newViewItemsCmd(ctx context.Context, r storage.Repository) cli.Command { return cli.Command{ Name: "items", Usage: "Set of command to work with items", Subcommands: cli.Commands{ newViewItemsCountCmd(ctx, r), }, } } func newViewItemsCountCmd(ctx context.Context, r storage.Repository) cli.Command { return cli.Command{ Name: "count", Usage: "iterates over collection and counts number of items", Action: viewItemsCount(ctx, r), } } func newViewCategoriesListCmd(ctx context.Context, r storage.Repository) cli.Command { return cli.Command{ Name: "list", Usage: "lists stored categories stored in database", Flags: []cli.Flag{ &cli.IntFlag{ Name: "limit", Usage: "limits output to selected items", Value: 20, }, &cli.IntFlag{ Name: "page", Usage: "in case of limit, selects page", Value: 0, }, &cli.BoolFlag{ Name: "with-total", Usage: "prints total count of categories", }, }, Action: viewCategoriesListAction(ctx, r), } } func viewItemsCount(ctx context.Context, r storage.Repository) cli.ActionFunc { f := func(c *cli.Context) error { // itemChan, err := r.GoodsItem().ListIter(ctx, 10) // if err != nil { // return fmt.Errorf("getting list iter: %w", err) // } // var count int // for range itemChan { // count++ // } // items, err := r.GoodsItem().List(ctx) if err != nil { return fmt.Errorf("getting list: %w", err) } zerolog.Ctx(ctx).Info().Int("count", count).Int("list_count", len(items)).Msg("read all items") return nil } return wrapActionFunc(ctx, f) } func viewCategoriesListAction(ctx context.Context, r storage.Repository) cli.ActionFunc { f := func(c *cli.Context) error { categories, err := r.Category().List(ctx) if err != nil { return fmt.Errorf("listing categories: %w", err) } limit := c.Int("limit") page := c.Int("page") total := len(categories) if page == 0 { page = 1 } if limit > 0 { offset := (page - 1) * limit if offset > len(categories) { offset = len(categories) - 1 } limit = offset + limit if limit > len(categories) { limit = len(categories) } categories = categories[offset:limit] } tbl := table.New("ID", "Name") for _, category := range categories { if category.ID == 0 && category.Name == "" { continue } tbl.AddRow(category.ID, category.Name) } tbl.Print() if c.Bool("with-total") { zerolog.Ctx(ctx).Info().Int("count", total).Msg("total categories stats") } return nil } return wrapActionFunc(ctx, f) } func newImportFromFileCmd(ctx context.Context, r storage.Repository, maxBatch int64) cli.Command { return cli.Command{ Name: "fromfile", Usage: "imports from file into db", Flags: []cli.Flag{ &cli.StringFlag{ Name: "src", Value: "", Usage: "source of the data.", Required: true, }, }, Action: handleConvert(ctx, r, maxBatch), } } func handleConvert(ctx context.Context, r storage.Repository, maxBatch int64) cli.ActionFunc { f := func(c *cli.Context) error { filesrc := c.String("src") log := zerolog.Ctx(ctx) log.Debug().Str("filepath", filesrc).Msg("importing data from file") productsFile, err := os.Open(filesrc) if err != nil { return fmt.Errorf("opening file: %w", err) } defer func() { errClose := productsFile.Close() if errClose != nil { log.Warn().Err(errClose).Msg("unable to close file") } }() var ( goodsItem entity.GoodsItem // goodsItems []entity.GoodsItem failedToInsert int ) seenCategories := make(map[string]struct{}) categories, err := r.Category().List(ctx) if err != nil { return fmt.Errorf("listing categories: %w", err) } for _, category := range categories { seenCategories[category.Name] = struct{}{} } bfile := bufio.NewReader(productsFile) for { line, _, err := bfile.ReadLine() if err != nil { if errors.Is(err, io.EOF) { break } return fmt.Errorf("reading line: %w", err) } err = json.Unmarshal(line, &goodsItem) if err != nil { // log.Warn().Err(err).Str("line", string(line)).Msg("unable to unmarshal line into item") failedToInsert++ continue } _, err = r.GoodsItem().UpsertMany(ctx, goodsItem) if err != nil { return fmt.Errorf("unable to upsert new item: %w", err) } // goodsItems = append(goodsItems, goodsItem) if _, ok := seenCategories[goodsItem.Type]; ok { continue } _, err = r.Category().Create(ctx, goodsItem.Type) if err != nil { return fmt.Errorf("unable to create new category: %w", err) } log.Debug().Any("category", goodsItem.Type).Msg("inserted new category") seenCategories[goodsItem.Type] = struct{}{} } // log.Debug().Int("count", len(goodsItems)).Int("failed", failedToInsert).Msg("preparing to upload") // // start := time.Now() // batchSize := int(maxBatch) // for i := 0; i < len(goodsItems); i += batchSize { // to := i + batchSize // if to > len(goodsItems) { // to = len(goodsItems) // } // // _, err = r.GoodsItem().UpsertMany(ctx, goodsItems[i:to]...) // if err != nil { // return fmt.Errorf("upserting items: %w", err) // } // log.Debug().Int("count", to-i).Msg("inserted batch") // time.Sleep(time.Second) // } // log.Debug().Dur("elapsed", time.Since(start)).Msg("upload finished") // time.Sleep(time.Second * 30) return nil } time.Sleep(time.Second * 10) return wrapActionFunc(ctx, f) } func wrapActionFunc(ctx context.Context, next cli.ActionFunc) cli.ActionFunc { return func(c *cli.Context) error { var data [3]byte _, _ = rand.Read(data[:]) reqid := hex.EncodeToString(data[:]) log := zerolog.Ctx(ctx).With().Str("reqid", reqid).Logger() ctx = log.WithContext(ctx) start := time.Now() defer func() { log.Info().Dur("elapsed", time.Since(start)).Msg("command completed") }() log.Info().Msg("command execution started") return next(c) } }