add workers pool

This commit is contained in:
Aleksandr Trushkin
2024-02-05 11:05:37 +03:00
parent ea9126e005
commit 58ee1821f6
2 changed files with 84 additions and 15 deletions

View File

@ -7,4 +7,5 @@ type Eway struct {
SessionUser string `toml:"session_user"`
OwnerID string `toml:"owner_id"`
Debug bool `toml:"debug"`
WorkersPool int `toml:"workers_pool"`
}

View File

@ -11,6 +11,7 @@ import (
"reflect"
"strconv"
"strings"
"sync"
"time"
"git.loyso.art/frx/eway/internal/config"
@ -35,9 +36,12 @@ type client struct {
http *resty.Client
log zerolog.Logger
htmlParseSema chan struct{}
releaseSemaDelay time.Duration
ownerID string
htmlParseSema chan struct{}
releaseSemaDelay time.Duration
getProductInfoBus chan getProductInfoRequest
ownerID string
workersPool int
workerswg *sync.WaitGroup
}
type Config config.Eway
@ -48,10 +52,20 @@ func New(ctx context.Context, cfg Config, log zerolog.Logger) (*client, error) {
SetBaseURL("https://eway.elevel.ru/api")
c := client{
http: httpclient,
log: log.With().Str("client", "eway").Logger(),
htmlParseSema: make(chan struct{}, 2),
releaseSemaDelay: time.Second / 2,
http: httpclient,
log: log.With().Str("client", "eway").Logger(),
htmlParseSema: make(chan struct{}, 2),
getProductInfoBus: make(chan getProductInfoRequest),
releaseSemaDelay: time.Second / 2,
workerswg: &sync.WaitGroup{},
}
for i := 0; i < cfg.WorkersPool; i++ {
c.workerswg.Add(1)
go func() {
defer c.workerswg.Done()
c.productInfoWorker(ctx, c.getProductInfoBus)
}()
}
if cfg.SessionID == "" || cfg.SessionUser == "" {
@ -287,18 +301,25 @@ type parameterSelector struct {
}
func (c *client) GetProductInfo(ctx context.Context, cart int64) (pi entity.GoodsItemInfo, err error) {
if c.workersPool == 0 {
return c.getProductInfo(ctx, cart)
}
responseBus := make(chan taskResult[entity.GoodsItemInfo], 1)
c.getProductInfoBus <- getProductInfoRequest{
cartID: cart,
response: responseBus,
}
select {
case c.htmlParseSema <- struct{}{}:
defer func() {
go func() {
time.Sleep(c.releaseSemaDelay)
<-c.htmlParseSema
}()
}()
case response := <-responseBus:
return response.value, response.err
case <-ctx.Done():
return pi, ctx.Err()
}
}
func (c *client) getProductInfo(ctx context.Context, cart int64) (pi entity.GoodsItemInfo, err error) {
collector := colly.NewCollector(
colly.AllowedDomains("eway.elevel.ru"),
colly.AllowURLRevisit(),
@ -309,7 +330,10 @@ func (c *client) GetProductInfo(ctx context.Context, cart int64) (pi entity.Good
start := time.Now()
defer func() {
elapsed := time.Since(start).Seconds()
c.log.Info().Float64("elapsed", elapsed).Msg("request processed")
c.log.Info().
Float64("elapsed", elapsed).
Int64("cart", cart).
Msg("request processed")
}()
collector.OnHTML("body > div.page-container > div.page-content > div.content-wrapper > div.content > div.row > div.col-md-4 > div > div > div:nth-child(6)", func(e *colly.HTMLElement) {
@ -361,3 +385,47 @@ func (c *client) GetProductInfo(ctx context.Context, cart int64) (pi entity.Good
return pi, nil
}
type getProductInfoRequest struct {
cartID int64
response chan taskResult[entity.GoodsItemInfo]
}
type taskResult[T any] struct {
value T
err error
}
func (c *client) productInfoWorker(
ctx context.Context,
in <-chan getProductInfoRequest,
) {
var req getProductInfoRequest
var ok bool
for {
select {
case <-ctx.Done():
return
case req, ok = <-in:
if !ok {
return
}
}
func() {
c.htmlParseSema <- struct{}{}
defer func() {
go func() {
time.Sleep(c.releaseSemaDelay)
<-c.htmlParseSema
}()
}()
pi, err := c.getProductInfo(ctx, req.cartID)
req.response <- taskResult[entity.GoodsItemInfo]{
value: pi,
err: err,
}
}()
}
}