// Change notes:
//   - Added command and query for organizations.
//   - Unknown organizations are now saved into the database by the `background` service.
//   - Added `List` method to `OrganizationRepository`.
439 lines
12 KiB
Go
439 lines
12 KiB
Go
package background
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"golang.org/x/time/rate"
|
|
|
|
"git.loyso.art/frx/kurious/internal/common/client/sravni"
|
|
"git.loyso.art/frx/kurious/internal/common/generator"
|
|
"git.loyso.art/frx/kurious/internal/common/nullable"
|
|
"git.loyso.art/frx/kurious/internal/common/xcontext"
|
|
"git.loyso.art/frx/kurious/internal/common/xslices"
|
|
"git.loyso.art/frx/kurious/internal/kurious/app/command"
|
|
"git.loyso.art/frx/kurious/internal/kurious/app/query"
|
|
"git.loyso.art/frx/kurious/internal/kurious/domain"
|
|
"git.loyso.art/frx/kurious/internal/kurious/service"
|
|
)
|
|
|
|
func NewSyncSravniHandler(svc service.Application, client sravni.Client, log *slog.Logger) *syncSravniHandler {
|
|
return &syncSravniHandler{
|
|
svc: svc,
|
|
client: client,
|
|
log: log,
|
|
}
|
|
}
|
|
|
|
type syncSravniHandler struct {
|
|
svc service.Application
|
|
client sravni.Client
|
|
log *slog.Logger
|
|
|
|
knownExternalIDs map[string]struct{}
|
|
knownOrganizationsByExternalID map[string]struct{}
|
|
isRunning uint32
|
|
}
|
|
|
|
func (h *syncSravniHandler) Handle(ctx context.Context) (err error) {
|
|
if !atomic.CompareAndSwapUint32(&h.isRunning, 0, 1) {
|
|
return nil
|
|
}
|
|
defer func() {
|
|
atomic.StoreUint32(&h.isRunning, 0)
|
|
}()
|
|
|
|
iterationID := generator.RandomInt64ID()
|
|
ctx = xcontext.WithLogFields(ctx, slog.String("iteration_id", iterationID))
|
|
start := time.Now()
|
|
xcontext.LogInfo(ctx, h.log, "handling iteration")
|
|
|
|
defer func() {
|
|
elapsed := slog.Duration("elapsed", time.Since(start))
|
|
if err != nil {
|
|
xcontext.LogWithError(ctx, h.log, err, "unable to handle iteration", elapsed)
|
|
return
|
|
}
|
|
xcontext.LogInfo(ctx, h.log, "iteration finished", elapsed)
|
|
}()
|
|
|
|
err = h.fillCaches(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
state, err := h.client.GetMainPageState()
|
|
if err != nil {
|
|
return fmt.Errorf("unable to get main page state: %w", err)
|
|
}
|
|
|
|
handleStart := time.Now()
|
|
defer func() {
|
|
elapsed := time.Since(handleStart)
|
|
xcontext.LogInfo(
|
|
ctx, h.log, "iteration finished",
|
|
slog.Duration("elapsed", elapsed),
|
|
slog.Bool("success", err == nil),
|
|
)
|
|
}()
|
|
|
|
learningTypes := state.Props.InitialReduxState.Dictionaries.Data.LearningType
|
|
courses := make([]sravni.Course, 0, 1024)
|
|
buffer := make([]sravni.Course, 0, 512)
|
|
organizations := make([]sravni.Organization, 0, 256)
|
|
for _, learningType := range learningTypes.Fields {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
lctx := xcontext.WithLogFields(ctx, slog.String("learning_type", learningType.Value))
|
|
xcontext.LogInfo(lctx, h.log, "parsing courses")
|
|
start := time.Now()
|
|
courses = courses[:0]
|
|
|
|
filterCount, err := h.client.ListEducationalProductsFilterCount(ctx, sravni.ListEducationProductsParams{
|
|
LearningType: learningType.Value,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("loading products filter count: %w", err)
|
|
}
|
|
|
|
thematics := make([]string, 0, len(filterCount.CoursesThematics))
|
|
for cr, count := range filterCount.CoursesThematics {
|
|
if count == 0 {
|
|
continue
|
|
}
|
|
|
|
thematics = append(thematics, cr)
|
|
}
|
|
|
|
xcontext.LogDebug(lctx, h.log, "loaded course thematics for learning type", slog.Int("count", len(thematics)))
|
|
|
|
// since count is known it might be optimized to allocate slice once per request.
|
|
for _, courseThematic := range thematics {
|
|
var filteredCourses int
|
|
var filteredOrgs int
|
|
|
|
var orgsByID map[string]sravni.Organization
|
|
buffer = buffer[:0]
|
|
buffer, orgsByID, err = h.loadEducationalProducts(lctx, learningType.Value, courseThematic, buffer)
|
|
if err != nil {
|
|
if errors.Is(err, context.Canceled) {
|
|
return fmt.Errorf("loading educational products: %w", err)
|
|
}
|
|
|
|
xcontext.LogWithWarnError(lctx, h.log, err, "unable to load educational products", slog.Int("count", len(thematics)))
|
|
|
|
continue
|
|
}
|
|
|
|
xslices.ForEach(buffer, func(c sravni.Course) {
|
|
// TODO: if the same course appears in different categories, it should be handled
|
|
if !h.setCourseIfNotKnown(c) {
|
|
return
|
|
}
|
|
|
|
c.Learningtype = []string{learningType.Value}
|
|
c.CourseThematics = []string{courseThematic}
|
|
courses = append(courses, c)
|
|
filteredCourses++
|
|
})
|
|
|
|
for _, org := range orgsByID {
|
|
if !h.setOrganizationIfNotKnown(org) {
|
|
continue
|
|
}
|
|
|
|
organizations = append(organizations, org)
|
|
filteredOrgs++
|
|
}
|
|
|
|
xcontext.LogInfo(
|
|
lctx, h.log, "parsed subitems",
|
|
slog.String("course_thematic", courseThematic),
|
|
slog.Int("amount", len(buffer)),
|
|
slog.Int("new_courses", filteredCourses),
|
|
slog.Int("new_organizations", filteredOrgs),
|
|
)
|
|
}
|
|
|
|
elapsed := time.Since(start)
|
|
xcontext.LogDebug(lctx, h.log, "parsed items", slog.Duration("elapsed", elapsed), slog.Int("amount", len(courses)))
|
|
|
|
xcontext.LogDebug(
|
|
lctx, h.log, "filtered items",
|
|
slog.Int("courses", len(courses)),
|
|
slog.Int("organizations", len(organizations)),
|
|
)
|
|
|
|
var insertCourseSuccess bool
|
|
if len(courses) > 0 {
|
|
err = h.insertCourses(lctx, courses)
|
|
if err != nil {
|
|
xcontext.LogWithError(lctx, h.log, err, "unable to insert courses")
|
|
}
|
|
|
|
insertCourseSuccess = err == nil
|
|
}
|
|
|
|
var insertOrgsSuccess bool
|
|
if len(organizations) > 0 {
|
|
err = h.insertOrganizations(lctx, organizations)
|
|
if err != nil {
|
|
xcontext.LogWithError(lctx, h.log, err, "unable to insert courses")
|
|
}
|
|
|
|
insertOrgsSuccess = err == nil
|
|
}
|
|
|
|
elapsed = time.Since(start) - elapsed
|
|
elapsedField := slog.Duration("elapsed", elapsed)
|
|
|
|
xcontext.LogInfo(
|
|
lctx, h.log, "inserting finished",
|
|
elapsedField,
|
|
slog.Bool("courses_insert_success", insertCourseSuccess),
|
|
slog.Bool("organization_insert_success", insertOrgsSuccess),
|
|
slog.Int("courses_count", len(courses)),
|
|
slog.Int("organizations_count", len(organizations)),
|
|
)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *syncSravniHandler) loadEducationalProducts(ctx context.Context, learningType, courseThematic string, buf []sravni.Course) ([]sravni.Course, map[string]sravni.Organization, error) {
|
|
const maxDeepIteration = 10
|
|
const defaultLimit = 50
|
|
|
|
rateStrategy := rate.Every(time.Millisecond * 400)
|
|
rateLimit := rate.NewLimiter(rateStrategy, 1)
|
|
|
|
var courses []sravni.Course
|
|
var organizationsByID = make(map[string]sravni.Organization)
|
|
if buf == nil || cap(buf) == 0 {
|
|
courses = make([]sravni.Course, 0, 256)
|
|
} else {
|
|
courses = buf
|
|
}
|
|
|
|
var offset int
|
|
params := sravni.ListEducationProductsParams{
|
|
LearningType: learningType,
|
|
CoursesThematics: []string{courseThematic},
|
|
}
|
|
for i := 0; i < maxDeepIteration; i++ {
|
|
params.Limit = defaultLimit
|
|
params.Offset = offset
|
|
response, err := h.client.ListEducationalProducts(ctx, params)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("listing educational products: %w", err)
|
|
}
|
|
|
|
offset += defaultLimit
|
|
|
|
courses = append(courses, response.Items...)
|
|
|
|
for oid, org := range response.Organizations {
|
|
organizationsByID[oid] = org
|
|
}
|
|
|
|
if len(response.Items) < defaultLimit {
|
|
break
|
|
}
|
|
|
|
err = rateLimit.Wait(ctx)
|
|
if err != nil {
|
|
return courses, organizationsByID, fmt.Errorf("waiting for limit: %w", err)
|
|
}
|
|
}
|
|
|
|
return courses, organizationsByID, nil
|
|
}
|
|
|
|
func (h *syncSravniHandler) setCourseIfNotKnown(course sravni.Course) (set bool) {
|
|
_, ok := h.knownExternalIDs[course.ID]
|
|
if !ok {
|
|
h.knownExternalIDs[course.ID] = struct{}{}
|
|
}
|
|
|
|
return !ok
|
|
}
|
|
|
|
func (h *syncSravniHandler) setOrganizationIfNotKnown(organization sravni.Organization) bool {
|
|
_, ok := h.knownOrganizationsByExternalID[organization.ID]
|
|
if !ok {
|
|
h.knownOrganizationsByExternalID[organization.ID] = struct{}{}
|
|
}
|
|
|
|
return !ok
|
|
}
|
|
|
|
func (h *syncSravniHandler) insertCourses(ctx context.Context, courses []sravni.Course) error {
|
|
courseParams := xslices.Map(courses, courseAsCreateCourseParams)
|
|
err := h.svc.Commands.InsertCourses.Handle(ctx, command.CreateCourses{
|
|
Courses: courseParams,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("inserting courses: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *syncSravniHandler) insertOrganizations(ctx context.Context, organizations []sravni.Organization) error {
|
|
organizationParams := xslices.Map(organizations, func(in sravni.Organization) command.CreateOrganization {
|
|
return command.CreateOrganization{
|
|
ID: generator.RandomInt64ID(),
|
|
ExternalID: nullable.NewValue(in.ID),
|
|
Alias: in.Alias,
|
|
Name: in.Name.Short,
|
|
Site: "",
|
|
Logo: in.Logotypes.Web,
|
|
}
|
|
})
|
|
|
|
for _, params := range organizationParams {
|
|
err := h.svc.Commands.InsertOrganization.Handle(ctx, params)
|
|
if err != nil {
|
|
return fmt.Errorf("inserting organization: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *syncSravniHandler) fillOrganizaionCaches(ctx context.Context) error {
|
|
if h.knownOrganizationsByExternalID != nil {
|
|
xcontext.LogDebug(ctx, h.log, "organization cache already filled")
|
|
|
|
return nil
|
|
}
|
|
|
|
organizations, err := h.svc.Queries.ListOrganzations.Handle(ctx, query.ListOrganizations{})
|
|
if err != nil {
|
|
return fmt.Errorf("listing organizations: %w", err)
|
|
}
|
|
|
|
withExternalID := func(in domain.Organization) bool {
|
|
return in.ExternalID.Valid()
|
|
}
|
|
getExtID := func(in domain.Organization) string {
|
|
return in.ExternalID.Value()
|
|
}
|
|
|
|
h.knownOrganizationsByExternalID = xslices.AsMap(xslices.Filter(organizations, withExternalID), getExtID)
|
|
|
|
xcontext.LogInfo(ctx, h.log, "cache filled", slog.String("kind", "organizations_by_external_id"), slog.Int("count", len(organizations)))
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *syncSravniHandler) fillCaches(ctx context.Context) error {
|
|
err := h.fillOrganizaionCaches(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = h.fillKnownExternalIDsCache(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *syncSravniHandler) fillKnownExternalIDsCache(ctx context.Context) error {
|
|
if h.knownExternalIDs != nil {
|
|
xcontext.LogInfo(ctx, h.log, "cache already filled")
|
|
|
|
return nil
|
|
}
|
|
|
|
result, err := h.svc.Queries.ListCourses.Handle(ctx, query.ListCourse{})
|
|
if err != nil {
|
|
return fmt.Errorf("listing courses: %w", err)
|
|
}
|
|
|
|
courses := result.Courses
|
|
|
|
h.knownExternalIDs = make(map[string]struct{}, len(courses))
|
|
|
|
xslices.ForEach(courses, func(c domain.Course) {
|
|
if !c.ExternalID.Valid() {
|
|
return
|
|
}
|
|
h.knownExternalIDs[c.ExternalID.Value()] = struct{}{}
|
|
})
|
|
|
|
xcontext.LogInfo(ctx, h.log, "cache filled", slog.String("kind", "courses_by_external_id"), slog.Int("count", len(courses)))
|
|
|
|
return nil
|
|
}
|
|
|
|
func courseAsCreateCourseParams(course sravni.Course) command.CreateCourse {
|
|
courseid := generator.RandomInt64ID()
|
|
var startAt time.Time
|
|
if course.DateStart != nil {
|
|
startAt = *course.DateStart
|
|
}
|
|
if course.TimeStart != nil {
|
|
startAtUnix := startAt.Unix() + course.TimeStart.Unix()
|
|
startAt = time.Unix(startAtUnix, 0)
|
|
}
|
|
|
|
var courseDuration time.Duration
|
|
if course.TimeAllDay != nil {
|
|
courseDuration += time.Hour * 24 * time.Duration(*course.TimeAllDay)
|
|
}
|
|
if course.TimeAllHour != nil {
|
|
courseDuration += time.Hour * time.Duration(*course.TimeAllHour)
|
|
}
|
|
if course.TimeAllMonth != nil {
|
|
courseDuration += time.Hour * 24 * 30 * time.Duration(*course.TimeAllMonth)
|
|
}
|
|
|
|
var discount float64
|
|
switch td := course.Discount.Percent.(type) {
|
|
case int:
|
|
discount = float64(td) / 100
|
|
case float64:
|
|
discount = td / 100
|
|
default:
|
|
|
|
}
|
|
|
|
var ct string
|
|
if len(course.CourseThematics) > 0 {
|
|
ct = course.CourseThematics[0]
|
|
}
|
|
var lt string
|
|
if len(course.Learningtype) > 0 {
|
|
lt = course.Learningtype[0]
|
|
}
|
|
|
|
return command.CreateCourse{
|
|
ID: courseid,
|
|
ExternalID: nullable.NewValue(course.ID),
|
|
CourseThematic: ct,
|
|
LearningType: lt,
|
|
Name: course.Name,
|
|
SourceType: domain.SourceTypeParsed,
|
|
SourceName: nullable.NewValue("sravni"),
|
|
OrganizationID: course.Organization,
|
|
OriginLink: course.Link,
|
|
ImageLink: course.CourseImage,
|
|
Description: "", // should be added to parse in queue
|
|
FullPrice: course.PriceAll,
|
|
Discount: discount,
|
|
StartsAt: startAt,
|
|
Duration: courseDuration,
|
|
}
|
|
}
|