diff --git a/Taskfile.yml b/Taskfile.yml index 454029a..53effde 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -32,6 +32,11 @@ tasks: cmds: - go build -o $GOBIN/sravnicli -v -ldflags '{{.LDFLAGS}}' cmd/dev/sravnicli/*.go deps: [check, test] + build: + cmds: + - task: build_dev_cli + - task: build_background + run: deps: [build] cmds: diff --git a/internal/common/client/sravni/client.go b/internal/common/client/sravni/client.go index f2325d9..1d2b97e 100644 --- a/internal/common/client/sravni/client.go +++ b/internal/common/client/sravni/client.go @@ -8,6 +8,7 @@ import ( "log/slog" "strconv" "strings" + "time" "git.loyso.art/frx/kurious/internal/common/errors" "git.loyso.art/frx/kurious/pkg/slices" @@ -16,6 +17,7 @@ import ( "github.com/go-resty/resty/v2" "golang.org/x/net/html" "golang.org/x/net/html/atom" + "golang.org/x/time/rate" ) const ( @@ -38,7 +40,8 @@ type Client interface { func NewClient(ctx context.Context, log *slog.Logger, debug bool) (c *client, err error) { c = &client{ - log: log.With(slog.String("client", "sravni")), + log: log.With(slog.String("client", "sravni")), + limiter: rate.NewLimiter(rate.Every(time.Millisecond*100), 1), http: resty.New(). SetBaseURL(baseURL). SetDebug(debug), @@ -71,6 +74,8 @@ type client struct { cachedMainPageInfo *PageState validLearningTypes querySet validCourseThematics querySet + + limiter *rate.Limiter } func (c *client) GetMainPageState() (*PageState, error) { @@ -146,24 +151,24 @@ type listEducationProductsRequest struct { // Filters LearningType []string `json:"learningtype"` CoursesThematics []string `json:"coursesThematics"` - Organizations []string `json:"organizations"` // list of ids - DictionatyFormatFilterNew []FilterFormat `json:"dictionaryFormatFilterNew"` - DictionaryTimeFilter []FilterTime `json:"dictionaryTimeFilter"` - DictionaryGraphicFilterNew []FilterGraphic `json:"dictionaryGraphicFilterNew"` - DictionatyLevelFilterNew []FilterLevel `json:"dictionaryLevelFilterNew"` + Organizations []string `json:"organizations,omitempty"` // list of ids + DictionatyFormatFilterNew []FilterFormat `json:"dictionaryFormatFilterNew,omitempty"` + DictionaryTimeFilter []FilterTime `json:"dictionaryTimeFilter,omitempty"` + DictionaryGraphicFilterNew []FilterGraphic `json:"dictionaryGraphicFilterNew,omitempty"` + DictionatyLevelFilterNew []FilterLevel `json:"dictionaryLevelFilterNew,omitempty"` // Options - SubMentor []stringifiedBool `json:"sub-mentor"` // option with mentor - SubTimeFree []stringifiedBool `json:"sub-timeFree"` // option with trial - SubJobGarantSub []stringifiedBool `json:"sub-jobGarantsub"` // option for job garantee - SubPriceFree []stringifiedBool `json:"sub-priceFree"` // only free - SubInstallment []stringifiedBool `json:"sub-installment"` // with credit - SubIsCourseProfession []stringifiedBool `json:"sub-isCourseProfession"` // освоить профессию с нуля - DevelopSkills []stringifiedBool `json:"developSkills"` // развить навыки + SubMentor []stringifiedBool `json:"sub-mentor,omitempty"` // option with mentor + SubTimeFree []stringifiedBool `json:"sub-timeFree,omitempty"` // option with trial + SubJobGarantSub []stringifiedBool `json:"sub-jobGarantsub,omitempty"` // option for job garantee + SubPriceFree []stringifiedBool `json:"sub-priceFree,omitempty"` // only free + SubInstallment []stringifiedBool `json:"sub-installment,omitempty"` // with credit + SubIsCourseProfession []stringifiedBool `json:"sub-isCourseProfession,omitempty"` // освоить профессию с нуля + DevelopSkills []stringifiedBool `json:"developSkills,omitempty"` // развить навыки - NotSubIsWebinar string `json:"not-sub-isWebinar"` - NotB2B string `json:"not-b2b"` - AdvertisingOnly bool `json:"advertisingOnly"` + NotSubIsWebinar string `json:"not-sub-isWebinar,omitempty"` + NotB2B string `json:"not-b2b,omitempty"` + AdvertisingOnly bool `json:"advertisingOnly,omitempty"` // Pagination and sorting Limit int `json:"limit"` @@ -223,6 +228,10 @@ func (c *client) ListEducationalProducts( Offset: params.Offset, } + if err = c.limiter.Wait(ctx); err != nil { + return result, fmt.Errorf("waiting for limit: %w", err) + } + resp, err := c.http.R(). SetBody(reqParams). SetResult(&result). @@ -304,6 +313,10 @@ func (c *client) ListEducationalProductsFilterCount( }, } + if err = c.limiter.Wait(ctx); err != nil { + return result, fmt.Errorf("waiting for limit: %w", err) + } + var respData DataContainer[ProductsFilterCount] resp, err := c.http.R(). SetBody(reqParams). diff --git a/internal/common/client/sravni/entities.go b/internal/common/client/sravni/entities.go index 5b84076..7e6690d 100644 --- a/internal/common/client/sravni/entities.go +++ b/internal/common/client/sravni/entities.go @@ -223,6 +223,7 @@ type Course struct { Discount CourseDiscount `json:"discount"` Link string `json:"link"` Learningtype []string `json:"learningtype"` + CourseThematics []string `json:"courseThematics"` DateStart *time.Time `json:"dateStart"` TimeStart *time.Time `json:"timeStart"` TimeAllHour *float64 `json:"timeAllHour"` diff --git a/internal/common/xcontext/log.go b/internal/common/xcontext/log.go index 1217616..5bb4e9e 100644 --- a/internal/common/xcontext/log.go +++ b/internal/common/xcontext/log.go @@ -34,6 +34,10 @@ func LogError(ctx context.Context, log *slog.Logger, msg string, attrs ...slog.A log.LogAttrs(ctx, slog.LevelError, msg, append(attrs, getLogFields(ctx)...)...) } +func LogWithWarnError(ctx context.Context, log *slog.Logger, err error, msg string, attrs ...slog.Attr) { + LogWarn(ctx, log, msg, append(attrs, slog.Any("err", err))...) +} + func LogWithError(ctx context.Context, log *slog.Logger, err error, msg string, attrs ...slog.Attr) { LogError(ctx, log, msg, append(attrs, slog.Any("err", err))...) } diff --git a/internal/common/xcontext/wait.go b/internal/common/xcontext/wait.go new file mode 100644 index 0000000..46778d3 --- /dev/null +++ b/internal/common/xcontext/wait.go @@ -0,0 +1,15 @@ +package xcontext + +import ( + "context" + "time" +) + +func Wait(ctx context.Context, wait time.Duration) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(wait): + return nil + } +} diff --git a/internal/kurious/adapters/ydb_course_repository.go b/internal/kurious/adapters/ydb_course_repository.go index dfb722e..76e6a4a 100644 --- a/internal/kurious/adapters/ydb_course_repository.go +++ b/internal/kurious/adapters/ydb_course_repository.go @@ -80,7 +80,36 @@ type ydbCourseRepository struct { } func (r *ydbCourseRepository) List(ctx context.Context, params domain.ListCoursesParams) (courses []domain.Course, err error) { + const limit = 1000 const queryName = "list" + const query = ` +DECLARE $limit AS Int32; +DECLARE $id AS Text; +SELECT + id, + external_id, + source_type, + source_name, + course_thematic, + learning_type, + organization_id, + origin_link, + image_link, + name, + description, + full_price, + discount, + duration, + starts_at, + created_at, + updated_at, + deleted_at +FROM + courses +WHERE + id > $id +ORDER BY id +LIMIT $limit;` courses = make([]domain.Course, 0, 4_000) readTx := table.TxControl( @@ -103,45 +132,38 @@ func (r *ydbCourseRepository) List(ctx context.Context, params domain.ListCourse ) }() - _, res, err := s.Execute( - ctx, - readTx, - `SELECT - id, - external_id, - source_type, - source_name, - organization_id, - origin_link, - image_link, - name, - description, - full_price, - discount, - duration, - starts_at, - created_at, - updated_at, - deleted_at - FROM - courses - `, - table.NewQueryParameters(), - options.WithCollectStatsModeBasic(), - ) - if err != nil { - return fmt.Errorf("executing: %w", err) - } + var lastKnownID string + for { + queryParams := table.NewQueryParameters( + table.ValueParam("$limit", types.Int32Value(limit)), + table.ValueParam("$id", types.TextValue(lastKnownID)), + ) + _, res, err := s.Execute( + ctx, readTx, query, queryParams, + options.WithCollectStatsModeBasic(), + ) + if err != nil { + return fmt.Errorf("executing: %w", err) + } + + if !res.NextResultSet(ctx) || !res.HasNextRow() { + break + } - for res.NextResultSet(ctx) { for res.NextRow() { var cdb courseDB - _ = res.ScanNamed(cdb.getNamedValues()...) + err = res.ScanNamed(cdb.getNamedValues()...) + if err != nil { + return fmt.Errorf("scanning row: %w", err) + } + courses = append(courses, mapCourseDB(cdb)) } - } - if err = res.Err(); err != nil { - return err + if err = res.Err(); err != nil { + return err + } + + lastKnownID = courses[len(courses)-1].ID } return nil }, @@ -187,6 +209,8 @@ func (r *ydbCourseRepository) Get(ctx context.Context, id string) (course domain external_id, source_type, source_name, + course_thematic, + learning_type, organization_id, origin_link, image_link, @@ -249,6 +273,8 @@ func createCourseParamsAsStruct(params domain.CreateCourseParams) types.Value { types.StructFieldValue("name", types.TextValue(params.Name)), types.StructFieldValue("source_type", types.TextValue(st)), types.StructFieldValue("source_name", types.NullableTextValue(params.SourceName.ValutPtr())), + types.StructFieldValue("course_thematic", types.TextValue(params.CourseThematic)), + types.StructFieldValue("learning_type", types.TextValue(params.LearningType)), types.StructFieldValue("external_id", types.NullableTextValue(params.ExternalID.ValutPtr())), types.StructFieldValue("organization_id", types.TextValue(params.OrganizationID)), types.StructFieldValue("origin_link", types.TextValue(params.OriginLink)), @@ -272,6 +298,8 @@ func (r *ydbCourseRepository) CreateBatch(ctx context.Context, params ...domain. name: Text, source_type: Text, source_name: Optional, + course_thematic: Text, + learning_type: Text, organization_id: Text, origin_link: Text, image_link: Text, @@ -292,6 +320,8 @@ func (r *ydbCourseRepository) CreateBatch(ctx context.Context, params ...domain. name, source_type, source_name, + course_thematic, + learning_type, organization_id, origin_link, image_link, @@ -328,65 +358,12 @@ func (r *ydbCourseRepository) CreateBatch(ctx context.Context, params ...domain. } func (r *ydbCourseRepository) Create(ctx context.Context, params domain.CreateCourseParams) (domain.Course, error) { - // -- PRAGMA TablePathPrefix("courses"); - const upsertQuery = `DECLARE $courseData AS List, - name: Text, - source_type: Text, - source_name: Optional, - organization_id: Text, - origin_link: Text, - image_link: Text, - description: Text, - full_price: Double, - discount: Double, - duration: Interval, - starts_at: Datetime, - created_at: Datetime, - updated_at: Datetime, - deleted_at: Optional>>; + err := r.CreateBatch(ctx, params) + if err != nil { + return domain.Course{}, err + } - REPLACE INTO - courses - SELECT - id, - external_id, - name, - source_type, - source_name, - organization_id, - origin_link, - image_link, - description, - full_price, - discount, - duration, - starts_at, - created_at, - updated_at, - deleted_at - FROM AS_TABLE($courseData);` - - writeTx := table.TxControl( - table.BeginTx( - table.WithSerializableReadWrite(), - ), - table.CommitTx(), - ) - err := r.db.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - queryParams := table.NewQueryParameters( - table.ValueParam("$courseData", types.ListValue(createCourseParamsAsStruct(params))), - ) - _, _, err := s.Execute(ctx, writeTx, upsertQuery, queryParams) - if err != nil { - return fmt.Errorf("executing query: %w", err) - } - - return nil - }) - - return domain.Course{}, err + return domain.Course{}, nil } func (r *ydbCourseRepository) Delete(ctx context.Context, id string) error { @@ -403,6 +380,8 @@ func (r *ydbCourseRepository) CreateCourseTable(ctx context.Context) error { options.WithColumn("name", types.TypeText), options.WithColumn("source_type", types.TypeText), options.WithColumn("source_name", types.Optional(types.TypeText)), + options.WithColumn("course_thematic", types.TypeText), + options.WithColumn("learning_type", types.TypeText), options.WithColumn("organization_id", types.TypeText), options.WithColumn("origin_link", types.TypeText), options.WithColumn("image_link", types.TypeText), @@ -425,6 +404,8 @@ type courseDB struct { Name string SourceType string SourceName *string + CourseThematic string + LearningType string OrganizationID string OriginLink string ImageLink string @@ -445,6 +426,8 @@ func (c *courseDB) getNamedValues() []named.Value { named.Optional("external_id", &c.ExternalID), named.Required("source_type", &c.SourceType), named.Optional("source_name", &c.SourceName), + named.Required("course_thematic", &c.CourseThematic), + named.Required("learning_type", &c.LearningType), named.Required("organization_id", &c.OrganizationID), named.Required("origin_link", &c.OriginLink), named.Required("image_link", &c.ImageLink), @@ -501,6 +484,8 @@ func mapCourseDB(cdb courseDB) domain.Course { Name: cdb.Name, SourceType: st, SourceName: nullable.NewValuePtr(cdb.SourceName), + Thematic: cdb.CourseThematic, + LearningType: cdb.LearningType, OrganizationID: cdb.OrganizationID, OriginLink: cdb.OriginLink, ImageLink: cdb.ImageLink, diff --git a/internal/kurious/app/command/createcourse.go b/internal/kurious/app/command/createcourse.go index ac0084d..91554f1 100644 --- a/internal/kurious/app/command/createcourse.go +++ b/internal/kurious/app/command/createcourse.go @@ -18,6 +18,8 @@ type CreateCourse struct { Name string SourceType domain.SourceType SourceName nullable.Value[string] + CourseThematic string + LearningType string OrganizationID string OriginLink string ImageLink string diff --git a/internal/kurious/domain/repository.go b/internal/kurious/domain/repository.go index 3a29503..b02afd9 100644 --- a/internal/kurious/domain/repository.go +++ b/internal/kurious/domain/repository.go @@ -19,6 +19,8 @@ type CreateCourseParams struct { Name string SourceType SourceType SourceName nullable.Value[string] + CourseThematic string + LearningType string OrganizationID string OriginLink string ImageLink string diff --git a/internal/kurious/ports/background.go b/internal/kurious/ports/background.go index c8e7faf..ace4078 100644 --- a/internal/kurious/ports/background.go +++ b/internal/kurious/ports/background.go @@ -108,10 +108,11 @@ type handler interface { func (bp *BackgroundProcess) registerHandler(ctx context.Context, spec, name string, h handler) (nullable.Value[cron.EntryID], error) { handlerField := slog.String("handler", name) - xcontext.LogInfo(ctx, bp.log, "registering handler", handlerField) + jctx := xcontext.WithLogFields(ctx, handlerField) + + xcontext.LogInfo(jctx, bp.log, "registering handler", handlerField) entry, err := bp.scheduler.AddJob(spec, cron.FuncJob(func() { - jctx := xcontext.WithLogFields(ctx, handlerField) err := h.Handle(jctx) if err != nil { xcontext.LogWithError(jctx, bp.log, err, "unable to run iteration") diff --git a/internal/kurious/ports/background/synchandler.go b/internal/kurious/ports/background/synchandler.go index bd4600f..b914b11 100644 --- a/internal/kurious/ports/background/synchandler.go +++ b/internal/kurious/ports/background/synchandler.go @@ -2,8 +2,10 @@ package background import ( "context" + "errors" "fmt" "log/slog" + "sync/atomic" "time" "golang.org/x/time/rate" @@ -33,9 +35,17 @@ type syncSravniHandler struct { log *slog.Logger knownExternalIDs map[string]struct{} + isRunning uint32 } func (h *syncSravniHandler) Handle(ctx context.Context) (err error) { + if !atomic.CompareAndSwapUint32(&h.isRunning, 0, 1) { + return nil + } + defer func() { + atomic.StoreUint32(&h.isRunning, 0) + }() + iterationID := generator.RandomInt64ID() ctx = xcontext.WithLogFields(ctx, slog.String("iteration_id", iterationID)) start := time.Now() @@ -72,6 +82,7 @@ func (h *syncSravniHandler) Handle(ctx context.Context) (err error) { learningTypes := state.Props.InitialReduxState.Dictionaries.Data.LearningType courses := make([]sravni.Course, 0, 1024) + buffer := make([]sravni.Course, 0, 512) for _, learningType := range learningTypes.Fields { select { case <-ctx.Done(): @@ -79,14 +90,49 @@ func (h *syncSravniHandler) Handle(ctx context.Context) (err error) { default: } - lctx := xcontext.WithLogFields(ctx, slog.String("learning_type", learningType.Name)) - xcontext.LogInfo(lctx, h.log, "parsing course", slog.String("name", learningType.Name)) + lctx := xcontext.WithLogFields(ctx, slog.String("learning_type", learningType.Value)) + xcontext.LogInfo(lctx, h.log, "parsing courses") start := time.Now() courses = courses[:0] - courses, err = h.loadEducationalProducts(lctx, learningType.Value, courses) + filterCount, err := h.client.ListEducationalProductsFilterCount(ctx, sravni.ListEducationProductsParams{ + LearningType: learningType.Value, + }) if err != nil { - return fmt.Errorf("loading educational products: %w", err) + return fmt.Errorf("loading products filter count: %w", err) + } + + thematics := make([]string, 0, len(filterCount.CoursesThematics)) + for cr, count := range filterCount.CoursesThematics { + if count == 0 { + continue + } + + thematics = append(thematics, cr) + } + + xcontext.LogDebug(lctx, h.log, "loaded course thematics for learning type", slog.Int("count", len(thematics))) + + // since count is known it might be optimized to allocate slice once per request. + for _, courseThematic := range thematics { + buffer = buffer[:0] + buffer, err = h.loadEducationalProducts(lctx, learningType.Value, courseThematic, buffer) + if err != nil { + if !errors.Is(err, context.Canceled) { + xcontext.LogWithWarnError(lctx, h.log, err, "unable to load educational products", slog.Int("count", len(thematics))) + continue + } + + return fmt.Errorf("loading educational products: %w", err) + } + + xslice.ForEach(buffer, func(c sravni.Course) { + c.Learningtype = []string{learningType.Value} + c.CourseThematics = []string{courseThematic} + courses = append(courses, c) + }) + + xcontext.LogInfo(lctx, h.log, "parsed subitems", slog.String("course_thematic", courseThematic), slog.Int("amount", len(buffer))) } elapsed := time.Since(start) @@ -123,7 +169,7 @@ func (h *syncSravniHandler) Handle(ctx context.Context) (err error) { return nil } -func (h *syncSravniHandler) loadEducationalProducts(ctx context.Context, learningType string, buf []sravni.Course) ([]sravni.Course, error) { +func (h *syncSravniHandler) loadEducationalProducts(ctx context.Context, learningType, courseThematic string, buf []sravni.Course) ([]sravni.Course, error) { const maxDeepIteration = 10 const defaultLimit = 50 @@ -138,7 +184,10 @@ func (h *syncSravniHandler) loadEducationalProducts(ctx context.Context, learnin } var offset int - params := sravni.ListEducationProductsParams{LearningType: learningType} + params := sravni.ListEducationProductsParams{ + LearningType: learningType, + CoursesThematics: []string{courseThematic}, + } for i := 0; i < maxDeepIteration; i++ { params.Limit = defaultLimit params.Offset = offset @@ -242,9 +291,20 @@ func courseAsCreateCourseParams(course sravni.Course) command.CreateCourse { } + var ct string + if len(course.CourseThematics) > 0 { + ct = course.CourseThematics[0] + } + var lt string + if len(course.Learningtype) > 0 { + lt = course.Learningtype[0] + } + return command.CreateCourse{ ID: courseid, ExternalID: nullable.NewValue(course.ID), + CourseThematic: ct, + LearningType: lt, Name: course.Name, SourceType: domain.SourceTypeParsed, SourceName: nullable.NewValue("sravni"),