list by pages and rate req limits

This commit is contained in:
Gitea
2023-12-15 13:00:59 +03:00
parent 2f3a6e35e9
commit 06c7997491
10 changed files with 204 additions and 116 deletions

View File

@ -32,6 +32,11 @@ tasks:
cmds: cmds:
- go build -o $GOBIN/sravnicli -v -ldflags '{{.LDFLAGS}}' cmd/dev/sravnicli/*.go - go build -o $GOBIN/sravnicli -v -ldflags '{{.LDFLAGS}}' cmd/dev/sravnicli/*.go
deps: [check, test] deps: [check, test]
build:
cmds:
- task: build_dev_cli
- task: build_background
run: run:
deps: [build] deps: [build]
cmds: cmds:

View File

@ -8,6 +8,7 @@ import (
"log/slog" "log/slog"
"strconv" "strconv"
"strings" "strings"
"time"
"git.loyso.art/frx/kurious/internal/common/errors" "git.loyso.art/frx/kurious/internal/common/errors"
"git.loyso.art/frx/kurious/pkg/slices" "git.loyso.art/frx/kurious/pkg/slices"
@ -16,6 +17,7 @@ import (
"github.com/go-resty/resty/v2" "github.com/go-resty/resty/v2"
"golang.org/x/net/html" "golang.org/x/net/html"
"golang.org/x/net/html/atom" "golang.org/x/net/html/atom"
"golang.org/x/time/rate"
) )
const ( const (
@ -38,7 +40,8 @@ type Client interface {
func NewClient(ctx context.Context, log *slog.Logger, debug bool) (c *client, err error) { func NewClient(ctx context.Context, log *slog.Logger, debug bool) (c *client, err error) {
c = &client{ c = &client{
log: log.With(slog.String("client", "sravni")), log: log.With(slog.String("client", "sravni")),
limiter: rate.NewLimiter(rate.Every(time.Millisecond*100), 1),
http: resty.New(). http: resty.New().
SetBaseURL(baseURL). SetBaseURL(baseURL).
SetDebug(debug), SetDebug(debug),
@ -71,6 +74,8 @@ type client struct {
cachedMainPageInfo *PageState cachedMainPageInfo *PageState
validLearningTypes querySet validLearningTypes querySet
validCourseThematics querySet validCourseThematics querySet
limiter *rate.Limiter
} }
func (c *client) GetMainPageState() (*PageState, error) { func (c *client) GetMainPageState() (*PageState, error) {
@ -146,24 +151,24 @@ type listEducationProductsRequest struct {
// Filters // Filters
LearningType []string `json:"learningtype"` LearningType []string `json:"learningtype"`
CoursesThematics []string `json:"coursesThematics"` CoursesThematics []string `json:"coursesThematics"`
Organizations []string `json:"organizations"` // list of ids Organizations []string `json:"organizations,omitempty"` // list of ids
DictionatyFormatFilterNew []FilterFormat `json:"dictionaryFormatFilterNew"` DictionatyFormatFilterNew []FilterFormat `json:"dictionaryFormatFilterNew,omitempty"`
DictionaryTimeFilter []FilterTime `json:"dictionaryTimeFilter"` DictionaryTimeFilter []FilterTime `json:"dictionaryTimeFilter,omitempty"`
DictionaryGraphicFilterNew []FilterGraphic `json:"dictionaryGraphicFilterNew"` DictionaryGraphicFilterNew []FilterGraphic `json:"dictionaryGraphicFilterNew,omitempty"`
DictionatyLevelFilterNew []FilterLevel `json:"dictionaryLevelFilterNew"` DictionatyLevelFilterNew []FilterLevel `json:"dictionaryLevelFilterNew,omitempty"`
// Options // Options
SubMentor []stringifiedBool `json:"sub-mentor"` // option with mentor SubMentor []stringifiedBool `json:"sub-mentor,omitempty"` // option with mentor
SubTimeFree []stringifiedBool `json:"sub-timeFree"` // option with trial SubTimeFree []stringifiedBool `json:"sub-timeFree,omitempty"` // option with trial
SubJobGarantSub []stringifiedBool `json:"sub-jobGarantsub"` // option for job garantee SubJobGarantSub []stringifiedBool `json:"sub-jobGarantsub,omitempty"` // option for job garantee
SubPriceFree []stringifiedBool `json:"sub-priceFree"` // only free SubPriceFree []stringifiedBool `json:"sub-priceFree,omitempty"` // only free
SubInstallment []stringifiedBool `json:"sub-installment"` // with credit SubInstallment []stringifiedBool `json:"sub-installment,omitempty"` // with credit
SubIsCourseProfession []stringifiedBool `json:"sub-isCourseProfession"` // освоить профессию с нуля SubIsCourseProfession []stringifiedBool `json:"sub-isCourseProfession,omitempty"` // освоить профессию с нуля
DevelopSkills []stringifiedBool `json:"developSkills"` // развить навыки DevelopSkills []stringifiedBool `json:"developSkills,omitempty"` // развить навыки
NotSubIsWebinar string `json:"not-sub-isWebinar"` NotSubIsWebinar string `json:"not-sub-isWebinar,omitempty"`
NotB2B string `json:"not-b2b"` NotB2B string `json:"not-b2b,omitempty"`
AdvertisingOnly bool `json:"advertisingOnly"` AdvertisingOnly bool `json:"advertisingOnly,omitempty"`
// Pagination and sorting // Pagination and sorting
Limit int `json:"limit"` Limit int `json:"limit"`
@ -223,6 +228,10 @@ func (c *client) ListEducationalProducts(
Offset: params.Offset, Offset: params.Offset,
} }
if err = c.limiter.Wait(ctx); err != nil {
return result, fmt.Errorf("waiting for limit: %w", err)
}
resp, err := c.http.R(). resp, err := c.http.R().
SetBody(reqParams). SetBody(reqParams).
SetResult(&result). SetResult(&result).
@ -304,6 +313,10 @@ func (c *client) ListEducationalProductsFilterCount(
}, },
} }
if err = c.limiter.Wait(ctx); err != nil {
return result, fmt.Errorf("waiting for limit: %w", err)
}
var respData DataContainer[ProductsFilterCount] var respData DataContainer[ProductsFilterCount]
resp, err := c.http.R(). resp, err := c.http.R().
SetBody(reqParams). SetBody(reqParams).

View File

@ -223,6 +223,7 @@ type Course struct {
Discount CourseDiscount `json:"discount"` Discount CourseDiscount `json:"discount"`
Link string `json:"link"` Link string `json:"link"`
Learningtype []string `json:"learningtype"` Learningtype []string `json:"learningtype"`
CourseThematics []string `json:"courseThematics"`
DateStart *time.Time `json:"dateStart"` DateStart *time.Time `json:"dateStart"`
TimeStart *time.Time `json:"timeStart"` TimeStart *time.Time `json:"timeStart"`
TimeAllHour *float64 `json:"timeAllHour"` TimeAllHour *float64 `json:"timeAllHour"`

View File

@ -34,6 +34,10 @@ func LogError(ctx context.Context, log *slog.Logger, msg string, attrs ...slog.A
log.LogAttrs(ctx, slog.LevelError, msg, append(attrs, getLogFields(ctx)...)...) log.LogAttrs(ctx, slog.LevelError, msg, append(attrs, getLogFields(ctx)...)...)
} }
func LogWithWarnError(ctx context.Context, log *slog.Logger, err error, msg string, attrs ...slog.Attr) {
LogWarn(ctx, log, msg, append(attrs, slog.Any("err", err))...)
}
func LogWithError(ctx context.Context, log *slog.Logger, err error, msg string, attrs ...slog.Attr) { func LogWithError(ctx context.Context, log *slog.Logger, err error, msg string, attrs ...slog.Attr) {
LogError(ctx, log, msg, append(attrs, slog.Any("err", err))...) LogError(ctx, log, msg, append(attrs, slog.Any("err", err))...)
} }

View File

@ -0,0 +1,15 @@
package xcontext
import (
"context"
"time"
)
func Wait(ctx context.Context, wait time.Duration) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(wait):
return nil
}
}

View File

@ -80,7 +80,36 @@ type ydbCourseRepository struct {
} }
func (r *ydbCourseRepository) List(ctx context.Context, params domain.ListCoursesParams) (courses []domain.Course, err error) { func (r *ydbCourseRepository) List(ctx context.Context, params domain.ListCoursesParams) (courses []domain.Course, err error) {
const limit = 1000
const queryName = "list" const queryName = "list"
const query = `
DECLARE $limit AS Int32;
DECLARE $id AS Text;
SELECT
id,
external_id,
source_type,
source_name,
course_thematic,
learning_type,
organization_id,
origin_link,
image_link,
name,
description,
full_price,
discount,
duration,
starts_at,
created_at,
updated_at,
deleted_at
FROM
courses
WHERE
id > $id
ORDER BY id
LIMIT $limit;`
courses = make([]domain.Course, 0, 4_000) courses = make([]domain.Course, 0, 4_000)
readTx := table.TxControl( readTx := table.TxControl(
@ -103,45 +132,38 @@ func (r *ydbCourseRepository) List(ctx context.Context, params domain.ListCourse
) )
}() }()
_, res, err := s.Execute( var lastKnownID string
ctx, for {
readTx, queryParams := table.NewQueryParameters(
`SELECT table.ValueParam("$limit", types.Int32Value(limit)),
id, table.ValueParam("$id", types.TextValue(lastKnownID)),
external_id, )
source_type, _, res, err := s.Execute(
source_name, ctx, readTx, query, queryParams,
organization_id, options.WithCollectStatsModeBasic(),
origin_link, )
image_link, if err != nil {
name, return fmt.Errorf("executing: %w", err)
description, }
full_price,
discount, if !res.NextResultSet(ctx) || !res.HasNextRow() {
duration, break
starts_at, }
created_at,
updated_at,
deleted_at
FROM
courses
`,
table.NewQueryParameters(),
options.WithCollectStatsModeBasic(),
)
if err != nil {
return fmt.Errorf("executing: %w", err)
}
for res.NextResultSet(ctx) {
for res.NextRow() { for res.NextRow() {
var cdb courseDB var cdb courseDB
_ = res.ScanNamed(cdb.getNamedValues()...) err = res.ScanNamed(cdb.getNamedValues()...)
if err != nil {
return fmt.Errorf("scanning row: %w", err)
}
courses = append(courses, mapCourseDB(cdb)) courses = append(courses, mapCourseDB(cdb))
} }
} if err = res.Err(); err != nil {
if err = res.Err(); err != nil { return err
return err }
lastKnownID = courses[len(courses)-1].ID
} }
return nil return nil
}, },
@ -187,6 +209,8 @@ func (r *ydbCourseRepository) Get(ctx context.Context, id string) (course domain
external_id, external_id,
source_type, source_type,
source_name, source_name,
course_thematic,
learning_type,
organization_id, organization_id,
origin_link, origin_link,
image_link, image_link,
@ -249,6 +273,8 @@ func createCourseParamsAsStruct(params domain.CreateCourseParams) types.Value {
types.StructFieldValue("name", types.TextValue(params.Name)), types.StructFieldValue("name", types.TextValue(params.Name)),
types.StructFieldValue("source_type", types.TextValue(st)), types.StructFieldValue("source_type", types.TextValue(st)),
types.StructFieldValue("source_name", types.NullableTextValue(params.SourceName.ValutPtr())), types.StructFieldValue("source_name", types.NullableTextValue(params.SourceName.ValutPtr())),
types.StructFieldValue("course_thematic", types.TextValue(params.CourseThematic)),
types.StructFieldValue("learning_type", types.TextValue(params.LearningType)),
types.StructFieldValue("external_id", types.NullableTextValue(params.ExternalID.ValutPtr())), types.StructFieldValue("external_id", types.NullableTextValue(params.ExternalID.ValutPtr())),
types.StructFieldValue("organization_id", types.TextValue(params.OrganizationID)), types.StructFieldValue("organization_id", types.TextValue(params.OrganizationID)),
types.StructFieldValue("origin_link", types.TextValue(params.OriginLink)), types.StructFieldValue("origin_link", types.TextValue(params.OriginLink)),
@ -272,6 +298,8 @@ func (r *ydbCourseRepository) CreateBatch(ctx context.Context, params ...domain.
name: Text, name: Text,
source_type: Text, source_type: Text,
source_name: Optional<Text>, source_name: Optional<Text>,
course_thematic: Text,
learning_type: Text,
organization_id: Text, organization_id: Text,
origin_link: Text, origin_link: Text,
image_link: Text, image_link: Text,
@ -292,6 +320,8 @@ func (r *ydbCourseRepository) CreateBatch(ctx context.Context, params ...domain.
name, name,
source_type, source_type,
source_name, source_name,
course_thematic,
learning_type,
organization_id, organization_id,
origin_link, origin_link,
image_link, image_link,
@ -328,65 +358,12 @@ func (r *ydbCourseRepository) CreateBatch(ctx context.Context, params ...domain.
} }
func (r *ydbCourseRepository) Create(ctx context.Context, params domain.CreateCourseParams) (domain.Course, error) { func (r *ydbCourseRepository) Create(ctx context.Context, params domain.CreateCourseParams) (domain.Course, error) {
// -- PRAGMA TablePathPrefix("courses"); err := r.CreateBatch(ctx, params)
const upsertQuery = `DECLARE $courseData AS List<Struct< if err != nil {
id: Text, return domain.Course{}, err
external_id: Optional<Text>, }
name: Text,
source_type: Text,
source_name: Optional<Text>,
organization_id: Text,
origin_link: Text,
image_link: Text,
description: Text,
full_price: Double,
discount: Double,
duration: Interval,
starts_at: Datetime,
created_at: Datetime,
updated_at: Datetime,
deleted_at: Optional<Datetime>>>;
REPLACE INTO return domain.Course{}, nil
courses
SELECT
id,
external_id,
name,
source_type,
source_name,
organization_id,
origin_link,
image_link,
description,
full_price,
discount,
duration,
starts_at,
created_at,
updated_at,
deleted_at
FROM AS_TABLE($courseData);`
writeTx := table.TxControl(
table.BeginTx(
table.WithSerializableReadWrite(),
),
table.CommitTx(),
)
err := r.db.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
queryParams := table.NewQueryParameters(
table.ValueParam("$courseData", types.ListValue(createCourseParamsAsStruct(params))),
)
_, _, err := s.Execute(ctx, writeTx, upsertQuery, queryParams)
if err != nil {
return fmt.Errorf("executing query: %w", err)
}
return nil
})
return domain.Course{}, err
} }
func (r *ydbCourseRepository) Delete(ctx context.Context, id string) error { func (r *ydbCourseRepository) Delete(ctx context.Context, id string) error {
@ -403,6 +380,8 @@ func (r *ydbCourseRepository) CreateCourseTable(ctx context.Context) error {
options.WithColumn("name", types.TypeText), options.WithColumn("name", types.TypeText),
options.WithColumn("source_type", types.TypeText), options.WithColumn("source_type", types.TypeText),
options.WithColumn("source_name", types.Optional(types.TypeText)), options.WithColumn("source_name", types.Optional(types.TypeText)),
options.WithColumn("course_thematic", types.TypeText),
options.WithColumn("learning_type", types.TypeText),
options.WithColumn("organization_id", types.TypeText), options.WithColumn("organization_id", types.TypeText),
options.WithColumn("origin_link", types.TypeText), options.WithColumn("origin_link", types.TypeText),
options.WithColumn("image_link", types.TypeText), options.WithColumn("image_link", types.TypeText),
@ -425,6 +404,8 @@ type courseDB struct {
Name string Name string
SourceType string SourceType string
SourceName *string SourceName *string
CourseThematic string
LearningType string
OrganizationID string OrganizationID string
OriginLink string OriginLink string
ImageLink string ImageLink string
@ -445,6 +426,8 @@ func (c *courseDB) getNamedValues() []named.Value {
named.Optional("external_id", &c.ExternalID), named.Optional("external_id", &c.ExternalID),
named.Required("source_type", &c.SourceType), named.Required("source_type", &c.SourceType),
named.Optional("source_name", &c.SourceName), named.Optional("source_name", &c.SourceName),
named.Required("course_thematic", &c.CourseThematic),
named.Required("learning_type", &c.LearningType),
named.Required("organization_id", &c.OrganizationID), named.Required("organization_id", &c.OrganizationID),
named.Required("origin_link", &c.OriginLink), named.Required("origin_link", &c.OriginLink),
named.Required("image_link", &c.ImageLink), named.Required("image_link", &c.ImageLink),
@ -501,6 +484,8 @@ func mapCourseDB(cdb courseDB) domain.Course {
Name: cdb.Name, Name: cdb.Name,
SourceType: st, SourceType: st,
SourceName: nullable.NewValuePtr(cdb.SourceName), SourceName: nullable.NewValuePtr(cdb.SourceName),
Thematic: cdb.CourseThematic,
LearningType: cdb.LearningType,
OrganizationID: cdb.OrganizationID, OrganizationID: cdb.OrganizationID,
OriginLink: cdb.OriginLink, OriginLink: cdb.OriginLink,
ImageLink: cdb.ImageLink, ImageLink: cdb.ImageLink,

View File

@ -18,6 +18,8 @@ type CreateCourse struct {
Name string Name string
SourceType domain.SourceType SourceType domain.SourceType
SourceName nullable.Value[string] SourceName nullable.Value[string]
CourseThematic string
LearningType string
OrganizationID string OrganizationID string
OriginLink string OriginLink string
ImageLink string ImageLink string

View File

@ -19,6 +19,8 @@ type CreateCourseParams struct {
Name string Name string
SourceType SourceType SourceType SourceType
SourceName nullable.Value[string] SourceName nullable.Value[string]
CourseThematic string
LearningType string
OrganizationID string OrganizationID string
OriginLink string OriginLink string
ImageLink string ImageLink string

View File

@ -108,10 +108,11 @@ type handler interface {
func (bp *BackgroundProcess) registerHandler(ctx context.Context, spec, name string, h handler) (nullable.Value[cron.EntryID], error) { func (bp *BackgroundProcess) registerHandler(ctx context.Context, spec, name string, h handler) (nullable.Value[cron.EntryID], error) {
handlerField := slog.String("handler", name) handlerField := slog.String("handler", name)
xcontext.LogInfo(ctx, bp.log, "registering handler", handlerField) jctx := xcontext.WithLogFields(ctx, handlerField)
xcontext.LogInfo(jctx, bp.log, "registering handler", handlerField)
entry, err := bp.scheduler.AddJob(spec, cron.FuncJob(func() { entry, err := bp.scheduler.AddJob(spec, cron.FuncJob(func() {
jctx := xcontext.WithLogFields(ctx, handlerField)
err := h.Handle(jctx) err := h.Handle(jctx)
if err != nil { if err != nil {
xcontext.LogWithError(jctx, bp.log, err, "unable to run iteration") xcontext.LogWithError(jctx, bp.log, err, "unable to run iteration")

View File

@ -2,8 +2,10 @@ package background
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"log/slog" "log/slog"
"sync/atomic"
"time" "time"
"golang.org/x/time/rate" "golang.org/x/time/rate"
@ -33,9 +35,17 @@ type syncSravniHandler struct {
log *slog.Logger log *slog.Logger
knownExternalIDs map[string]struct{} knownExternalIDs map[string]struct{}
isRunning uint32
} }
func (h *syncSravniHandler) Handle(ctx context.Context) (err error) { func (h *syncSravniHandler) Handle(ctx context.Context) (err error) {
if !atomic.CompareAndSwapUint32(&h.isRunning, 0, 1) {
return nil
}
defer func() {
atomic.StoreUint32(&h.isRunning, 0)
}()
iterationID := generator.RandomInt64ID() iterationID := generator.RandomInt64ID()
ctx = xcontext.WithLogFields(ctx, slog.String("iteration_id", iterationID)) ctx = xcontext.WithLogFields(ctx, slog.String("iteration_id", iterationID))
start := time.Now() start := time.Now()
@ -72,6 +82,7 @@ func (h *syncSravniHandler) Handle(ctx context.Context) (err error) {
learningTypes := state.Props.InitialReduxState.Dictionaries.Data.LearningType learningTypes := state.Props.InitialReduxState.Dictionaries.Data.LearningType
courses := make([]sravni.Course, 0, 1024) courses := make([]sravni.Course, 0, 1024)
buffer := make([]sravni.Course, 0, 512)
for _, learningType := range learningTypes.Fields { for _, learningType := range learningTypes.Fields {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -79,14 +90,49 @@ func (h *syncSravniHandler) Handle(ctx context.Context) (err error) {
default: default:
} }
lctx := xcontext.WithLogFields(ctx, slog.String("learning_type", learningType.Name)) lctx := xcontext.WithLogFields(ctx, slog.String("learning_type", learningType.Value))
xcontext.LogInfo(lctx, h.log, "parsing course", slog.String("name", learningType.Name)) xcontext.LogInfo(lctx, h.log, "parsing courses")
start := time.Now() start := time.Now()
courses = courses[:0] courses = courses[:0]
courses, err = h.loadEducationalProducts(lctx, learningType.Value, courses) filterCount, err := h.client.ListEducationalProductsFilterCount(ctx, sravni.ListEducationProductsParams{
LearningType: learningType.Value,
})
if err != nil { if err != nil {
return fmt.Errorf("loading educational products: %w", err) return fmt.Errorf("loading products filter count: %w", err)
}
thematics := make([]string, 0, len(filterCount.CoursesThematics))
for cr, count := range filterCount.CoursesThematics {
if count == 0 {
continue
}
thematics = append(thematics, cr)
}
xcontext.LogDebug(lctx, h.log, "loaded course thematics for learning type", slog.Int("count", len(thematics)))
// since count is known it might be optimized to allocate slice once per request.
for _, courseThematic := range thematics {
buffer = buffer[:0]
buffer, err = h.loadEducationalProducts(lctx, learningType.Value, courseThematic, buffer)
if err != nil {
if !errors.Is(err, context.Canceled) {
xcontext.LogWithWarnError(lctx, h.log, err, "unable to load educational products", slog.Int("count", len(thematics)))
continue
}
return fmt.Errorf("loading educational products: %w", err)
}
xslice.ForEach(buffer, func(c sravni.Course) {
c.Learningtype = []string{learningType.Value}
c.CourseThematics = []string{courseThematic}
courses = append(courses, c)
})
xcontext.LogInfo(lctx, h.log, "parsed subitems", slog.String("course_thematic", courseThematic), slog.Int("amount", len(buffer)))
} }
elapsed := time.Since(start) elapsed := time.Since(start)
@ -123,7 +169,7 @@ func (h *syncSravniHandler) Handle(ctx context.Context) (err error) {
return nil return nil
} }
func (h *syncSravniHandler) loadEducationalProducts(ctx context.Context, learningType string, buf []sravni.Course) ([]sravni.Course, error) { func (h *syncSravniHandler) loadEducationalProducts(ctx context.Context, learningType, courseThematic string, buf []sravni.Course) ([]sravni.Course, error) {
const maxDeepIteration = 10 const maxDeepIteration = 10
const defaultLimit = 50 const defaultLimit = 50
@ -138,7 +184,10 @@ func (h *syncSravniHandler) loadEducationalProducts(ctx context.Context, learnin
} }
var offset int var offset int
params := sravni.ListEducationProductsParams{LearningType: learningType} params := sravni.ListEducationProductsParams{
LearningType: learningType,
CoursesThematics: []string{courseThematic},
}
for i := 0; i < maxDeepIteration; i++ { for i := 0; i < maxDeepIteration; i++ {
params.Limit = defaultLimit params.Limit = defaultLimit
params.Offset = offset params.Offset = offset
@ -242,9 +291,20 @@ func courseAsCreateCourseParams(course sravni.Course) command.CreateCourse {
} }
var ct string
if len(course.CourseThematics) > 0 {
ct = course.CourseThematics[0]
}
var lt string
if len(course.Learningtype) > 0 {
lt = course.Learningtype[0]
}
return command.CreateCourse{ return command.CreateCourse{
ID: courseid, ID: courseid,
ExternalID: nullable.NewValue(course.ID), ExternalID: nullable.NewValue(course.ID),
CourseThematic: ct,
LearningType: lt,
Name: course.Name, Name: course.Name,
SourceType: domain.SourceTypeParsed, SourceType: domain.SourceTypeParsed,
SourceName: nullable.NewValue("sravni"), SourceName: nullable.NewValue("sravni"),