able to get product
This commit is contained in:
24
internal/common/config/duration.go
Normal file
24
internal/common/config/duration.go
Normal file
@ -0,0 +1,24 @@
|
||||
package config
|
||||
|
||||
import "time"
|
||||
|
||||
type Duration time.Duration
|
||||
|
||||
func (d *Duration) UnmarshalJSON(data []byte) error {
|
||||
if len(data) == 0 {
|
||||
*d = 0
|
||||
return nil
|
||||
}
|
||||
|
||||
duration, err := time.ParseDuration(string(data))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*d = Duration(duration)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d Duration) Std() time.Duration {
|
||||
return time.Duration(d)
|
||||
}
|
||||
@ -1,8 +1,11 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"log/slog"
|
||||
"os"
|
||||
|
||||
"git.loyso.art/frx/kurious/internal/common/errors"
|
||||
)
|
||||
|
||||
type LogFormat uint8
|
||||
@ -12,6 +15,19 @@ const (
|
||||
LogFormatJSON
|
||||
)
|
||||
|
||||
func (f *LogFormat) UnmarshalText(data []byte) error {
|
||||
switch format := string(bytes.ToLower(data)); format {
|
||||
case "json":
|
||||
*f = LogFormatJSON
|
||||
case "text":
|
||||
*f = LogFormatText
|
||||
default:
|
||||
return errors.NewValidationError("format", "unsupported value "+format)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type LogLevel uint8
|
||||
|
||||
const (
|
||||
@ -21,12 +37,29 @@ const (
|
||||
LogLevelError
|
||||
)
|
||||
|
||||
type LogConfig struct {
|
||||
Level LogLevel
|
||||
Format LogFormat
|
||||
func (lvl *LogLevel) UnmarshalText(data []byte) error {
|
||||
switch level := string(bytes.ToLower(data)); level {
|
||||
case "debug", "":
|
||||
*lvl = LogLevelDebug
|
||||
case "info":
|
||||
*lvl = LogLevelInfo
|
||||
case "warn":
|
||||
*lvl = LogLevelWarn
|
||||
case "error":
|
||||
*lvl = LogLevelError
|
||||
default:
|
||||
return errors.NewValidationError("level", "unsupported value "+level)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewSLogger(config LogConfig) *slog.Logger {
|
||||
type Log struct {
|
||||
Level LogLevel `json:"level"`
|
||||
Format LogFormat `json:"format"`
|
||||
}
|
||||
|
||||
func NewSLogger(config Log) *slog.Logger {
|
||||
var level slog.Level
|
||||
switch config.Level {
|
||||
case LogLevelDebug:
|
||||
|
||||
63
internal/common/config/ydb.go
Normal file
63
internal/common/config/ydb.go
Normal file
@ -0,0 +1,63 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"git.loyso.art/frx/kurious/internal/common/errors"
|
||||
)
|
||||
|
||||
type YCAuth interface {
|
||||
isYCAuth()
|
||||
}
|
||||
|
||||
type YCAuthCAKeysFile struct{ Path string }
|
||||
|
||||
func (YCAuthCAKeysFile) isYCAuth() {}
|
||||
|
||||
type YCAuthIAMToken struct{ Token string }
|
||||
|
||||
func (YCAuthIAMToken) isYCAuth() {}
|
||||
|
||||
type YCAuthNone struct{}
|
||||
|
||||
func (YCAuthNone) isYCAuth() {}
|
||||
|
||||
type YDB struct {
|
||||
DSN string
|
||||
Auth YCAuth
|
||||
ShutdownDuration time.Duration
|
||||
}
|
||||
|
||||
func (ydb *YDB) UnmarshalJSON(data []byte) error {
|
||||
type ydbConfig struct {
|
||||
DSN string `json:"dsn"`
|
||||
CAKeysFile *string `json:"ca_keys_file_path"`
|
||||
StaticIAMToken *string `json:"static_iam_token"`
|
||||
ShutdownDuration Duration `json:"duration"`
|
||||
}
|
||||
|
||||
var imcfg ydbConfig
|
||||
err := json.Unmarshal(data, &imcfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ydb.DSN = imcfg.DSN
|
||||
ydb.ShutdownDuration = imcfg.ShutdownDuration.Std()
|
||||
if imcfg.CAKeysFile != nil && imcfg.StaticIAMToken != nil {
|
||||
return errors.NewValidationError("ca_keys_file_path", "could not be set together with static_iam_token field")
|
||||
} else if imcfg.CAKeysFile != nil {
|
||||
ydb.Auth = YCAuthCAKeysFile{
|
||||
Path: *imcfg.CAKeysFile,
|
||||
}
|
||||
} else if imcfg.StaticIAMToken != nil {
|
||||
ydb.Auth = YCAuthIAMToken{
|
||||
Token: *imcfg.StaticIAMToken,
|
||||
}
|
||||
} else {
|
||||
ydb.Auth = YCAuthNone{}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -5,6 +5,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
ErrNotFound SimpleError = "not found"
|
||||
ErrNotImplemented SimpleError = "not implemented"
|
||||
ErrUnexpectedStatus SimpleError = "unexpected status"
|
||||
)
|
||||
|
||||
32
internal/common/xlog/cronlogger.go
Normal file
32
internal/common/xlog/cronlogger.go
Normal file
@ -0,0 +1,32 @@
|
||||
package xlog
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
)
|
||||
|
||||
type cronlogger struct {
|
||||
basectx context.Context
|
||||
log *slog.Logger
|
||||
}
|
||||
|
||||
func WrapSLogger(ctx context.Context, log *slog.Logger) cronlogger {
|
||||
return cronlogger{
|
||||
basectx: ctx,
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
func (l cronlogger) Info(msg string, keysAndValues ...any) {
|
||||
attrs := mapKeysAndValues(keysAndValues...)
|
||||
l.log.LogAttrs(l.basectx, slog.LevelInfo, msg, attrs...)
|
||||
}
|
||||
|
||||
func (l cronlogger) Error(err error, msg string, keysAndValues ...any) {
|
||||
attrs := append(mapKeysAndValues(keysAndValues...), slog.Any("err", err))
|
||||
l.log.LogAttrs(l.basectx, slog.LevelError, msg, attrs...)
|
||||
}
|
||||
|
||||
func mapKeysAndValues(keysAndValues ...any) []slog.Attr {
|
||||
return nil
|
||||
}
|
||||
@ -2,31 +2,279 @@ package adapters
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"git.loyso.art/frx/kurious/internal/common/config"
|
||||
"git.loyso.art/frx/kurious/internal/common/errors"
|
||||
"git.loyso.art/frx/kurious/internal/common/nullable"
|
||||
"git.loyso.art/frx/kurious/internal/common/xcontext"
|
||||
"git.loyso.art/frx/kurious/internal/kurious/domain"
|
||||
"git.loyso.art/frx/kurious/pkg/xdefault"
|
||||
|
||||
"github.com/ydb-platform/ydb-go-sdk/v3"
|
||||
"github.com/ydb-platform/ydb-go-sdk/v3/table"
|
||||
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
|
||||
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
|
||||
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
|
||||
yc "github.com/ydb-platform/ydb-go-yc"
|
||||
)
|
||||
|
||||
func NewYDBCourseRepository() (*ydbCourseRepository, error) {
|
||||
return &ydbCourseRepository{}, nil
|
||||
const (
|
||||
defaultShutdownTimeout = time.Second * 10
|
||||
)
|
||||
|
||||
type YDBConnection struct {
|
||||
*ydb.Driver
|
||||
|
||||
log *slog.Logger
|
||||
shutdownTimeout time.Duration
|
||||
}
|
||||
|
||||
type ydbCourseRepository struct{}
|
||||
func NewYDBConnection(ctx context.Context, cfg config.YDB, log *slog.Logger) (*YDBConnection, error) {
|
||||
opts := make([]ydb.Option, 0, 2)
|
||||
switch auth := cfg.Auth.(type) {
|
||||
case config.YCAuthIAMToken:
|
||||
opts = append(opts, ydb.WithAccessTokenCredentials(auth.Token))
|
||||
case config.YCAuthCAKeysFile:
|
||||
opts = append(opts,
|
||||
yc.WithInternalCA(),
|
||||
yc.WithServiceAccountKeyFileCredentials(auth.Path),
|
||||
)
|
||||
}
|
||||
db, err := ydb.Open(
|
||||
ctx,
|
||||
cfg.DSN,
|
||||
opts...,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("opening connection: %w", err)
|
||||
}
|
||||
|
||||
func (ydbCourseRepository) List(ctx context.Context, params domain.ListCoursesParams) ([]domain.Course, error) {
|
||||
return &YDBConnection{
|
||||
Driver: db,
|
||||
shutdownTimeout: xdefault.WithFallback(cfg.ShutdownDuration, defaultShutdownTimeout),
|
||||
log: log,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (conn *YDBConnection) Close() error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), conn.shutdownTimeout)
|
||||
defer cancel()
|
||||
|
||||
return conn.Driver.Close(ctx)
|
||||
}
|
||||
|
||||
func (conn *YDBConnection) CourseRepository() *ydbCourseRepository {
|
||||
return &ydbCourseRepository{
|
||||
db: conn.Driver,
|
||||
log: conn.log.With(slog.String("repository", "course")),
|
||||
}
|
||||
}
|
||||
|
||||
type ydbCourseRepository struct {
|
||||
db *ydb.Driver
|
||||
log *slog.Logger
|
||||
}
|
||||
|
||||
func (r *ydbCourseRepository) List(ctx context.Context, params domain.ListCoursesParams) ([]domain.Course, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (ydbCourseRepository) Get(ctx context.Context, id string) (domain.Course, error) {
|
||||
|
||||
func (r *ydbCourseRepository) Get(ctx context.Context, id string) (course domain.Course, err error) {
|
||||
const queryName = "get"
|
||||
|
||||
courses := make([]domain.Course, 0, 1)
|
||||
readTx := table.TxControl(
|
||||
table.BeginTx(
|
||||
table.WithOnlineReadOnly(),
|
||||
),
|
||||
table.CommitTx(),
|
||||
)
|
||||
err = r.db.Table().Do(
|
||||
ctx,
|
||||
func(ctx context.Context, s table.Session) error {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
since := time.Since(start)
|
||||
xcontext.LogInfo(
|
||||
ctx, r.log,
|
||||
"executed query",
|
||||
slog.String("name", queryName),
|
||||
slog.Duration("elapsed", since),
|
||||
)
|
||||
}()
|
||||
|
||||
_, res, err := s.Execute(
|
||||
ctx,
|
||||
readTx,
|
||||
`
|
||||
DECLARE $id AS Text;
|
||||
SELECT
|
||||
id,
|
||||
external_id,
|
||||
source_type,
|
||||
source_name,
|
||||
organization_id,
|
||||
origin_link,
|
||||
image_link,
|
||||
name,
|
||||
description,
|
||||
full_price,
|
||||
discount,
|
||||
duration,
|
||||
starts_at,
|
||||
created_at,
|
||||
updated_at,
|
||||
deleted_at,
|
||||
FROM
|
||||
courses
|
||||
WHERE
|
||||
id = $id;
|
||||
`,
|
||||
table.NewQueryParameters(
|
||||
table.ValueParam("$id", types.TextValue(id)),
|
||||
),
|
||||
options.WithCollectStatsModeBasic(),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("executing: %w", err)
|
||||
}
|
||||
|
||||
for res.NextResultSet(ctx) {
|
||||
for res.NextRow() {
|
||||
var cdb courseDB
|
||||
_ = res.ScanNamed(cdb.getNamedValues()...)
|
||||
courses = append(courses, mapCourseDB(cdb))
|
||||
}
|
||||
}
|
||||
if err = res.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
},
|
||||
table.WithIdempotent())
|
||||
if err != nil {
|
||||
return domain.Course{}, err
|
||||
}
|
||||
|
||||
if len(courses) == 0 {
|
||||
return course, errors.ErrNotFound
|
||||
}
|
||||
|
||||
return courses[0], err
|
||||
}
|
||||
|
||||
func (r *ydbCourseRepository) GetByExternalID(ctx context.Context, id string) (domain.Course, error) {
|
||||
return domain.Course{}, nil
|
||||
}
|
||||
func (ydbCourseRepository) GetByExternalID(ctx context.Context, id string) (domain.Course, error) {
|
||||
|
||||
func (r *ydbCourseRepository) Create(context.Context, domain.CreateCourseParams) (domain.Course, error) {
|
||||
return domain.Course{}, nil
|
||||
}
|
||||
func (ydbCourseRepository) Create(context.Context, domain.CreateCourseParams) (domain.Course, error) {
|
||||
return domain.Course{}, nil
|
||||
}
|
||||
func (ydbCourseRepository) Delete(ctx context.Context, id string) error {
|
||||
|
||||
func (r *ydbCourseRepository) Delete(ctx context.Context, id string) error {
|
||||
return nil
|
||||
}
|
||||
func (ydbCourseRepository) Close() error {
|
||||
return nil
|
||||
|
||||
func (r *ydbCourseRepository) CreateCourseTable(ctx context.Context) error {
|
||||
return r.db.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
|
||||
return s.CreateTable(
|
||||
ctx,
|
||||
path.Join(r.db.Name(), "courses"),
|
||||
options.WithColumn("id", types.TypeString),
|
||||
options.WithColumn("external_id", types.Optional(types.TypeText)),
|
||||
options.WithColumn("name", types.TypeText),
|
||||
options.WithColumn("source_type", types.TypeText),
|
||||
options.WithColumn("source_name", types.Optional(types.TypeText)),
|
||||
options.WithColumn("organization_id", types.TypeText),
|
||||
options.WithColumn("origin_link", types.TypeText),
|
||||
options.WithColumn("image_link", types.TypeText),
|
||||
options.WithColumn("description", types.TypeText),
|
||||
options.WithColumn("full_price", types.TypeFloat),
|
||||
options.WithColumn("discount", types.TypeFloat),
|
||||
options.WithColumn("duration", types.TypeInterval),
|
||||
options.WithColumn("starts_at", types.TypeDatetime),
|
||||
options.WithColumn("created_at", types.TypeDatetime),
|
||||
options.WithColumn("updated_at", types.TypeDatetime),
|
||||
options.WithColumn("deleted_at", types.Optional(types.TypeDatetime)),
|
||||
options.WithPrimaryKeyColumn("id"),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
type courseDB struct {
|
||||
ID string
|
||||
ExternalID *string
|
||||
Name string
|
||||
SourceType string
|
||||
SourceName *string
|
||||
OrganizationID string
|
||||
OriginLink string
|
||||
ImageLink string
|
||||
Description string
|
||||
FullPrice float64
|
||||
Discount float64
|
||||
Duration time.Duration
|
||||
StartAt time.Time
|
||||
CreatedAt time.Time
|
||||
UpdatedAt time.Time
|
||||
DeletedAt *time.Time
|
||||
}
|
||||
|
||||
func (c *courseDB) getNamedValues() []named.Value {
|
||||
return []named.Value{
|
||||
named.Required("id", &c.ID),
|
||||
named.Optional("external_id", &c.ExternalID),
|
||||
named.Required("source_type", &c.SourceType),
|
||||
named.Optional("source_name", &c.SourceName),
|
||||
named.Required("organization_id", &c.OrganizationID),
|
||||
named.Required("origin_link", &c.OriginLink),
|
||||
named.Required("image_link", &c.ImageLink),
|
||||
named.Required("description", &c.Description),
|
||||
named.Required("duration", &c.Duration),
|
||||
named.Required("starts_at", &c.StartAt),
|
||||
named.Required("created_at", &c.CreatedAt),
|
||||
named.Required("updated_at", &c.UpdatedAt),
|
||||
named.Optional("deleted_at", &c.DeletedAt),
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
sourceTypeUnknown = ""
|
||||
sourceTypeManual = "m"
|
||||
sourceTypeParsed = "p"
|
||||
)
|
||||
|
||||
func mapCourseDB(cdb courseDB) domain.Course {
|
||||
var st domain.SourceType
|
||||
switch cdb.SourceType {
|
||||
case sourceTypeUnknown:
|
||||
st = domain.SourceTypeUnset
|
||||
case sourceTypeManual:
|
||||
st = domain.SourceTypeManual
|
||||
case sourceTypeParsed:
|
||||
st = domain.SourceTypeParsed
|
||||
}
|
||||
|
||||
return domain.Course{
|
||||
ID: cdb.ID,
|
||||
ExternalID: nullable.NewValuePtr(cdb.ExternalID),
|
||||
Name: cdb.Name,
|
||||
SourceType: st,
|
||||
SourceName: nullable.NewValuePtr(cdb.SourceName),
|
||||
OrganizationID: cdb.OrganizationID,
|
||||
OriginLink: cdb.OriginLink,
|
||||
ImageLink: cdb.ImageLink,
|
||||
Description: cdb.Description,
|
||||
FullPrice: cdb.FullPrice,
|
||||
Discount: cdb.Discount,
|
||||
Duration: cdb.Duration,
|
||||
StartsAt: cdb.StartAt,
|
||||
CreatedAt: cdb.CreatedAt,
|
||||
UpdatedAt: cdb.UpdatedAt,
|
||||
DeletedAt: nullable.NewValuePtr(cdb.DeletedAt),
|
||||
}
|
||||
}
|
||||
|
||||
@ -28,7 +28,6 @@ type Course struct {
|
||||
FullPrice float64
|
||||
// Discount for the course.
|
||||
Discount float64
|
||||
Keywords []string
|
||||
|
||||
// Duration for the course. It will be splitted in values like:
|
||||
// full month / full day / full hour.
|
||||
|
||||
43
internal/kurious/ports/cron.go
Normal file
43
internal/kurious/ports/cron.go
Normal file
@ -0,0 +1,43 @@
|
||||
package ports
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
|
||||
"git.loyso.art/frx/kurious/internal/common/xlog"
|
||||
"git.loyso.art/frx/kurious/internal/kurious/service"
|
||||
|
||||
"github.com/robfig/cron/v3"
|
||||
)
|
||||
|
||||
type BackgroundParser struct {
|
||||
scheduler *cron.Cron
|
||||
}
|
||||
|
||||
func NewBackgroundParser(ctx context.Context, svc service.Application, log *slog.Logger) *BackgroundParser {
|
||||
clog := xlog.WrapSLogger(ctx, log)
|
||||
scheduler := cron.New(cron.WithSeconds(), cron.WithChain(
|
||||
cron.Recover(clog),
|
||||
))
|
||||
|
||||
bp := &BackgroundParser{
|
||||
scheduler: scheduler,
|
||||
}
|
||||
|
||||
return bp
|
||||
}
|
||||
|
||||
func (bp *BackgroundParser) Run() {
|
||||
bp.scheduler.Run()
|
||||
}
|
||||
|
||||
func (bp *BackgroundParser) Shutdown(ctx context.Context) error {
|
||||
sdctx := bp.scheduler.Stop()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-sdctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@ -14,7 +14,8 @@ import (
|
||||
)
|
||||
|
||||
type ApplicationConfig struct {
|
||||
LogConfig config.LogConfig
|
||||
LogConfig config.Log
|
||||
YDB config.YDB
|
||||
}
|
||||
|
||||
type Application struct {
|
||||
@ -26,11 +27,13 @@ type Application struct {
|
||||
|
||||
func NewApplication(ctx context.Context, cfg ApplicationConfig) (Application, error) {
|
||||
log := config.NewSLogger(cfg.LogConfig)
|
||||
courseadapter, err := adapters.NewYDBCourseRepository()
|
||||
ydbConnection, err := adapters.NewYDBConnection(ctx, cfg.YDB, log.With(slog.String("db", "ydb")))
|
||||
if err != nil {
|
||||
return Application{}, fmt.Errorf("making ydb course repository: %w", err)
|
||||
return Application{}, fmt.Errorf("making ydb connection: %w", err)
|
||||
}
|
||||
|
||||
courseadapter := ydbConnection.CourseRepository()
|
||||
|
||||
application := app.Application{
|
||||
Commands: app.Commands{
|
||||
InsertCourse: command.NewCreateCourseHandler(courseadapter, log),
|
||||
@ -43,7 +46,7 @@ func NewApplication(ctx context.Context, cfg ApplicationConfig) (Application, er
|
||||
}
|
||||
|
||||
out := Application{Application: application}
|
||||
out.closers = append(out.closers, courseadapter)
|
||||
out.closers = append(out.closers, ydbConnection)
|
||||
out.log = log
|
||||
|
||||
return out, nil
|
||||
|
||||
Reference in New Issue
Block a user