initial commit
This commit is contained in:
15
internal/entities/stats.go
Normal file
15
internal/entities/stats.go
Normal file
@ -0,0 +1,15 @@
|
||||
package entities
|
||||
|
||||
import "time"
|
||||
|
||||
type DeviceID string
|
||||
|
||||
type DeviceStatistics struct {
|
||||
ID DeviceID
|
||||
IncomingTrafficBytes int
|
||||
OutgoingTrafficBytes int
|
||||
IncomingRPS int
|
||||
ReadRPS int
|
||||
WriteRPS int
|
||||
UpdatedAt time.Time
|
||||
}
|
||||
126
internal/store/mongo/store.go
Normal file
126
internal/store/mongo/store.go
Normal file
@ -0,0 +1,126 @@
|
||||
package mongo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.loyso.art/frx/devsim/internal/entities"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"go.mongodb.org/mongo-driver/mongo/readpref"
|
||||
)
|
||||
|
||||
func Dial(ctx context.Context, uri string) (*repository, error) {
|
||||
opts := options.Client().ApplyURI(uri)
|
||||
if err := opts.Validate(); err != nil {
|
||||
return nil, fmt.Errorf("validating config: %w", err)
|
||||
}
|
||||
|
||||
client, err := mongo.Connect(ctx, opts)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("connecting to mongo: %w", err)
|
||||
}
|
||||
|
||||
err = client.Ping(ctx, readpref.Primary())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pinging mongo database: %w", err)
|
||||
}
|
||||
|
||||
return &repository{client: client}, nil
|
||||
}
|
||||
|
||||
type repository struct {
|
||||
client *mongo.Client
|
||||
}
|
||||
|
||||
func (r *repository) StatsRepository() statsRepository {
|
||||
return statsRepository{
|
||||
collection: r.client.Database("bench").Collection("device_stats"),
|
||||
}
|
||||
}
|
||||
|
||||
type statsRepository struct {
|
||||
collection *mongo.Collection
|
||||
}
|
||||
|
||||
type deviceStatsDB struct {
|
||||
DeviceID string `bson:"_id"`
|
||||
IncomingTraffic int `bson:"inc_traffic"`
|
||||
OutgoinfTraffic int `bson:"out_traffic"`
|
||||
IncomingRPS int `bson:"inc_rps"`
|
||||
ReadRPS int `bson:"read_rps"`
|
||||
WriteRPS int `bson:"write_rps"`
|
||||
UpdatedAt time.Time `bson:"updated_at"`
|
||||
}
|
||||
|
||||
func (s deviceStatsDB) asDomain() entities.DeviceStatistics {
|
||||
return entities.DeviceStatistics{
|
||||
ID: entities.DeviceID(s.DeviceID),
|
||||
IncomingTrafficBytes: s.IncomingTraffic,
|
||||
OutgoingTrafficBytes: s.OutgoinfTraffic,
|
||||
IncomingRPS: s.IncomingRPS,
|
||||
ReadRPS: s.ReadRPS,
|
||||
WriteRPS: s.WriteRPS,
|
||||
UpdatedAt: s.UpdatedAt,
|
||||
}
|
||||
}
|
||||
|
||||
func (r statsRepository) Upsert(ctx context.Context, id entities.DeviceID, stats entities.DeviceStatistics) error {
|
||||
opts := options.Update().SetUpsert(true)
|
||||
|
||||
filter := bson.D{
|
||||
{
|
||||
Key: "_id",
|
||||
Value: id,
|
||||
},
|
||||
}
|
||||
document := deviceStatsDB{
|
||||
DeviceID: string(id),
|
||||
IncomingTraffic: stats.IncomingTrafficBytes,
|
||||
OutgoinfTraffic: stats.OutgoingTrafficBytes,
|
||||
IncomingRPS: stats.IncomingRPS,
|
||||
ReadRPS: stats.ReadRPS,
|
||||
WriteRPS: stats.WriteRPS,
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
|
||||
_, err := r.collection.UpdateOne(ctx, filter, document, opts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("inserting: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r statsRepository) List(ctx context.Context) (out []entities.DeviceStatistics, err error) {
|
||||
count, err := r.collection.EstimatedDocumentCount(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting estimated document count: %w", err)
|
||||
}
|
||||
out = make([]entities.DeviceStatistics, 0, count)
|
||||
|
||||
cursor, err := r.collection.Find(ctx, bson.D{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("finding documents: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
errClose := cursor.Close(ctx)
|
||||
err = errors.Join(err, errClose)
|
||||
}()
|
||||
|
||||
for cursor.Next(ctx) {
|
||||
var stat deviceStatsDB
|
||||
err = cursor.Decode(&stat)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decoding item: %w", err)
|
||||
}
|
||||
|
||||
out = append(out, stat.asDomain())
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
154
internal/store/pg/store.go
Normal file
154
internal/store/pg/store.go
Normal file
@ -0,0 +1,154 @@
|
||||
package pg
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.loyso.art/frx/devsim/internal/entities"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
func Dial(ctx context.Context, addr string) (*repository, error) {
|
||||
config, err := pgxpool.ParseConfig(addr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parsing config: %w", err)
|
||||
}
|
||||
|
||||
pool, err := pgxpool.NewWithConfig(ctx, config)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("connecting to db: %w", err)
|
||||
}
|
||||
|
||||
return &repository{db: pool}, nil
|
||||
}
|
||||
|
||||
type repository struct {
|
||||
db *pgxpool.Pool
|
||||
}
|
||||
|
||||
func (r *repository) StatsRepository() statsRepository {
|
||||
return statsRepository{
|
||||
db: r.db,
|
||||
}
|
||||
}
|
||||
|
||||
type statsRepository struct {
|
||||
db *pgxpool.Pool
|
||||
}
|
||||
|
||||
type deviceStatsDB struct {
|
||||
DeviceID string
|
||||
IncomingTraffic int
|
||||
OutgoingTraffic int
|
||||
IncomingRPS int
|
||||
ReadRPS int
|
||||
WriteRPS int
|
||||
UpdatedAt time.Time
|
||||
}
|
||||
|
||||
func (s deviceStatsDB) asDomain() entities.DeviceStatistics {
|
||||
return entities.DeviceStatistics{
|
||||
ID: entities.DeviceID(s.DeviceID),
|
||||
IncomingTrafficBytes: s.IncomingTraffic,
|
||||
OutgoingTrafficBytes: s.OutgoingTraffic,
|
||||
IncomingRPS: s.IncomingRPS,
|
||||
ReadRPS: s.ReadRPS,
|
||||
WriteRPS: s.WriteRPS,
|
||||
UpdatedAt: s.UpdatedAt,
|
||||
}
|
||||
}
|
||||
|
||||
func (r statsRepository) Upsert(ctx context.Context, id entities.DeviceID, stats entities.DeviceStatistics) error {
|
||||
const query = `INSERT INTO public.stats (
|
||||
device_id,
|
||||
inc_traffic,
|
||||
out_traffic,
|
||||
inc_rps,
|
||||
read_rps,
|
||||
write_rps,
|
||||
updated_at
|
||||
) VALUES (
|
||||
$1,
|
||||
$2,
|
||||
$3,
|
||||
$4,
|
||||
$5,
|
||||
$6,
|
||||
$7,
|
||||
) ON CONFLICT(device_id) DO UPDATE SET
|
||||
inc_traffic = EXCLUDED.inc_traffic,
|
||||
out_traffic = EXCLUDED.out_traffic,
|
||||
inc_rps = EXCLUDED.inc_rps,
|
||||
read_rps = EXCLUDED.read_rps,
|
||||
write_rps = EXCLUDED.write_rps,
|
||||
updated_at = EXCLUDED.updated_at
|
||||
RETURNING id;
|
||||
`
|
||||
|
||||
_, err := r.db.Exec(
|
||||
ctx, query,
|
||||
id,
|
||||
stats.IncomingTrafficBytes,
|
||||
stats.OutgoingTrafficBytes,
|
||||
stats.IncomingRPS,
|
||||
stats.ReadRPS,
|
||||
stats.WriteRPS,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("executing query: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r statsRepository) List(ctx context.Context) (out []entities.DeviceStatistics, err error) {
|
||||
var count int
|
||||
err = r.db.QueryRow(ctx, `SELECT COUNT(device_id) FROM public.stats`).Scan(&count)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting count: %w", err)
|
||||
}
|
||||
|
||||
out = make([]entities.DeviceStatistics, 0, count)
|
||||
|
||||
const query = `SELECT
|
||||
device_id,
|
||||
inc_traffic,
|
||||
out_traffic,
|
||||
inc_rps,
|
||||
read_rps,
|
||||
write_rps,
|
||||
updated_at
|
||||
FROM public.stats;
|
||||
`
|
||||
|
||||
rows, err := r.db.Query(ctx, query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var stat deviceStatsDB
|
||||
err = rows.Scan(
|
||||
&stat.DeviceID,
|
||||
&stat.IncomingTraffic,
|
||||
&stat.OutgoingTraffic,
|
||||
&stat.IncomingRPS,
|
||||
&stat.ReadRPS,
|
||||
&stat.WriteRPS,
|
||||
&stat.UpdatedAt,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("scanning row: %w", err)
|
||||
}
|
||||
|
||||
out = append(out, stat.asDomain())
|
||||
}
|
||||
if err = rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("checking rows err: %w", err)
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
12
internal/store/store.go
Normal file
12
internal/store/store.go
Normal file
@ -0,0 +1,12 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.loyso.art/frx/devsim/internal/entities"
|
||||
)
|
||||
|
||||
type Stats interface {
|
||||
Upsert(context.Context, entities.DeviceID, entities.DeviceStatistics) error
|
||||
List(context.Context) ([]entities.DeviceStatistics, error)
|
||||
}
|
||||
Reference in New Issue
Block a user