// File: kurious/internal/kurious/adapters/sqlite_course_repository.go
// Last commit: 605e117586 "add pagination" (Aleksandr Trushkin, 2024-04-07 23:49:06 +03:00)
// Size: 445 lines, 12 KiB, Go

package adapters
import (
"context"
"database/sql"
"errors"
"fmt"
"log/slog"
"strings"
"time"
"git.loyso.art/frx/kurious/internal/common/config"
"git.loyso.art/frx/kurious/internal/common/nullable"
"git.loyso.art/frx/kurious/internal/common/xcontext"
"git.loyso.art/frx/kurious/internal/kurious/domain"
"git.loyso.art/frx/kurious/migrations/sqlite"
"git.loyso.art/frx/kurious/pkg/xdefault"
"github.com/jmoiron/sqlx"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
_ "modernc.org/sqlite"
)
// sqliteTracer is the OpenTelemetry tracer, registered under the name
// "sqlite", that the repository methods in this file use to start spans.
var sqliteTracer = otel.Tracer("sqlite")
// sqliteConnection bundles the sqlx database handle with the logger and the
// shutdown timeout that NewSqliteConnection configures.
type sqliteConnection struct {
// db is the sqlx handle opened in NewSqliteConnection.
db *sqlx.DB
// shutdownTimeout is read by Close when it creates its timeout context.
shutdownTimeout time.Duration
// log is the base logger; CourseRepository derives the repository logger from it.
log *slog.Logger
}
// NewSqliteConnection opens a sqlite database using cfg.DSN, runs the schema
// migrations on it and returns a connection wrapper that hands out
// repositories.
//
// If the migrations fail, the freshly opened handle is closed before the
// error is returned so that it does not leak; a failure of that close is
// joined to the returned error. The shutdown timeout is taken from
// cfg.ShutdownTimeout via xdefault.WithFallback, with defaultShutdownTimeout
// as the fallback value.
func NewSqliteConnection(ctx context.Context, cfg config.Sqlite, log *slog.Logger) (*sqliteConnection, error) {
	conn, err := sqlx.Open("sqlite", cfg.DSN)
	if err != nil {
		return nil, fmt.Errorf("openning db connection: %w", err)
	}

	if err = sqlite.RunMigrations(ctx, conn.DB, log); err != nil {
		err = fmt.Errorf("running migrations: %w", err)
		// Nobody else holds conn yet, so it has to be released here.
		if errClose := conn.Close(); errClose != nil {
			err = errors.Join(err, fmt.Errorf("closing db connection: %w", errClose))
		}

		return nil, err
	}

	return &sqliteConnection{
		db:              conn,
		log:             log,
		shutdownTimeout: xdefault.WithFallback(cfg.ShutdownTimeout, defaultShutdownTimeout),
	}, nil
}
// Close closes the underlying database handle, waiting at most
// shutdownTimeout for the close to finish.
//
// The close runs in its own goroutine because the database handle's Close
// method accepts no context. If the timeout elapses first, Close returns an
// error wrapping the context error while the close keeps running in the
// background. A non-positive shutdownTimeout disables the limit and Close
// simply blocks until the handle is closed.
func (c *sqliteConnection) Close() error {
	if c.shutdownTimeout <= 0 {
		return c.db.Close()
	}

	ctx, cancel := context.WithTimeout(context.Background(), c.shutdownTimeout)
	defer cancel()

	// Buffered so the goroutine can always deliver its result and exit, even
	// when nobody is listening anymore after a timeout.
	done := make(chan error, 1)
	go func() { done <- c.db.Close() }()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return fmt.Errorf("closing db connection: %w", ctx.Err())
	}
}
// CourseRepository returns a course repository that shares this connection's
// database handle and logs with the attribute "repository"="course".
func (c *sqliteConnection) CourseRepository() *sqliteCourseRepository {
	repo := new(sqliteCourseRepository)
	repo.db = c.db
	repo.log = c.log.With(slog.String("repository", "course"))

	return repo
}
// sqliteCourseRepository is the sqlite-backed course repository returned by
// sqliteConnection.CourseRepository.
type sqliteCourseRepository struct {
// db is the database handle shared with the connection that created the repository.
db *sqlx.DB
// log is the connection logger with the "repository"="course" attribute added.
log *slog.Logger
}
// List returns the courses matching params, the total number of rows matching
// the filters and, when a full page was read, the token for the next page.
//
// LearningType, CourseThematic and OrganizationID filter the result only when
// non-empty; Limit and Offset are applied only when positive. NextPageToken
// and Offset are mutually exclusive: passing both yields an error (not a
// panic), so a malformed request cannot take the caller down.
//
// On a query or scan failure the zero result is returned together with the
// error. The total count is best-effort: if it cannot be computed, the failure
// is logged as a warning and the page is still returned.
func (r *sqliteCourseRepository) List(
	ctx context.Context,
	params domain.ListCoursesParams,
) (result domain.ListCoursesResult, err error) {
	const queryTemplate = `SELECT %s from courses WHERE 1=1`

	ctx, span := sqliteTracer.Start(ctx, "sqlite.list")
	defer func() {
		// err is the named result, so every failing return path is recorded.
		if err != nil {
			span.RecordError(err)
		}
		span.End()
	}()

	if params.NextPageToken != "" && params.Offset > 0 {
		return result, errors.New("could not use next_page_token and offset at the same time")
	}

	query := fmt.Sprintf(queryTemplate, coursesFieldsStr)
	args := make([]any, 0, 6)

	if params.LearningType != "" {
		query += " AND learning_type = ?"
		args = append(args, params.LearningType)
	}
	if params.CourseThematic != "" {
		query += " AND course_thematic = ?"
		args = append(args, params.CourseThematic)
	}
	if params.OrganizationID != "" {
		query += " AND organization_id = ?"
		args = append(args, params.OrganizationID)
	}
	if params.NextPageToken != "" {
		// NOTE(review): the cursor compares ids only, while the ORDER BY below
		// sorts by course_thematic and learning_type first. Unless ids happen
		// to sort the same way, paging by token may skip or repeat rows —
		// confirm the intended ordering.
		query += " AND id > ?"
		args = append(args, params.NextPageToken)
	}

	query += " ORDER BY course_thematic, learning_type, id ASC"

	if params.Limit > 0 {
		query += " LIMIT ?"
		args = append(args, params.Limit)
	}
	if params.Offset > 0 {
		query += " OFFSET ?"
		args = append(args, params.Offset)
	}

	span.SetAttributes(
		attribute.String("query", query),
	)

	scanF := func(s rowsScanner) error {
		var cdb sqliteCourseDB
		if errScan := s.StructScan(&cdb); errScan != nil {
			return errScan
		}
		result.Courses = append(result.Courses, cdb.AsDomain())

		return nil
	}

	if err = scanRows(ctx, r.db, scanF, query, args...); err != nil {
		// Do not hand out a half-filled page together with an error.
		return domain.ListCoursesResult{}, err
	}

	// A full page means there may be more rows: expose the last id as cursor.
	if params.Limit > 0 && len(result.Courses) == params.Limit {
		result.NextPageToken = result.Courses[len(result.Courses)-1].ID
	}

	count, errCount := r.listCount(ctx, params)
	if errCount != nil {
		// Deliberately non-fatal: the page itself was read successfully.
		xcontext.LogWithWarnError(ctx, r.log, errCount, "unable to list count")
	}
	result.Count = count

	span.SetAttributes(
		attribute.Int("items_count", len(result.Courses)),
		attribute.Int("total_items", result.Count),
	)

	return result, nil
}
// ListLearningTypes returns the distinct learning_type values found in the
// courses table. A query failure is returned wrapped with "executing query".
func (r *sqliteCourseRepository) ListLearningTypes(
	ctx context.Context,
) (result domain.ListLearningTypeResult, err error) {
	const query = "SELECT DISTINCT learning_type FROM courses"

	if err = r.db.SelectContext(ctx, &result.LearningTypeIDs, query); err != nil {
		err = fmt.Errorf("executing query: %w", err)
	}

	return result, err
}
// ListCourseThematics returns the distinct course_thematic values found in
// the courses table, restricted to params.LearningTypeID when it is non-empty.
// A query failure is returned wrapped with "executing query".
func (r *sqliteCourseRepository) ListCourseThematics(
	ctx context.Context,
	params domain.ListCourseThematicsParams,
) (result domain.ListCourseThematicsResult, err error) {
	query := "SELECT DISTINCT course_thematic FROM courses WHERE 1=1"
	args := make([]any, 0, 1)

	if learningType := params.LearningTypeID; learningType != "" {
		query += " AND learning_type = ?"
		args = append(args, learningType)
	}

	if err = r.db.SelectContext(ctx, &result.CourseThematicIDs, query, args...); err != nil {
		err = fmt.Errorf("executing query: %w", err)
	}

	return result, err
}
// Get loads the course with the given id from the courses table and converts
// it to its domain representation. Any failure of the lookup is returned
// wrapped with "executing query" together with a zero course.
func (r *sqliteCourseRepository) Get(
	ctx context.Context,
	id string,
) (course domain.Course, err error) {
	var row sqliteCourseDB

	query := fmt.Sprintf(`SELECT %s FROM courses WHERE id = ?`, coursesFieldsStr)
	if err = r.db.GetContext(ctx, &row, query, id); err != nil {
		return course, fmt.Errorf("executing query: %w", err)
	}

	course = row.AsDomain()

	return course, nil
}
// GetByExternalID is not supported by the sqlite repository: it always returns
// a zero course and a "not implemented" error.
func (r *sqliteCourseRepository) GetByExternalID(
	ctx context.Context, id string,
) (course domain.Course, err error) {
	err = errors.New("not implemented")

	return course, err
}
// CreateBatch inserts all params into the courses table inside a single
// transaction, so either every course is stored or none is.
//
// The transaction is rolled back when preparing the statement or any insert
// fails, and committed otherwise. A failure of the rollback is joined to the
// returned error; a failure of the commit is returned as the error.
func (r *sqliteCourseRepository) CreateBatch(ctx context.Context, params ...domain.CreateCourseParams) (err error) {
	tx, err := r.db.BeginTxx(ctx, &sql.TxOptions{Isolation: sql.LevelDefault})
	if err != nil {
		return fmt.Errorf("beginning tx: %w", err)
	}

	// err is the named result: the closure therefore sees the value that is
	// actually being returned and can both choose between rollback and commit
	// and report their failures to the caller.
	defer func() {
		if err != nil {
			if errRollback := tx.Rollback(); errRollback != nil {
				err = errors.Join(err, fmt.Errorf("rolling back tx: %w", errRollback))
			}

			return
		}

		if errCommit := tx.Commit(); errCommit != nil {
			err = fmt.Errorf("committing tx: %w", errCommit)
		}
	}()

	const queryTemplate = `INSERT INTO courses` +
		` (%s) VALUES (%s)`

	placeholders := strings.TrimSuffix(strings.Repeat("?,", len(coursesFields)), ",")
	query := fmt.Sprintf(queryTemplate, coursesFieldsStr, placeholders)

	// The statement is bound to tx; database/sql closes it when the
	// transaction is committed or rolled back.
	stmt, err := tx.PrepareContext(ctx, query)
	if err != nil {
		return fmt.Errorf("preparing statement: %w", err)
	}

	for _, param := range params {
		// Plain assignment (no :=): shadowing err here would hide the failure
		// from the deferred func and commit a partially inserted batch.
		if _, err = stmt.ExecContext(ctx, createCourseParamsAsValues(param)...); err != nil {
			return fmt.Errorf("executing statement query: %w", err)
		}
	}

	return nil
}
// Create stores a single course and returns it as persisted.
//
// The insert goes through CreateBatch. On success the row is read back by
// params.ID so the caller receives the stored course instead of an empty
// value. On any failure a zero course is returned together with the error.
func (r *sqliteCourseRepository) Create(ctx context.Context, params domain.CreateCourseParams) (domain.Course, error) {
	if err := r.CreateBatch(ctx, params); err != nil {
		return domain.Course{}, err
	}

	course, err := r.Get(ctx, params.ID)
	if err != nil {
		return domain.Course{}, fmt.Errorf("reading created course: %w", err)
	}

	return course, nil
}
// UpdateCourseDescription is not supported by the sqlite repository: it always
// returns an "unimplemented" error and touches nothing.
func (r *sqliteCourseRepository) UpdateCourseDescription(ctx context.Context, id, description string) error {
	errUnimplemented := errors.New("unimplemented")

	return errUnimplemented
}
// Delete is not supported by the sqlite repository: it always returns an
// "unimplemented" error and touches nothing.
func (r *sqliteCourseRepository) Delete(ctx context.Context, id string) error {
	errUnimplemented := errors.New("unimplemented")

	return errUnimplemented
}
// listCount returns how many courses match the LearningType, CourseThematic
// and OrganizationID filters of params; empty filters are skipped and the
// paging fields are not used. The call is traced as "sqlite.listCount" and a
// failure is recorded on the span and returned wrapped with "sending query".
func (r *sqliteCourseRepository) listCount(ctx context.Context, params domain.ListCoursesParams) (count int, err error) {
	ctx, span := sqliteTracer.Start(ctx, "sqlite.listCount")
	defer func() {
		if err != nil {
			span.RecordError(err)
		}
		span.End()
	}()

	query := `SELECT COUNT(id) FROM courses WHERE 1=1`
	args := make([]any, 0, 6)

	// addFilter appends one equality condition and its bound value.
	addFilter := func(column string, value any) {
		query += " AND " + column + " = ?"
		args = append(args, value)
	}

	if params.LearningType != "" {
		addFilter("learning_type", params.LearningType)
	}
	if params.CourseThematic != "" {
		addFilter("course_thematic", params.CourseThematic)
	}
	if params.OrganizationID != "" {
		addFilter("organization_id", params.OrganizationID)
	}

	if err = r.db.GetContext(ctx, &count, query, args...); err != nil {
		return count, fmt.Errorf("sending query: %w", err)
	}

	return count, nil
}
// rowsScanner is what scanRows passes to its callback for every row: the
// sqlx.ColScanner methods plus StructScan for scanning the current row into a
// struct.
type rowsScanner interface {
sqlx.ColScanner
StructScan(dest any) error
}
// scanRows runs query with args on db and calls f once for every returned
// row, in order, stopping at the first error f reports.
//
// The row set is always closed before scanRows returns; an error from closing
// it is joined to the returned error instead of being dropped.
func scanRows(ctx context.Context, db *sqlx.DB, f func(rowsScanner) error, query string, args ...any) (err error) {
	rows, err := db.QueryxContext(ctx, query, args...)
	if err != nil {
		return fmt.Errorf("querying rows: %w", err)
	}

	// err is the named result, so a Close failure recorded here actually
	// reaches the caller.
	defer func() {
		if errClose := rows.Close(); errClose != nil {
			err = errors.Join(err, fmt.Errorf("closing rows: %w", errClose))
		}
	}()

	for rows.Next() {
		if err = f(rows); err != nil {
			return fmt.Errorf("scanning row: %w", err)
		}
	}

	// Next returning false may mean an iteration error rather than the end.
	if err = rows.Err(); err != nil {
		return fmt.Errorf("checking rows for errors: %w", err)
	}

	return nil
}
// createCourseParamsAsValues flattens params into the positional argument
// list that CreateBatch passes to its prepared INSERT statement.
//
// The duration is stored as whole seconds. One time.Now() value fills the two
// positions after StartsAt and a zero sql.NullTime fills the last one
// (presumably created_at, updated_at and deleted_at — the column order lives
// in coursesFields, outside this block; confirm before reordering).
func createCourseParamsAsValues(params domain.CreateCourseParams) []any {
	timestamp := time.Now()

	externalID := nullableValueAsString(params.ExternalID)
	sourceType := mapSourceTypeFromDomain(params.SourceType)
	sourceName := nullableValueAsString(params.SourceName)
	durationSeconds := params.Duration.Truncate(time.Second).Milliseconds() / 1000

	values := make([]any, 0, 18)
	values = append(values,
		params.ID,
		externalID,
		sourceType,
		sourceName,
		params.CourseThematic,
		params.LearningType,
		params.OrganizationID,
		params.OriginLink,
		params.ImageLink,
	)
	values = append(values,
		params.Name,
		params.Description,
		params.FullPrice,
		params.Discount,
		durationSeconds,
		params.StartsAt,
	)
	values = append(values,
		timestamp,
		timestamp,
		sql.NullTime{},
	)

	return values
}
// sqliteCourseDB is the database-side shape of a course: one field per column
// named in the `db` tags. List and Get scan rows into it and convert them with
// AsDomain.
type sqliteCourseDB struct {
ID string `db:"id"`
// ExternalID and SourceName are nullable; AsDomain maps them through nullStringAsDomain.
ExternalID sql.NullString `db:"external_id"`
SourceType string `db:"source_type"`
SourceName sql.NullString `db:"source_name"`
ThematicID string `db:"course_thematic"`
LearningTypeID string `db:"learning_type"`
OrganizationID string `db:"organization_id"`
OriginLink string `db:"origin_link"`
ImageLink string `db:"image_link"`
Name string `db:"name"`
Description string `db:"description"`
FullPrice float64 `db:"full_price"`
Discount float64 `db:"discount"`
// Duration is a number of whole seconds; AsDomain multiplies it by time.Second.
Duration int64 `db:"duration"`
CreatedAt time.Time `db:"created_at"`
// StartsAt is nullable; AsDomain uses its Time part without checking Valid.
StartsAt sql.NullTime `db:"starts_at"`
UpdatedAt time.Time `db:"updated_at"`
// DeletedAt is nullable; AsDomain maps it through nullTimeAsDomain.
DeletedAt sql.NullTime `db:"deleted_at"`
}
// nullStringAsDomain converts a nullable SQL string into a nullable.Value:
// a valid s yields nullable.NewValue(s.String), anything else the zero Value.
func nullStringAsDomain(s sql.NullString) nullable.Value[string] {
	var out nullable.Value[string]
	if s.Valid {
		out = nullable.NewValue(s.String)
	}

	return out
}
// nullTimeAsDomain converts a nullable SQL time into a nullable.Value:
// a valid s yields nullable.NewValue(s.Time), anything else the zero Value.
func nullTimeAsDomain(s sql.NullTime) nullable.Value[time.Time] {
	if !s.Valid {
		return nullable.Value[time.Time]{}
	}

	return nullable.NewValue(s.Time)
}
// nullableValueAsString converts a nullable domain string into its SQL
// counterpart, copying v.Valid() and v.Value() as they are.
func nullableValueAsString(v nullable.Value[string]) sql.NullString {
	var out sql.NullString
	out.Valid = v.Valid()
	out.String = v.Value()

	return out
}
// nullableValueAsTime converts a nullable domain time into its SQL
// counterpart, copying v.Valid() and v.Value() as they are.
func nullableValueAsTime(v nullable.Value[time.Time]) sql.NullTime {
	isSet := v.Valid()
	moment := v.Value()

	return sql.NullTime{Valid: isSet, Time: moment}
}
// AsDomain converts the database row into a domain.Course.
//
// Duration is interpreted as whole seconds, StartsAt takes the Time part of
// the nullable column regardless of its Valid flag, and the other nullable
// columns go through nullStringAsDomain / nullTimeAsDomain.
func (c sqliteCourseDB) AsDomain() domain.Course {
	var course domain.Course

	// Plain columns are copied one to one.
	course.ID = c.ID
	course.OrganizationID = c.OrganizationID
	course.OriginLink = c.OriginLink
	course.ImageLink = c.ImageLink
	course.Name = c.Name
	course.Description = c.Description
	course.FullPrice = c.FullPrice
	course.Discount = c.Discount
	course.ThematicID = c.ThematicID
	course.LearningTypeID = c.LearningTypeID

	// Time-related columns.
	course.Duration = time.Second * time.Duration(c.Duration)
	course.StartsAt = c.StartsAt.Time
	course.CreatedAt = c.CreatedAt
	course.UpdatedAt = c.UpdatedAt

	// Columns that need mapping to domain types.
	course.ExternalID = nullStringAsDomain(c.ExternalID)
	course.SourceType = mapSourceTypeToDomain(c.SourceType)
	course.SourceName = nullStringAsDomain(c.SourceName)
	course.DeletedAt = nullTimeAsDomain(c.DeletedAt)

	return course
}
// FromDomain overwrites the receiver with the database representation of d.
//
// Duration is stored as whole seconds and the nullable domain values are
// mapped through nullableValueAsString / nullableValueAsTime.
func (c *sqliteCourseDB) FromDomain(d domain.Course) {
	var row sqliteCourseDB

	// Plain columns are copied one to one.
	row.ID = d.ID
	row.OrganizationID = d.OrganizationID
	row.OriginLink = d.OriginLink
	row.ImageLink = d.ImageLink
	row.Name = d.Name
	row.Description = d.Description
	row.FullPrice = d.FullPrice
	row.Discount = d.Discount
	row.ThematicID = d.ThematicID
	row.LearningTypeID = d.LearningTypeID

	// Columns that need mapping to database types.
	row.SourceType = mapSourceTypeFromDomain(d.SourceType)
	row.Duration = d.Duration.Truncate(time.Second).Milliseconds() / 1000
	row.CreatedAt = d.CreatedAt
	row.UpdatedAt = d.UpdatedAt
	row.ExternalID = nullableValueAsString(d.ExternalID)
	row.SourceName = nullableValueAsString(d.SourceName)
	row.DeletedAt = nullableValueAsTime(d.DeletedAt)

	// NOTE(review): StartsAt is always marked valid, even for a zero
	// d.StartsAt, whereas AsDomain turns a NULL starts_at into the zero time —
	// confirm that a zero start time should not be written as NULL.
	row.StartsAt.Time = d.StartsAt
	row.StartsAt.Valid = true

	*c = row
}