Files
devsim/internal/store/mongo/store.go

159 lines
3.8 KiB
Go

package mongo
import (
"context"
"errors"
"fmt"
"sync"
"time"
"git.loyso.art/frx/devsim/internal/entities"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
)
// Dial connects to the MongoDB deployment described by uri, pings its primary
// and returns a repository wrapping the connected client.
//
// ctx bounds both the connection attempt and the ping. The returned repository
// is configured with upsert-enabled update options. The caller owns it and
// must call Close once it is no longer needed.
//
// An error is returned when the URI-derived options fail validation, when the
// connection cannot be established, or when the ping fails. In the last case
// the freshly created client is disconnected before returning, so a failed
// Dial does not leave a client behind.
func Dial(ctx context.Context, uri string) (*repository, error) {
	opts := options.Client().ApplyURI(uri)
	if err := opts.Validate(); err != nil {
		return nil, fmt.Errorf("validating config: %w", err)
	}

	client, err := mongo.Connect(ctx, opts)
	if err != nil {
		return nil, fmt.Errorf("connecting to mongo: %w", err)
	}

	if err = client.Ping(ctx, readpref.Primary()); err != nil {
		err = fmt.Errorf("pinging mongo database: %w", err)

		// ctx may already be cancelled or expired (that can be the very reason
		// the ping failed), so the cleanup runs under its own deadline.
		closeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		if errClose := client.Disconnect(closeCtx); errClose != nil {
			err = errors.Join(err, fmt.Errorf("disconnecting after failed ping: %w", errClose))
		}
		return nil, err
	}

	return &repository{
		client:        client,
		updateOptions: options.Update().SetUpsert(true),
	}, nil
}
// repository holds the MongoDB client created by Dial together with the
// update options shared by the repositories derived from it.
type repository struct {
// client is the connected MongoDB client; Close disconnects it.
client *mongo.Client
// updateOptions is passed on to statsRepository; Dial builds it with upsert
// enabled.
updateOptions *options.UpdateOptions
}
// Close disconnects the underlying MongoDB client. The disconnect runs under
// a fresh background context limited to five seconds, and whatever error the
// driver reports is returned unchanged.
func (r *repository) Close() error {
	const disconnectTimeout = 5 * time.Second

	shutdownCtx, release := context.WithTimeout(context.Background(), disconnectTimeout)
	defer release()

	return r.client.Disconnect(shutdownCtx)
}
// StatsRepository returns a statsRepository bound to the "device_stats"
// collection of the "bench" database. It reuses the update options stored on
// the parent repository.
func (r *repository) StatsRepository() statsRepository {
	deviceStats := r.client.Database("bench").Collection("device_stats")

	var repo statsRepository
	repo.collection = deviceStats
	repo.updateOptions = r.updateOptions
	return repo
}
// statsRepository reads and writes device statistics documents in a single
// MongoDB collection.
type statsRepository struct {
// collection is the collection every operation of this repository targets.
collection *mongo.Collection
// updateOptions is handed to UpdateOne by Upsert.
updateOptions *options.UpdateOptions
}
// deviceStatsDB is the BSON shape of one device statistics document. The
// struct tags define the field names used in the stored document.
type deviceStatsDB struct {
// UpdatedAt is stored as "updated_at"; Upsert fills it with time.Now().
UpdatedAt time.Time `bson:"updated_at"`
// DeviceID is the document key ("_id").
DeviceID string `bson:"_id"`
// IncomingTraffic maps to entities.DeviceStatistics.IncomingTrafficBytes.
IncomingTraffic int `bson:"inc_traffic"`
// OutgoinfTraffic maps to entities.DeviceStatistics.OutgoingTrafficBytes.
// NOTE(review): the field name is misspelled ("Outgoing"); it is kept as is
// because asDomain and Upsert reference it. The stored key "out_traffic" is
// unaffected.
OutgoinfTraffic int `bson:"out_traffic"`
// IncomingRPS maps to entities.DeviceStatistics.IncomingRPS.
IncomingRPS int `bson:"inc_rps"`
// ReadRPS maps to entities.DeviceStatistics.ReadRPS.
ReadRPS int `bson:"read_rps"`
// WriteRPS maps to entities.DeviceStatistics.WriteRPS.
WriteRPS int `bson:"write_rps"`
}
// asDomain converts the stored document into an entities.DeviceStatistics
// value, copying each persisted field to its domain counterpart.
func (s deviceStatsDB) asDomain() entities.DeviceStatistics {
	var stats entities.DeviceStatistics

	stats.ID = entities.DeviceID(s.DeviceID)
	stats.UpdatedAt = s.UpdatedAt

	stats.IncomingTrafficBytes = s.IncomingTraffic
	stats.OutgoingTrafficBytes = s.OutgoinfTraffic

	stats.IncomingRPS = s.IncomingRPS
	stats.ReadRPS = s.ReadRPS
	stats.WriteRPS = s.WriteRPS

	return stats
}
// bsonSyncPool recycles *bson.M values. When the pool has nothing to offer,
// New supplies a pointer to an empty map created with room for one key.
var bsonSyncPool = sync.Pool{
	New: func() any {
		pooled := new(bson.M)
		*pooled = make(bson.M, 1)
		return pooled
	},
}
// getPoolD takes a *bson.M out of bsonSyncPool. It panics if the pool yields
// a value of any other type.
func getPoolD() *bson.M {
	pooled := bsonSyncPool.Get()
	return pooled.(*bson.M)
}
// putPoolD empties the map behind m and returns it to bsonSyncPool so that a
// later getPoolD can reuse it. A nil m is ignored.
//
// The entries are removed before the Put because sync.Pool hands back exactly
// what was stored: without the reset, keys written by one user would still be
// present for the next one. The caller must not touch m after this call.
func putPoolD(m *bson.M) {
	if m == nil {
		// Pooling a nil pointer would make a later getPoolD return it.
		return
	}

	for key := range *m {
		delete(*m, key)
	}

	bsonSyncPool.Put(m)
}
// filterByID is the query filter that matches a document by its "_id" field.
type filterByID struct {
// ID is the device identifier compared against the document's "_id".
ID entities.DeviceID `bson:"_id"`
}
// Upsert writes stats to the collection under its device ID using a single
// UpdateOne call with the repository's update options (Dial enables upsert on
// them).
//
// All persisted fields are written through "$set". The stored update time is
// taken from time.Now() at call time, not from stats.UpdatedAt. Any driver
// error is returned wrapped.
func (r statsRepository) Upsert(ctx context.Context, stats entities.DeviceStatistics) error {
	update := bson.M{
		"$set": deviceStatsDB{
			UpdatedAt:       time.Now(),
			DeviceID:        string(stats.ID),
			IncomingTraffic: stats.IncomingTrafficBytes,
			OutgoinfTraffic: stats.OutgoingTrafficBytes,
			IncomingRPS:     stats.IncomingRPS,
			ReadRPS:         stats.ReadRPS,
			WriteRPS:        stats.WriteRPS,
		},
	}
	selector := filterByID{ID: stats.ID}

	if _, err := r.collection.UpdateOne(ctx, selector, update, r.updateOptions); err != nil {
		return fmt.Errorf("inserting: %w", err)
	}
	return nil
}
// List returns every document of the collection converted to
// entities.DeviceStatistics.
//
// The estimated document count is fetched first and used only to pre-size the
// result slice. The cursor is always closed before returning; a close failure
// is joined into the returned error.
//
// An error is returned when the count or the find fails, when a document
// cannot be decoded, or when the cursor stops early because of an iteration
// error, so a truncated listing is never reported as success.
func (r statsRepository) List(ctx context.Context) (out []entities.DeviceStatistics, err error) {
	count, err := r.collection.EstimatedDocumentCount(ctx)
	if err != nil {
		return nil, fmt.Errorf("getting estimated document count: %w", err)
	}

	cursor, err := r.collection.Find(ctx, bson.D{})
	if err != nil {
		return nil, fmt.Errorf("finding documents: %w", err)
	}
	defer func() {
		// Join only on an actual close failure so the primary error is not
		// re-wrapped when the close succeeds.
		if errClose := cursor.Close(ctx); errClose != nil {
			err = errors.Join(err, errClose)
		}
	}()

	out = make([]entities.DeviceStatistics, 0, count)
	for cursor.Next(ctx) {
		var stat deviceStatsDB
		if err = cursor.Decode(&stat); err != nil {
			return nil, fmt.Errorf("decoding item: %w", err)
		}
		out = append(out, stat.asDomain())
	}

	// Next returns false both at the end of the result set and on failure;
	// Err tells the two cases apart.
	if err = cursor.Err(); err != nil {
		return nil, fmt.Errorf("iterating cursor: %w", err)
	}

	return out, nil
}