588 lines
15 KiB
Go
588 lines
15 KiB
Go
package adapters
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"path"
|
|
"strings"
|
|
"text/template"
|
|
"time"
|
|
|
|
"git.loyso.art/frx/kurious/internal/common/config"
|
|
"git.loyso.art/frx/kurious/internal/common/errors"
|
|
"git.loyso.art/frx/kurious/internal/common/nullable"
|
|
"git.loyso.art/frx/kurious/internal/common/xcontext"
|
|
"git.loyso.art/frx/kurious/internal/kurious/domain"
|
|
"git.loyso.art/frx/kurious/pkg/xdefault"
|
|
|
|
"github.com/ydb-platform/ydb-go-sdk/v3"
|
|
"github.com/ydb-platform/ydb-go-sdk/v3/table"
|
|
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
|
|
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
|
|
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
|
|
yc "github.com/ydb-platform/ydb-go-yc"
|
|
)
|
|
|
|
const (
|
|
defaultShutdownTimeout = time.Second * 10
|
|
)
|
|
|
|
type YDBConnection struct {
|
|
*ydb.Driver
|
|
|
|
log *slog.Logger
|
|
shutdownTimeout time.Duration
|
|
}
|
|
|
|
func NewYDBConnection(ctx context.Context, cfg config.YDB, log *slog.Logger) (*YDBConnection, error) {
|
|
opts := make([]ydb.Option, 0, 2)
|
|
switch auth := cfg.Auth.(type) {
|
|
case config.YCAuthIAMToken:
|
|
opts = append(opts, ydb.WithAccessTokenCredentials(auth.Token))
|
|
case config.YCAuthCAKeysFile:
|
|
opts = append(opts,
|
|
yc.WithInternalCA(),
|
|
yc.WithServiceAccountKeyFileCredentials(auth.Path),
|
|
)
|
|
}
|
|
db, err := ydb.Open(
|
|
ctx,
|
|
cfg.DSN,
|
|
opts...,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("opening connection: %w", err)
|
|
}
|
|
|
|
return &YDBConnection{
|
|
Driver: db,
|
|
shutdownTimeout: xdefault.WithFallback(cfg.ShutdownDuration, defaultShutdownTimeout),
|
|
log: log,
|
|
}, nil
|
|
}
|
|
|
|
func (conn *YDBConnection) Close() error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), conn.shutdownTimeout)
|
|
defer cancel()
|
|
|
|
return conn.Driver.Close(ctx)
|
|
}
|
|
|
|
func (conn *YDBConnection) CourseRepository() *ydbCourseRepository {
|
|
return &ydbCourseRepository{
|
|
db: conn.Driver,
|
|
log: conn.log.With(slog.String("repository", "course")),
|
|
}
|
|
}
|
|
|
|
type ydbCourseRepository struct {
|
|
db *ydb.Driver
|
|
log *slog.Logger
|
|
}
|
|
|
|
func (r *ydbCourseRepository) List(
|
|
ctx context.Context,
|
|
params domain.ListCoursesParams,
|
|
) (result domain.ListCoursesResult, err error) {
|
|
const limit = 1000
|
|
const queryName = "list"
|
|
// const query = `
|
|
// DECLARE $limit AS Int32;
|
|
// DECLARE $id AS Text;
|
|
// SELECT
|
|
// id,
|
|
// external_id,
|
|
// source_type,
|
|
// source_name,
|
|
// course_thematic,
|
|
// learning_type,
|
|
// organization_id,
|
|
// origin_link,
|
|
// image_link,
|
|
// name,
|
|
// description,
|
|
// full_price,
|
|
// discount,
|
|
// duration,
|
|
// starts_at,
|
|
// created_at,
|
|
// updated_at,
|
|
// deleted_at
|
|
// FROM
|
|
// courses
|
|
// WHERE
|
|
// id > $id
|
|
// ORDER BY id
|
|
// LIMIT $limit;`
|
|
//
|
|
const fields = `id, external_id, source_type, source_name, course_thematic, learning_type, organization_id, origin_link, image_link, name, description, full_price, discount, duration, starts_at, created_at, updated_at, deleted_at`
|
|
|
|
if params.Limit == 0 {
|
|
params.Limit = limit
|
|
}
|
|
|
|
qtParams := queryTemplateParams{
|
|
Fields: fields,
|
|
Table: "courses",
|
|
Suffix: "ORDER BY id\nLIMIT $limit",
|
|
Declares: []queryTemplateDeclaration{{
|
|
Name: "limit",
|
|
Type: "Int32",
|
|
}, {
|
|
Name: "id",
|
|
Type: "Text",
|
|
}},
|
|
Conditions: []string{
|
|
"id > $id",
|
|
},
|
|
}
|
|
|
|
options := make([]table.ParameterOption, 0, 4)
|
|
appendParams := func(name string, value string) {
|
|
if value == "" {
|
|
return
|
|
}
|
|
|
|
ydbvalue := types.TextValue(value)
|
|
d := queryTemplateDeclaration{
|
|
Name: name,
|
|
Type: ydbvalue.Type().String(),
|
|
}
|
|
qtParams.Declares = append(qtParams.Declares, d)
|
|
qtParams.Conditions = append(qtParams.Conditions, d.Name+"="+d.Arg())
|
|
options = append(options, table.ValueParam(d.Arg(), ydbvalue))
|
|
}
|
|
appendParams("course_thematic", params.CourseThematic)
|
|
appendParams("learning_type", params.LearningType)
|
|
|
|
var sb strings.Builder
|
|
err = template.Must(template.New("").Parse(queryTemplateSelect)).Execute(&sb, qtParams)
|
|
if err != nil {
|
|
return result, fmt.Errorf("executing template: %w", err)
|
|
}
|
|
|
|
query := sb.String()
|
|
|
|
courses := make([]domain.Course, 0, 1_000)
|
|
readTx := table.TxControl(
|
|
table.BeginTx(
|
|
table.WithOnlineReadOnly(),
|
|
),
|
|
table.CommitTx(),
|
|
)
|
|
err = r.db.Table().Do(
|
|
ctx,
|
|
func(ctx context.Context, s table.Session) error {
|
|
start := time.Now()
|
|
defer func() {
|
|
since := time.Since(start)
|
|
xcontext.LogInfo(
|
|
ctx, r.log,
|
|
"executed query",
|
|
slog.String("name", queryName),
|
|
slog.Duration("elapsed", since),
|
|
)
|
|
}()
|
|
|
|
var lastKnownID string
|
|
for {
|
|
queryParams := table.NewQueryParameters(
|
|
table.ValueParam("$limit", types.Int32Value(limit)),
|
|
table.ValueParam("$id", types.TextValue(lastKnownID)),
|
|
)
|
|
_, res, err := s.Execute(
|
|
ctx, readTx, query, queryParams,
|
|
options.WithCollectStatsModeBasic(),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("executing: %w", err)
|
|
}
|
|
|
|
if !res.NextResultSet(ctx) || !res.HasNextRow() {
|
|
break
|
|
}
|
|
|
|
for res.NextRow() {
|
|
var cdb courseDB
|
|
err = res.ScanNamed(cdb.getNamedValues()...)
|
|
if err != nil {
|
|
return fmt.Errorf("scanning row: %w", err)
|
|
}
|
|
|
|
courses = append(courses, mapCourseDB(cdb))
|
|
}
|
|
if err = res.Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
lastKnownID = courses[len(courses)-1].ID
|
|
}
|
|
return nil
|
|
},
|
|
table.WithIdempotent())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return courses, err
|
|
}
|
|
|
|
func (r *ydbCourseRepository) Get(ctx context.Context, id string) (course domain.Course, err error) {
|
|
const queryName = "get"
|
|
|
|
courses := make([]domain.Course, 0, 1)
|
|
readTx := table.TxControl(
|
|
table.BeginTx(
|
|
table.WithOnlineReadOnly(),
|
|
),
|
|
table.CommitTx(),
|
|
)
|
|
err = r.db.Table().Do(
|
|
ctx,
|
|
func(ctx context.Context, s table.Session) error {
|
|
start := time.Now()
|
|
defer func() {
|
|
since := time.Since(start)
|
|
xcontext.LogInfo(
|
|
ctx, r.log,
|
|
"executed query",
|
|
slog.String("name", queryName),
|
|
slog.Duration("elapsed", since),
|
|
)
|
|
}()
|
|
|
|
_, res, err := s.Execute(
|
|
ctx,
|
|
readTx,
|
|
`
|
|
DECLARE $id AS Text;
|
|
SELECT
|
|
id,
|
|
external_id,
|
|
source_type,
|
|
source_name,
|
|
course_thematic,
|
|
learning_type,
|
|
organization_id,
|
|
origin_link,
|
|
image_link,
|
|
name,
|
|
description,
|
|
full_price,
|
|
discount,
|
|
duration,
|
|
starts_at,
|
|
created_at,
|
|
updated_at,
|
|
deleted_at
|
|
FROM
|
|
courses
|
|
WHERE
|
|
id = $id;
|
|
`,
|
|
table.NewQueryParameters(
|
|
table.ValueParam("$id", types.TextValue(id)),
|
|
),
|
|
options.WithCollectStatsModeBasic(),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("executing: %w", err)
|
|
}
|
|
|
|
for res.NextResultSet(ctx) {
|
|
for res.NextRow() {
|
|
var cdb courseDB
|
|
_ = res.ScanNamed(cdb.getNamedValues()...)
|
|
courses = append(courses, mapCourseDB(cdb))
|
|
}
|
|
}
|
|
if err = res.Err(); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
},
|
|
table.WithIdempotent())
|
|
if err != nil {
|
|
return domain.Course{}, err
|
|
}
|
|
|
|
if len(courses) == 0 {
|
|
return course, errors.ErrNotFound
|
|
}
|
|
|
|
return courses[0], err
|
|
}
|
|
|
|
func (r *ydbCourseRepository) GetByExternalID(ctx context.Context, id string) (domain.Course, error) {
|
|
return domain.Course{}, nil
|
|
}
|
|
|
|
func createCourseParamsAsStruct(params domain.CreateCourseParams) types.Value {
|
|
st := mapSourceTypeFromDomain(params.SourceType)
|
|
now := time.Now()
|
|
return types.StructValue(
|
|
types.StructFieldValue("id", types.TextValue(params.ID)),
|
|
types.StructFieldValue("name", types.TextValue(params.Name)),
|
|
types.StructFieldValue("source_type", types.TextValue(st)),
|
|
types.StructFieldValue("source_name", types.NullableTextValue(params.SourceName.ValutPtr())),
|
|
types.StructFieldValue("course_thematic", types.TextValue(params.CourseThematic)),
|
|
types.StructFieldValue("learning_type", types.TextValue(params.LearningType)),
|
|
types.StructFieldValue("external_id", types.NullableTextValue(params.ExternalID.ValutPtr())),
|
|
types.StructFieldValue("organization_id", types.TextValue(params.OrganizationID)),
|
|
types.StructFieldValue("origin_link", types.TextValue(params.OriginLink)),
|
|
types.StructFieldValue("image_link", types.TextValue(params.ImageLink)),
|
|
types.StructFieldValue("description", types.TextValue(params.Description)),
|
|
types.StructFieldValue("full_price", types.DoubleValue(params.FullPrice)),
|
|
types.StructFieldValue("discount", types.DoubleValue(params.Discount)),
|
|
types.StructFieldValue("duration", types.IntervalValueFromDuration(params.Duration)),
|
|
types.StructFieldValue("starts_at", types.DatetimeValueFromTime(params.StartsAt)),
|
|
types.StructFieldValue("created_at", types.DatetimeValueFromTime(now)),
|
|
types.StructFieldValue("updated_at", types.DatetimeValueFromTime(now)),
|
|
types.StructFieldValue("deleted_at", types.NullableDatetimeValue(nil)),
|
|
)
|
|
}
|
|
|
|
func (r *ydbCourseRepository) CreateBatch(ctx context.Context, params ...domain.CreateCourseParams) error {
|
|
// -- PRAGMA TablePathPrefix("courses");
|
|
const upsertQuery = `DECLARE $courseData AS List<Struct<
|
|
id: Text,
|
|
external_id: Optional<Text>,
|
|
name: Text,
|
|
source_type: Text,
|
|
source_name: Optional<Text>,
|
|
course_thematic: Text,
|
|
learning_type: Text,
|
|
organization_id: Text,
|
|
origin_link: Text,
|
|
image_link: Text,
|
|
description: Text,
|
|
full_price: Double,
|
|
discount: Double,
|
|
duration: Interval,
|
|
starts_at: Datetime,
|
|
created_at: Datetime,
|
|
updated_at: Datetime,
|
|
deleted_at: Optional<Datetime>>>;
|
|
|
|
REPLACE INTO
|
|
courses
|
|
SELECT
|
|
id,
|
|
external_id,
|
|
name,
|
|
source_type,
|
|
source_name,
|
|
course_thematic,
|
|
learning_type,
|
|
organization_id,
|
|
origin_link,
|
|
image_link,
|
|
description,
|
|
full_price,
|
|
discount,
|
|
duration,
|
|
starts_at,
|
|
created_at,
|
|
updated_at,
|
|
deleted_at
|
|
FROM AS_TABLE($courseData);`
|
|
|
|
writeTx := table.TxControl(
|
|
table.BeginTx(
|
|
table.WithSerializableReadWrite(),
|
|
),
|
|
table.CommitTx(),
|
|
)
|
|
err := r.db.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
|
|
listValues := mapSlice(params, createCourseParamsAsStruct)
|
|
queryParams := table.NewQueryParameters(
|
|
table.ValueParam("$courseData", types.ListValue(listValues...)),
|
|
)
|
|
_, _, err := s.Execute(ctx, writeTx, upsertQuery, queryParams)
|
|
if err != nil {
|
|
return fmt.Errorf("executing query: %w", err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
return err
|
|
}
|
|
|
|
func (r *ydbCourseRepository) Create(ctx context.Context, params domain.CreateCourseParams) (domain.Course, error) {
|
|
err := r.CreateBatch(ctx, params)
|
|
if err != nil {
|
|
return domain.Course{}, err
|
|
}
|
|
|
|
return domain.Course{}, nil
|
|
}
|
|
|
|
func (r *ydbCourseRepository) Delete(ctx context.Context, id string) error {
|
|
return nil
|
|
}
|
|
|
|
func (r *ydbCourseRepository) CreateCourseTable(ctx context.Context) error {
|
|
return r.db.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
|
|
return s.CreateTable(
|
|
ctx,
|
|
path.Join(r.db.Name(), "courses"),
|
|
options.WithColumn("id", types.TypeText),
|
|
options.WithColumn("external_id", types.Optional(types.TypeText)),
|
|
options.WithColumn("name", types.TypeText),
|
|
options.WithColumn("source_type", types.TypeText),
|
|
options.WithColumn("source_name", types.Optional(types.TypeText)),
|
|
options.WithColumn("course_thematic", types.TypeText),
|
|
options.WithColumn("learning_type", types.TypeText),
|
|
options.WithColumn("organization_id", types.TypeText),
|
|
options.WithColumn("origin_link", types.TypeText),
|
|
options.WithColumn("image_link", types.TypeText),
|
|
options.WithColumn("description", types.TypeText),
|
|
options.WithColumn("full_price", types.TypeDouble),
|
|
options.WithColumn("discount", types.TypeDouble),
|
|
options.WithColumn("duration", types.TypeInterval),
|
|
options.WithColumn("starts_at", types.TypeDatetime),
|
|
options.WithColumn("created_at", types.TypeDatetime),
|
|
options.WithColumn("updated_at", types.TypeDatetime),
|
|
options.WithColumn("deleted_at", types.Optional(types.TypeDatetime)),
|
|
options.WithPrimaryKeyColumn("id"),
|
|
)
|
|
})
|
|
}
|
|
|
|
type courseDB struct {
|
|
ID string
|
|
ExternalID *string
|
|
Name string
|
|
SourceType string
|
|
SourceName *string
|
|
CourseThematic string
|
|
LearningType string
|
|
OrganizationID string
|
|
OriginLink string
|
|
ImageLink string
|
|
Description string
|
|
FullPrice float64
|
|
Discount float64
|
|
Duration time.Duration
|
|
StartAt time.Time
|
|
CreatedAt time.Time
|
|
UpdatedAt time.Time
|
|
DeletedAt *time.Time
|
|
}
|
|
|
|
func (c *courseDB) getNamedValues() []named.Value {
|
|
return []named.Value{
|
|
named.Required("id", &c.ID),
|
|
named.Required("name", &c.Name),
|
|
named.Optional("external_id", &c.ExternalID),
|
|
named.Required("source_type", &c.SourceType),
|
|
named.Optional("source_name", &c.SourceName),
|
|
named.Required("course_thematic", &c.CourseThematic),
|
|
named.Required("learning_type", &c.LearningType),
|
|
named.Required("organization_id", &c.OrganizationID),
|
|
named.Required("origin_link", &c.OriginLink),
|
|
named.Required("image_link", &c.ImageLink),
|
|
named.Required("description", &c.Description),
|
|
named.Required("full_price", &c.FullPrice),
|
|
named.Required("discount", &c.Discount),
|
|
named.Required("duration", &c.Duration),
|
|
named.Required("starts_at", &c.StartAt),
|
|
named.Required("created_at", &c.CreatedAt),
|
|
named.Required("updated_at", &c.UpdatedAt),
|
|
named.Optional("deleted_at", &c.DeletedAt),
|
|
}
|
|
}
|
|
|
|
const (
|
|
sourceTypeUnknown = ""
|
|
sourceTypeManual = "m"
|
|
sourceTypeParsed = "p"
|
|
)
|
|
|
|
func mapSourceTypeToDomain(in string) (st domain.SourceType) {
|
|
switch in {
|
|
case sourceTypeUnknown:
|
|
st = domain.SourceTypeUnset
|
|
case sourceTypeManual:
|
|
st = domain.SourceTypeManual
|
|
case sourceTypeParsed:
|
|
st = domain.SourceTypeParsed
|
|
}
|
|
|
|
return st
|
|
}
|
|
|
|
func mapSourceTypeFromDomain(in domain.SourceType) string {
|
|
var st string
|
|
switch in {
|
|
case domain.SourceTypeManual:
|
|
st = sourceTypeManual
|
|
case domain.SourceTypeParsed:
|
|
st = sourceTypeParsed
|
|
default:
|
|
st = sourceTypeUnknown
|
|
}
|
|
|
|
return st
|
|
}
|
|
|
|
func mapCourseDB(cdb courseDB) domain.Course {
|
|
st := mapSourceTypeToDomain(cdb.SourceType)
|
|
|
|
return domain.Course{
|
|
ID: cdb.ID,
|
|
ExternalID: nullable.NewValuePtr(cdb.ExternalID),
|
|
Name: cdb.Name,
|
|
SourceType: st,
|
|
SourceName: nullable.NewValuePtr(cdb.SourceName),
|
|
Thematic: cdb.CourseThematic,
|
|
LearningType: cdb.LearningType,
|
|
OrganizationID: cdb.OrganizationID,
|
|
OriginLink: cdb.OriginLink,
|
|
ImageLink: cdb.ImageLink,
|
|
Description: cdb.Description,
|
|
FullPrice: cdb.FullPrice,
|
|
Discount: cdb.Discount,
|
|
Duration: cdb.Duration,
|
|
StartsAt: cdb.StartAt,
|
|
CreatedAt: cdb.CreatedAt,
|
|
UpdatedAt: cdb.UpdatedAt,
|
|
DeletedAt: nullable.NewValuePtr(cdb.DeletedAt),
|
|
}
|
|
}
|
|
|
|
func mapSlice[T, U any](in []T, f func(T) U) []U {
|
|
out := make([]U, len(in))
|
|
for i, value := range in {
|
|
out[i] = f(value)
|
|
}
|
|
|
|
return out
|
|
}
|
|
|
|
type queryTemplateDeclaration struct {
|
|
Name string
|
|
Type string
|
|
}
|
|
|
|
func (d queryTemplateDeclaration) Arg() string {
|
|
return "$" + d.Name
|
|
}
|
|
|
|
type queryTemplateParams struct {
|
|
Declares []queryTemplateDeclaration
|
|
Fields string
|
|
Table string
|
|
Conditions []string
|
|
Suffix string
|
|
}
|
|
|
|
const queryTemplateSelect = `
|
|
{{ range .Declares }}DECLARE ${{.Name}} AS {{.Type}}\n{{end}}
|
|
SELECT {{.Fields}}
|
|
FROM {{.Table}}
|
|
WHERE {{ range .Conditions }}{{.}}\n{{end}}
|
|
{{.Suffix}}`
|