Files
devsim/internal/store/mongo/store.go

134 lines
3.2 KiB
Go

package mongo
import (
"context"
"errors"
"fmt"
"time"
"git.loyso.art/frx/devsim/internal/entities"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
)
// Dial validates the connection URI, connects to MongoDB and pings the
// primary to confirm the deployment is reachable.
//
// On success the returned repository owns the client; call Close to release
// it. If the ping fails, the freshly created client is disconnected before
// returning so that its connection pools and monitoring goroutines are not
// leaked.
func Dial(ctx context.Context, uri string) (*repository, error) {
	opts := options.Client().ApplyURI(uri)
	if err := opts.Validate(); err != nil {
		return nil, fmt.Errorf("validating config: %w", err)
	}

	client, err := mongo.Connect(ctx, opts)
	if err != nil {
		return nil, fmt.Errorf("connecting to mongo: %w", err)
	}

	if err := client.Ping(ctx, readpref.Primary()); err != nil {
		err = fmt.Errorf("pinging mongo database: %w", err)

		// ctx may already be cancelled or expired (a common reason for the
		// ping to fail), so clean up with a fresh, bounded context.
		closeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		if errClose := client.Disconnect(closeCtx); errClose != nil {
			err = errors.Join(err, fmt.Errorf("disconnecting after failed ping: %w", errClose))
		}

		return nil, err
	}

	return &repository{client: client}, nil
}
// repository holds the MongoDB client that the store's sub-repositories are
// built from.
type repository struct {
	// client is the connected driver client; it is released by Close.
	client *mongo.Client
}
// Close disconnects the underlying MongoDB client. The disconnect runs under
// its own background context limited to five seconds, independent of any
// caller context.
func (r *repository) Close() error {
	const disconnectTimeout = 5 * time.Second

	disconnectCtx, stop := context.WithTimeout(context.Background(), disconnectTimeout)
	defer stop()

	return r.client.Disconnect(disconnectCtx)
}
// StatsRepository returns a repository bound to the "device_stats"
// collection of the "bench" database.
func (r *repository) StatsRepository() statsRepository {
	database := r.client.Database("bench")
	deviceStats := database.Collection("device_stats")

	return statsRepository{collection: deviceStats}
}
// statsRepository reads and writes device statistics documents in a single
// MongoDB collection.
type statsRepository struct {
	// collection is the handle all queries and writes go through.
	collection *mongo.Collection
}
// deviceStatsDB is the BSON representation of one device's statistics. The
// device identifier is stored as the document's _id.
type deviceStatsDB struct {
	// DeviceID is the device identifier, persisted as the primary key.
	DeviceID string `bson:"_id"`
	// IncomingTraffic is persisted under "inc_traffic".
	IncomingTraffic int `bson:"inc_traffic"`
	// OutgoinfTraffic is persisted under "out_traffic".
	// NOTE(review): the field name is a typo for "OutgoingTraffic"; it is
	// kept because other code in this file refers to it by this name.
	OutgoinfTraffic int `bson:"out_traffic"`
	// IncomingRPS is persisted under "inc_rps".
	IncomingRPS int `bson:"inc_rps"`
	// ReadRPS is persisted under "read_rps".
	ReadRPS int `bson:"read_rps"`
	// WriteRPS is persisted under "write_rps".
	WriteRPS int `bson:"write_rps"`
	// UpdatedAt is persisted under "updated_at".
	UpdatedAt time.Time `bson:"updated_at"`
}
// asDomain maps the stored document onto the domain-level
// entities.DeviceStatistics value, copying every field one-to-one.
func (s deviceStatsDB) asDomain() entities.DeviceStatistics {
	var stats entities.DeviceStatistics

	stats.ID = entities.DeviceID(s.DeviceID)
	stats.IncomingTrafficBytes = s.IncomingTraffic
	stats.OutgoingTrafficBytes = s.OutgoinfTraffic
	stats.IncomingRPS = s.IncomingRPS
	stats.ReadRPS = s.ReadRPS
	stats.WriteRPS = s.WriteRPS
	stats.UpdatedAt = s.UpdatedAt

	return stats
}
// Upsert stores the statistics for a device, inserting a new document when
// none exists for the device ID and replacing the stored one otherwise. The
// document's updated_at is set to the current time on every call.
//
// The write is a whole-document replacement (ReplaceOne with upsert). An
// update command requires update operators such as $set and is rejected by
// the driver when handed a plain document, so UpdateOne cannot be used with
// the struct directly.
func (r statsRepository) Upsert(ctx context.Context, stats entities.DeviceStatistics) error {
	document := deviceStatsDB{
		DeviceID:        string(stats.ID),
		IncomingTraffic: stats.IncomingTrafficBytes,
		OutgoinfTraffic: stats.OutgoingTrafficBytes,
		IncomingRPS:     stats.IncomingRPS,
		ReadRPS:         stats.ReadRPS,
		WriteRPS:        stats.WriteRPS,
		UpdatedAt:       time.Now(),
	}

	// Filter on exactly the value written as _id so the replacement can
	// never disagree with the matched document's key.
	filter := bson.D{
		{
			Key:   "_id",
			Value: document.DeviceID,
		},
	}

	opts := options.Replace().SetUpsert(true)

	if _, err := r.collection.ReplaceOne(ctx, filter, document, opts); err != nil {
		return fmt.Errorf("inserting: %w", err)
	}

	return nil
}
// List returns the statistics of every device stored in the collection.
//
// The estimated document count is used only to pre-size the result slice.
// Any failure while decoding, iterating or closing the cursor yields a nil
// slice and a non-nil error; a close failure is joined with any earlier
// error.
func (r statsRepository) List(ctx context.Context) (out []entities.DeviceStatistics, err error) {
	count, err := r.collection.EstimatedDocumentCount(ctx)
	if err != nil {
		return nil, fmt.Errorf("getting estimated document count: %w", err)
	}

	cursor, err := r.collection.Find(ctx, bson.D{})
	if err != nil {
		return nil, fmt.Errorf("finding documents: %w", err)
	}
	defer func() {
		if errClose := cursor.Close(ctx); errClose != nil {
			err = errors.Join(err, fmt.Errorf("closing cursor: %w", errClose))
			// Do not hand back a result alongside an error.
			out = nil
		}
	}()

	out = make([]entities.DeviceStatistics, 0, count)
	for cursor.Next(ctx) {
		var stat deviceStatsDB
		if err = cursor.Decode(&stat); err != nil {
			return nil, fmt.Errorf("decoding item: %w", err)
		}
		out = append(out, stat.asDomain())
	}

	// Next returns false both on exhaustion and on failure; Err tells the
	// two apart. Without this check a broken iteration would be reported as
	// a successful, silently truncated list.
	if err = cursor.Err(); err != nil {
		return nil, fmt.Errorf("iterating cursor: %w", err)
	}

	return out, nil
}