add workers pool

This commit is contained in:
Aleksandr Trushkin
2024-02-05 11:05:37 +03:00
parent 362d4524e3
commit 096c7e8157
3 changed files with 85 additions and 15 deletions

View File

@ -13,3 +13,4 @@ _session_id = "19b98ed56cc144f47e040e68dbcd8481"
_session_user = "1490" _session_user = "1490"
owner_id = "26476" owner_id = "26476"
debug = false debug = false
workers_pool = 3

View File

@ -7,4 +7,5 @@ type Eway struct {
SessionUser string `toml:"session_user"` SessionUser string `toml:"session_user"`
OwnerID string `toml:"owner_id"` OwnerID string `toml:"owner_id"`
Debug bool `toml:"debug"` Debug bool `toml:"debug"`
WorkersPool int `toml:"workers_pool"`
} }

View File

@ -11,6 +11,7 @@ import (
"reflect" "reflect"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"git.loyso.art/frx/eway/internal/config" "git.loyso.art/frx/eway/internal/config"
@ -37,7 +38,10 @@ type client struct {
htmlParseSema chan struct{} htmlParseSema chan struct{}
releaseSemaDelay time.Duration releaseSemaDelay time.Duration
getProductInfoBus chan getProductInfoRequest
ownerID string ownerID string
workersPool int
workerswg *sync.WaitGroup
} }
type Config config.Eway type Config config.Eway
@ -51,7 +55,17 @@ func New(ctx context.Context, cfg Config, log zerolog.Logger) (*client, error) {
http: httpclient, http: httpclient,
log: log.With().Str("client", "eway").Logger(), log: log.With().Str("client", "eway").Logger(),
htmlParseSema: make(chan struct{}, 2), htmlParseSema: make(chan struct{}, 2),
getProductInfoBus: make(chan getProductInfoRequest),
releaseSemaDelay: time.Second / 2, releaseSemaDelay: time.Second / 2,
workerswg: &sync.WaitGroup{},
}
for i := 0; i < cfg.WorkersPool; i++ {
c.workerswg.Add(1)
go func() {
defer c.workerswg.Done()
c.productInfoWorker(ctx, c.getProductInfoBus)
}()
} }
if cfg.SessionID == "" || cfg.SessionUser == "" { if cfg.SessionID == "" || cfg.SessionUser == "" {
@ -287,18 +301,25 @@ type parameterSelector struct {
} }
func (c *client) GetProductInfo(ctx context.Context, cart int64) (pi entity.GoodsItemInfo, err error) { func (c *client) GetProductInfo(ctx context.Context, cart int64) (pi entity.GoodsItemInfo, err error) {
if c.workersPool == 0 {
return c.getProductInfo(ctx, cart)
}
responseBus := make(chan taskResult[entity.GoodsItemInfo], 1)
c.getProductInfoBus <- getProductInfoRequest{
cartID: cart,
response: responseBus,
}
select { select {
case c.htmlParseSema <- struct{}{}: case response := <-responseBus:
defer func() { return response.value, response.err
go func() {
time.Sleep(c.releaseSemaDelay)
<-c.htmlParseSema
}()
}()
case <-ctx.Done(): case <-ctx.Done():
return pi, ctx.Err() return pi, ctx.Err()
} }
}
func (c *client) getProductInfo(ctx context.Context, cart int64) (pi entity.GoodsItemInfo, err error) {
collector := colly.NewCollector( collector := colly.NewCollector(
colly.AllowedDomains("eway.elevel.ru"), colly.AllowedDomains("eway.elevel.ru"),
colly.AllowURLRevisit(), colly.AllowURLRevisit(),
@ -309,7 +330,10 @@ func (c *client) GetProductInfo(ctx context.Context, cart int64) (pi entity.Good
start := time.Now() start := time.Now()
defer func() { defer func() {
elapsed := time.Since(start).Seconds() elapsed := time.Since(start).Seconds()
c.log.Info().Float64("elapsed", elapsed).Msg("request processed") c.log.Info().
Float64("elapsed", elapsed).
Int64("cart", cart).
Msg("request processed")
}() }()
collector.OnHTML("body > div.page-container > div.page-content > div.content-wrapper > div.content > div.row > div.col-md-4 > div > div > div:nth-child(6)", func(e *colly.HTMLElement) { collector.OnHTML("body > div.page-container > div.page-content > div.content-wrapper > div.content > div.row > div.col-md-4 > div > div > div:nth-child(6)", func(e *colly.HTMLElement) {
@ -361,3 +385,47 @@ func (c *client) GetProductInfo(ctx context.Context, cart int64) (pi entity.Good
return pi, nil return pi, nil
} }
type getProductInfoRequest struct {
cartID int64
response chan taskResult[entity.GoodsItemInfo]
}
type taskResult[T any] struct {
value T
err error
}
func (c *client) productInfoWorker(
ctx context.Context,
in <-chan getProductInfoRequest,
) {
var req getProductInfoRequest
var ok bool
for {
select {
case <-ctx.Done():
return
case req, ok = <-in:
if !ok {
return
}
}
func() {
c.htmlParseSema <- struct{}{}
defer func() {
go func() {
time.Sleep(c.releaseSemaDelay)
<-c.htmlParseSema
}()
}()
pi, err := c.getProductInfo(ctx, req.cartID)
req.response <- taskResult[entity.GoodsItemInfo]{
value: pi,
err: err,
}
}()
}
}