127 lines
3.1 KiB
Go
127 lines
3.1 KiB
Go
package mongo
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"git.loyso.art/frx/devsim/internal/entities"
|
|
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|
"go.mongodb.org/mongo-driver/mongo/readpref"
|
|
)
|
|
|
|
func Dial(ctx context.Context, uri string) (*repository, error) {
|
|
opts := options.Client().ApplyURI(uri)
|
|
if err := opts.Validate(); err != nil {
|
|
return nil, fmt.Errorf("validating config: %w", err)
|
|
}
|
|
|
|
client, err := mongo.Connect(ctx, opts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("connecting to mongo: %w", err)
|
|
}
|
|
|
|
err = client.Ping(ctx, readpref.Primary())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("pinging mongo database: %w", err)
|
|
}
|
|
|
|
return &repository{client: client}, nil
|
|
}
|
|
|
|
type repository struct {
|
|
client *mongo.Client
|
|
}
|
|
|
|
func (r *repository) StatsRepository() statsRepository {
|
|
return statsRepository{
|
|
collection: r.client.Database("bench").Collection("device_stats"),
|
|
}
|
|
}
|
|
|
|
type statsRepository struct {
|
|
collection *mongo.Collection
|
|
}
|
|
|
|
// deviceStatsDB is the BSON representation of one device's statistics.
// The device identifier is stored as the document's _id.
type deviceStatsDB struct {
	// DeviceID is stored under the "_id" key.
	DeviceID string `bson:"_id"`

	// Traffic counters.
	IncomingTraffic int `bson:"inc_traffic"`
	// OutgoinfTraffic is misspelled ("Outgoing"); the name is kept because
	// other code in this file refers to it. The stored key is "out_traffic".
	OutgoinfTraffic int `bson:"out_traffic"`

	// Request-rate counters.
	IncomingRPS int `bson:"inc_rps"`
	ReadRPS     int `bson:"read_rps"`
	WriteRPS    int `bson:"write_rps"`

	// UpdatedAt is stored under the "updated_at" key.
	UpdatedAt time.Time `bson:"updated_at"`
}
|
|
|
|
func (s deviceStatsDB) asDomain() entities.DeviceStatistics {
|
|
return entities.DeviceStatistics{
|
|
ID: entities.DeviceID(s.DeviceID),
|
|
IncomingTrafficBytes: s.IncomingTraffic,
|
|
OutgoingTrafficBytes: s.OutgoinfTraffic,
|
|
IncomingRPS: s.IncomingRPS,
|
|
ReadRPS: s.ReadRPS,
|
|
WriteRPS: s.WriteRPS,
|
|
UpdatedAt: s.UpdatedAt,
|
|
}
|
|
}
|
|
|
|
func (r statsRepository) Upsert(ctx context.Context, stats entities.DeviceStatistics) error {
|
|
opts := options.Update().SetUpsert(true)
|
|
|
|
filter := bson.D{
|
|
{
|
|
Key: "_id",
|
|
Value: stats.ID,
|
|
},
|
|
}
|
|
document := deviceStatsDB{
|
|
DeviceID: string(stats.ID),
|
|
IncomingTraffic: stats.IncomingTrafficBytes,
|
|
OutgoinfTraffic: stats.OutgoingTrafficBytes,
|
|
IncomingRPS: stats.IncomingRPS,
|
|
ReadRPS: stats.ReadRPS,
|
|
WriteRPS: stats.WriteRPS,
|
|
UpdatedAt: time.Now(),
|
|
}
|
|
|
|
_, err := r.collection.UpdateOne(ctx, filter, document, opts)
|
|
if err != nil {
|
|
return fmt.Errorf("inserting: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r statsRepository) List(ctx context.Context) (out []entities.DeviceStatistics, err error) {
|
|
count, err := r.collection.EstimatedDocumentCount(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("getting estimated document count: %w", err)
|
|
}
|
|
out = make([]entities.DeviceStatistics, 0, count)
|
|
|
|
cursor, err := r.collection.Find(ctx, bson.D{})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("finding documents: %w", err)
|
|
}
|
|
defer func() {
|
|
errClose := cursor.Close(ctx)
|
|
err = errors.Join(err, errClose)
|
|
}()
|
|
|
|
for cursor.Next(ctx) {
|
|
var stat deviceStatsDB
|
|
err = cursor.Decode(&stat)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("decoding item: %w", err)
|
|
}
|
|
|
|
out = append(out, stat.asDomain())
|
|
}
|
|
|
|
return out, nil
|
|
}
|