setup parser
This commit is contained in:
129
internal/kurious/ports/background.go
Normal file
129
internal/kurious/ports/background.go
Normal file
@ -0,0 +1,129 @@
|
||||
package ports
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.loyso.art/frx/kurious/internal/common/client/sravni"
|
||||
"git.loyso.art/frx/kurious/internal/common/errors"
|
||||
"git.loyso.art/frx/kurious/internal/common/nullable"
|
||||
"git.loyso.art/frx/kurious/internal/common/xcontext"
|
||||
"git.loyso.art/frx/kurious/internal/common/xlog"
|
||||
"git.loyso.art/frx/kurious/internal/kurious/ports/background"
|
||||
"git.loyso.art/frx/kurious/internal/kurious/service"
|
||||
|
||||
"github.com/robfig/cron/v3"
|
||||
)
|
||||
|
||||
type BackgroundProcess struct {
|
||||
scheduler *cron.Cron
|
||||
log *slog.Logger
|
||||
|
||||
syncSravniHandlerEntryID nullable.Value[cron.EntryID]
|
||||
syncSravniHandler handler
|
||||
}
|
||||
|
||||
func NewBackgroundProcess(ctx context.Context, log *slog.Logger) *BackgroundProcess {
|
||||
clog := xlog.WrapSLogger(ctx, log)
|
||||
scheduler := cron.New(
|
||||
cron.WithSeconds(),
|
||||
cron.WithChain(
|
||||
cron.Recover(clog),
|
||||
),
|
||||
)
|
||||
|
||||
bp := &BackgroundProcess{
|
||||
scheduler: scheduler,
|
||||
log: log,
|
||||
}
|
||||
|
||||
return bp
|
||||
}
|
||||
|
||||
func (bp *BackgroundProcess) RegisterSyncSravniHandler(
|
||||
ctx context.Context,
|
||||
svc service.Application,
|
||||
client sravni.Client,
|
||||
cronValue string,
|
||||
) (err error) {
|
||||
const handlerName = "sync_sravni_handler"
|
||||
|
||||
bp.syncSravniHandler = background.NewSyncSravniHandler(svc, client, bp.log)
|
||||
|
||||
if cronValue == "" {
|
||||
return nil
|
||||
}
|
||||
bp.syncSravniHandlerEntryID, err = bp.registerHandler(ctx, cronValue, handlerName, bp.syncSravniHandler)
|
||||
return err
|
||||
}
|
||||
|
||||
func (bp *BackgroundProcess) ForceExecuteSyncSravniHandler(ctx context.Context) error {
|
||||
if bp.syncSravniHandler == nil {
|
||||
return errors.SimpleError("sync sravni handler not mounted")
|
||||
}
|
||||
return bp.syncSravniHandler.Handle(ctx)
|
||||
}
|
||||
|
||||
type NextRunStats map[string]time.Time
|
||||
|
||||
func (s NextRunStats) String() string {
|
||||
var sb strings.Builder
|
||||
_ = json.NewEncoder(&sb).Encode(s)
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
func (bp *BackgroundProcess) GetNextRunStats() NextRunStats {
|
||||
out := make(NextRunStats)
|
||||
if bp.syncSravniHandlerEntryID.Valid() {
|
||||
sEntry := bp.scheduler.Entry(bp.syncSravniHandlerEntryID.Value())
|
||||
out["sravni_handler"] = sEntry.Next
|
||||
}
|
||||
|
||||
return NextRunStats(out)
|
||||
}
|
||||
|
||||
func (bp *BackgroundProcess) Run() {
|
||||
bp.scheduler.Run()
|
||||
}
|
||||
|
||||
func (bp *BackgroundProcess) Shutdown(ctx context.Context) error {
|
||||
sdctx := bp.scheduler.Stop()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-sdctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
type handler interface {
|
||||
Handle(context.Context) error
|
||||
}
|
||||
|
||||
func (bp *BackgroundProcess) registerHandler(ctx context.Context, spec, name string, h handler) (nullable.Value[cron.EntryID], error) {
|
||||
handlerField := slog.String("handler", name)
|
||||
xcontext.LogInfo(ctx, bp.log, "registering handler", handlerField)
|
||||
|
||||
entry, err := bp.scheduler.AddJob(spec, cron.FuncJob(func() {
|
||||
jctx := xcontext.WithLogFields(ctx, handlerField)
|
||||
err := h.Handle(jctx)
|
||||
if err != nil {
|
||||
xcontext.LogWithError(jctx, bp.log, err, "unable to run iteration")
|
||||
}
|
||||
xcontext.LogInfo(jctx, bp.log, "iteration completed")
|
||||
}))
|
||||
|
||||
var out nullable.Value[cron.EntryID]
|
||||
if err != nil {
|
||||
return out, fmt.Errorf("adding %s job: %w", name, err)
|
||||
}
|
||||
|
||||
out.Set(entry)
|
||||
|
||||
return out, nil
|
||||
}
|
||||
260
internal/kurious/ports/background/synchandler.go
Normal file
260
internal/kurious/ports/background/synchandler.go
Normal file
@ -0,0 +1,260 @@
|
||||
package background
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"git.loyso.art/frx/kurious/internal/common/client/sravni"
|
||||
"git.loyso.art/frx/kurious/internal/common/generator"
|
||||
"git.loyso.art/frx/kurious/internal/common/nullable"
|
||||
"git.loyso.art/frx/kurious/internal/common/xcontext"
|
||||
"git.loyso.art/frx/kurious/internal/common/xslice"
|
||||
"git.loyso.art/frx/kurious/internal/kurious/app/command"
|
||||
"git.loyso.art/frx/kurious/internal/kurious/app/query"
|
||||
"git.loyso.art/frx/kurious/internal/kurious/domain"
|
||||
"git.loyso.art/frx/kurious/internal/kurious/service"
|
||||
)
|
||||
|
||||
func NewSyncSravniHandler(svc service.Application, client sravni.Client, log *slog.Logger) *syncSravniHandler {
|
||||
return &syncSravniHandler{
|
||||
svc: svc,
|
||||
client: client,
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
type syncSravniHandler struct {
|
||||
svc service.Application
|
||||
client sravni.Client
|
||||
log *slog.Logger
|
||||
|
||||
knownExternalIDs map[string]struct{}
|
||||
}
|
||||
|
||||
func (h *syncSravniHandler) Handle(ctx context.Context) (err error) {
|
||||
iterationID := generator.RandomInt64ID()
|
||||
ctx = xcontext.WithLogFields(ctx, slog.String("iteration_id", iterationID))
|
||||
start := time.Now()
|
||||
xcontext.LogInfo(ctx, h.log, "handling iteration")
|
||||
|
||||
defer func() {
|
||||
elapsed := slog.Duration("elapsed", time.Since(start))
|
||||
if err != nil {
|
||||
xcontext.LogWithError(ctx, h.log, err, "unable to handle iteration", elapsed)
|
||||
return
|
||||
}
|
||||
xcontext.LogInfo(ctx, h.log, "iteration finished", elapsed)
|
||||
}()
|
||||
|
||||
err = h.fillCaches(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
state, err := h.client.GetMainPageState()
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to get main page state: %w", err)
|
||||
}
|
||||
|
||||
handleStart := time.Now()
|
||||
defer func() {
|
||||
elapsed := time.Since(handleStart)
|
||||
xcontext.LogInfo(
|
||||
ctx, h.log, "iteration finished",
|
||||
slog.Duration("elapsed", elapsed),
|
||||
slog.Bool("success", err == nil),
|
||||
)
|
||||
}()
|
||||
|
||||
learningTypes := state.Props.InitialReduxState.Dictionaries.Data.LearningType
|
||||
courses := make([]sravni.Course, 0, 1024)
|
||||
for _, learningType := range learningTypes.Fields {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
lctx := xcontext.WithLogFields(ctx, slog.String("learning_type", learningType.Name))
|
||||
xcontext.LogInfo(lctx, h.log, "parsing course", slog.String("name", learningType.Name))
|
||||
start := time.Now()
|
||||
courses = courses[:0]
|
||||
|
||||
courses, err = h.loadEducationalProducts(lctx, learningType.Value, courses)
|
||||
if err != nil {
|
||||
return fmt.Errorf("loading educational products: %w", err)
|
||||
}
|
||||
|
||||
elapsed := time.Since(start)
|
||||
xcontext.LogDebug(lctx, h.log, "parsed items", slog.Duration("elapsed", elapsed), slog.Int("amount", len(courses)))
|
||||
|
||||
// TODO: if the same course appears in different categories, it should be handled
|
||||
courses = h.filterByCache(courses)
|
||||
if len(courses) == 0 {
|
||||
xcontext.LogInfo(lctx, h.log, "all courses were filtered out")
|
||||
continue
|
||||
}
|
||||
|
||||
xcontext.LogDebug(lctx, h.log, "filtered items", slog.Int("amount", len(courses)))
|
||||
|
||||
err = h.insertValues(lctx, courses)
|
||||
elapsed = time.Since(start) - elapsed
|
||||
elapsedField := slog.Duration("elapsed", elapsed)
|
||||
if err != nil {
|
||||
xcontext.LogWithError(lctx, h.log, err, "unable to insert courses", elapsedField)
|
||||
continue
|
||||
}
|
||||
|
||||
xslice.ForEach(courses, func(c sravni.Course) {
|
||||
h.knownExternalIDs[c.ID] = struct{}{}
|
||||
})
|
||||
|
||||
xcontext.LogInfo(
|
||||
lctx, h.log, "processed items",
|
||||
elapsedField,
|
||||
slog.Int("count", len(courses)),
|
||||
)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *syncSravniHandler) loadEducationalProducts(ctx context.Context, learningType string, buf []sravni.Course) ([]sravni.Course, error) {
|
||||
const maxDeepIteration = 10
|
||||
const defaultLimit = 50
|
||||
|
||||
rateStrategy := rate.Every(time.Millisecond * 400)
|
||||
rateLimit := rate.NewLimiter(rateStrategy, 1)
|
||||
|
||||
var courses []sravni.Course
|
||||
if buf == nil || cap(buf) == 0 {
|
||||
courses = make([]sravni.Course, 0, 256)
|
||||
} else {
|
||||
courses = buf
|
||||
}
|
||||
|
||||
var offset int
|
||||
params := sravni.ListEducationProductsParams{LearningType: learningType}
|
||||
for i := 0; i < maxDeepIteration; i++ {
|
||||
params.Limit = defaultLimit
|
||||
params.Offset = offset
|
||||
response, err := h.client.ListEducationalProducts(ctx, params)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("listing educational products: %w", err)
|
||||
}
|
||||
|
||||
offset += defaultLimit
|
||||
courses = append(courses, response.Items...)
|
||||
if len(response.Items) < defaultLimit {
|
||||
break
|
||||
}
|
||||
|
||||
err = rateLimit.Wait(ctx)
|
||||
if err != nil {
|
||||
return courses, fmt.Errorf("waiting for limit: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return courses, nil
|
||||
}
|
||||
|
||||
func (h *syncSravniHandler) filterByCache(courses []sravni.Course) (toInsert []sravni.Course) {
|
||||
toCut := xslice.FilterInplace(courses, xslice.Not(h.isCached))
|
||||
return courses[:toCut]
|
||||
}
|
||||
|
||||
func (h *syncSravniHandler) isCached(course sravni.Course) bool {
|
||||
_, ok := h.knownExternalIDs[course.ID]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (h *syncSravniHandler) insertValues(ctx context.Context, courses []sravni.Course) error {
|
||||
courseParams := xslice.Map(courses, courseAsCreateCourseParams)
|
||||
err := h.svc.Commands.InsertCourses.Handle(ctx, command.CreateCourses{
|
||||
Courses: courseParams,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("inserting courses: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *syncSravniHandler) fillCaches(ctx context.Context) error {
|
||||
if h.knownExternalIDs != nil {
|
||||
xcontext.LogInfo(ctx, h.log, "cache already filled")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
courses, err := h.svc.Queries.ListCourses.Handle(ctx, query.ListCourse{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("listing courses: %w", err)
|
||||
}
|
||||
|
||||
h.knownExternalIDs = make(map[string]struct{}, len(courses))
|
||||
|
||||
xslice.ForEach(courses, func(c domain.Course) {
|
||||
if !c.ExternalID.Valid() {
|
||||
return
|
||||
}
|
||||
h.knownExternalIDs[c.ExternalID.Value()] = struct{}{}
|
||||
})
|
||||
|
||||
xcontext.LogInfo(ctx, h.log, "cache filled", slog.Int("count", len(courses)))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func courseAsCreateCourseParams(course sravni.Course) command.CreateCourse {
|
||||
courseid := generator.RandomInt64ID()
|
||||
var startAt time.Time
|
||||
if course.DateStart != nil {
|
||||
startAt = *course.DateStart
|
||||
}
|
||||
if course.TimeStart != nil {
|
||||
startAtUnix := startAt.Unix() + course.TimeStart.Unix()
|
||||
startAt = time.Unix(startAtUnix, 0)
|
||||
}
|
||||
|
||||
var courseDuration time.Duration
|
||||
if course.TimeAllDay != nil {
|
||||
courseDuration += time.Hour * 24 * time.Duration(*course.TimeAllDay)
|
||||
}
|
||||
if course.TimeAllHour != nil {
|
||||
courseDuration += time.Hour * time.Duration(*course.TimeAllHour)
|
||||
}
|
||||
if course.TimeAllMonth != nil {
|
||||
courseDuration += time.Hour * 24 * 30 * time.Duration(*course.TimeAllMonth)
|
||||
}
|
||||
|
||||
var discount float64
|
||||
switch td := course.Discount.Percent.(type) {
|
||||
case int:
|
||||
discount = float64(td) / 100
|
||||
case float64:
|
||||
discount = td / 100
|
||||
default:
|
||||
|
||||
}
|
||||
|
||||
return command.CreateCourse{
|
||||
ID: courseid,
|
||||
ExternalID: nullable.NewValue(course.ID),
|
||||
Name: course.Name,
|
||||
SourceType: domain.SourceTypeParsed,
|
||||
SourceName: nullable.NewValue("sravni"),
|
||||
OrganizationID: course.Organization,
|
||||
OriginLink: course.Link,
|
||||
ImageLink: course.CourseImage,
|
||||
Description: "", // should be added to parse in queue
|
||||
FullPrice: course.PriceAll,
|
||||
Discount: discount,
|
||||
StartsAt: startAt,
|
||||
Duration: courseDuration,
|
||||
}
|
||||
}
|
||||
@ -1,43 +0,0 @@
|
||||
package ports
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
|
||||
"git.loyso.art/frx/kurious/internal/common/xlog"
|
||||
"git.loyso.art/frx/kurious/internal/kurious/service"
|
||||
|
||||
"github.com/robfig/cron/v3"
|
||||
)
|
||||
|
||||
type BackgroundParser struct {
|
||||
scheduler *cron.Cron
|
||||
}
|
||||
|
||||
func NewBackgroundParser(ctx context.Context, svc service.Application, log *slog.Logger) *BackgroundParser {
|
||||
clog := xlog.WrapSLogger(ctx, log)
|
||||
scheduler := cron.New(cron.WithSeconds(), cron.WithChain(
|
||||
cron.Recover(clog),
|
||||
))
|
||||
|
||||
bp := &BackgroundParser{
|
||||
scheduler: scheduler,
|
||||
}
|
||||
|
||||
return bp
|
||||
}
|
||||
|
||||
func (bp *BackgroundParser) Run() {
|
||||
bp.scheduler.Run()
|
||||
}
|
||||
|
||||
func (bp *BackgroundParser) Shutdown(ctx context.Context) error {
|
||||
sdctx := bp.scheduler.Stop()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-sdctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user