commit fbe18d86be953dea844d1be8cffdabdab1452447 Author: Aleksandr Trushkin Date: Wed Aug 7 00:20:45 2024 +0300 initial commit diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..88eaa94 --- /dev/null +++ b/go.mod @@ -0,0 +1,23 @@ +module git.loyso.art/frx/devsim + +go 1.22.2 + +require ( + github.com/golang/snappy v0.0.4 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/pgx v3.6.2+incompatible // indirect + github.com/jackc/pgx/v5 v5.6.0 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/klauspost/compress v1.13.6 // indirect + github.com/montanaflynn/stats v0.7.1 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect + go.mongodb.org/mongo-driver v1.16.0 // indirect + golang.org/x/crypto v0.26.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/text v0.17.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..cb67008 --- /dev/null +++ b/go.sum @@ -0,0 +1,65 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx v3.6.2+incompatible h1:2zP5OD7kiyR3xzRYMhOcXVvkDZsImVXfj+yIyTQf3/o= +github.com/jackc/pgx v3.6.2+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I= +github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= +github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= +github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.mongodb.org/mongo-driver v1.16.0 h1:tpRsfBJMROVHKpdGyc1BBEzzjDUWjItxbVSZ8Ls4BQ4= +go.mongodb.org/mongo-driver v1.16.0/go.mod h1:oB6AhJQvFQL4LEHyXi6aJzQJtBiTQHiAd83l0GdFaiw= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/entities/stats.go b/internal/entities/stats.go new file mode 100644 index 0000000..74473fd --- /dev/null +++ b/internal/entities/stats.go @@ -0,0 +1,15 @@ +package entities + +import "time" + +type DeviceID string + +type DeviceStatistics struct { + ID DeviceID + IncomingTrafficBytes int + OutgoingTrafficBytes int + IncomingRPS int + ReadRPS int + WriteRPS int + UpdatedAt time.Time +} diff --git a/internal/store/mongo/store.go b/internal/store/mongo/store.go new file mode 100644 index 0000000..bbeb9d9 --- /dev/null +++ b/internal/store/mongo/store.go @@ -0,0 +1,126 @@ +package mongo + +import ( + "context" + "errors" + "fmt" + "time" + + "git.loyso.art/frx/devsim/internal/entities" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readpref" +) + +func Dial(ctx context.Context, uri string) (*repository, error) { + opts := options.Client().ApplyURI(uri) + if err := opts.Validate(); err != nil { + return nil, fmt.Errorf("validating config: %w", err) + } + + client, err := mongo.Connect(ctx, opts) + if err != nil { + return nil, fmt.Errorf("connecting to mongo: %w", err) + } + + err = client.Ping(ctx, readpref.Primary()) + if err != nil { + return nil, fmt.Errorf("pinging mongo database: %w", err) + } + + return &repository{client: client}, nil +} + +type repository struct { + client *mongo.Client +} + +func (r *repository) StatsRepository() statsRepository { + return statsRepository{ + collection: r.client.Database("bench").Collection("device_stats"), + } +} + +type statsRepository struct { + collection *mongo.Collection +} + +type deviceStatsDB struct { + DeviceID string `bson:"_id"` + IncomingTraffic int `bson:"inc_traffic"` + OutgoinfTraffic int `bson:"out_traffic"` + IncomingRPS int `bson:"inc_rps"` + ReadRPS int `bson:"read_rps"` + WriteRPS int `bson:"write_rps"` + UpdatedAt time.Time `bson:"updated_at"` +} + +func (s deviceStatsDB) asDomain() entities.DeviceStatistics { + return entities.DeviceStatistics{ + ID: entities.DeviceID(s.DeviceID), + IncomingTrafficBytes: s.IncomingTraffic, + OutgoingTrafficBytes: s.OutgoinfTraffic, + IncomingRPS: s.IncomingRPS, + ReadRPS: s.ReadRPS, + WriteRPS: s.WriteRPS, + UpdatedAt: s.UpdatedAt, + } +} + +func (r statsRepository) Upsert(ctx context.Context, id entities.DeviceID, stats entities.DeviceStatistics) error { + opts := options.Update().SetUpsert(true) + + filter := bson.D{ + { + Key: "_id", + Value: id, + }, + } + document := deviceStatsDB{ + DeviceID: string(id), + IncomingTraffic: stats.IncomingTrafficBytes, + OutgoinfTraffic: stats.OutgoingTrafficBytes, + IncomingRPS: stats.IncomingRPS, + ReadRPS: stats.ReadRPS, + WriteRPS: stats.WriteRPS, + UpdatedAt: time.Now(), + } + + _, err := r.collection.UpdateOne(ctx, filter, document, opts) + if err != nil { + return fmt.Errorf("inserting: %w", err) + } + + return nil +} + +func (r statsRepository) List(ctx context.Context) (out []entities.DeviceStatistics, err error) { + count, err := r.collection.EstimatedDocumentCount(ctx) + if err != nil { + return nil, fmt.Errorf("getting estimated document count: %w", err) + } + out = make([]entities.DeviceStatistics, 0, count) + + cursor, err := r.collection.Find(ctx, bson.D{}) + if err != nil { + return nil, fmt.Errorf("finding documents: %w", err) + } + defer func() { + errClose := cursor.Close(ctx) + err = errors.Join(err, errClose) + }() + + for cursor.Next(ctx) { + var stat deviceStatsDB + err = cursor.Decode(&stat) + if err != nil { + return nil, fmt.Errorf("decoding item: %w", err) + } + + out = append(out, stat.asDomain()) + } + + return out, nil +} diff --git a/internal/store/pg/store.go b/internal/store/pg/store.go new file mode 100644 index 0000000..51e16e9 --- /dev/null +++ b/internal/store/pg/store.go @@ -0,0 +1,154 @@ +package pg + +import ( + "context" + "fmt" + "time" + + "git.loyso.art/frx/devsim/internal/entities" + + "github.com/jackc/pgx/v5/pgxpool" +) + +func Dial(ctx context.Context, addr string) (*repository, error) { + config, err := pgxpool.ParseConfig(addr) + if err != nil { + return nil, fmt.Errorf("parsing config: %w", err) + } + + pool, err := pgxpool.NewWithConfig(ctx, config) + if err != nil { + return nil, fmt.Errorf("connecting to db: %w", err) + } + + return &repository{db: pool}, nil +} + +type repository struct { + db *pgxpool.Pool +} + +func (r *repository) StatsRepository() statsRepository { + return statsRepository{ + db: r.db, + } +} + +type statsRepository struct { + db *pgxpool.Pool +} + +type deviceStatsDB struct { + DeviceID string + IncomingTraffic int + OutgoingTraffic int + IncomingRPS int + ReadRPS int + WriteRPS int + UpdatedAt time.Time +} + +func (s deviceStatsDB) asDomain() entities.DeviceStatistics { + return entities.DeviceStatistics{ + ID: entities.DeviceID(s.DeviceID), + IncomingTrafficBytes: s.IncomingTraffic, + OutgoingTrafficBytes: s.OutgoingTraffic, + IncomingRPS: s.IncomingRPS, + ReadRPS: s.ReadRPS, + WriteRPS: s.WriteRPS, + UpdatedAt: s.UpdatedAt, + } +} + +func (r statsRepository) Upsert(ctx context.Context, id entities.DeviceID, stats entities.DeviceStatistics) error { + const query = `INSERT INTO public.stats ( + device_id, + inc_traffic, + out_traffic, + inc_rps, + read_rps, + write_rps, + updated_at + ) VALUES ( + $1, + $2, + $3, + $4, + $5, + $6, + $7, + ) ON CONFLICT(device_id) DO UPDATE SET + inc_traffic = EXCLUDED.inc_traffic, + out_traffic = EXCLUDED.out_traffic, + inc_rps = EXCLUDED.inc_rps, + read_rps = EXCLUDED.read_rps, + write_rps = EXCLUDED.write_rps, + updated_at = EXCLUDED.updated_at + RETURNING id; + ` + + _, err := r.db.Exec( + ctx, query, + id, + stats.IncomingTrafficBytes, + stats.OutgoingTrafficBytes, + stats.IncomingRPS, + stats.ReadRPS, + stats.WriteRPS, + ) + if err != nil { + return fmt.Errorf("executing query: %w", err) + } + + return nil +} + +func (r statsRepository) List(ctx context.Context) (out []entities.DeviceStatistics, err error) { + var count int + err = r.db.QueryRow(ctx, `SELECT COUNT(device_id) FROM public.stats`).Scan(&count) + if err != nil { + return nil, fmt.Errorf("getting count: %w", err) + } + + out = make([]entities.DeviceStatistics, 0, count) + + const query = `SELECT + device_id, + inc_traffic, + out_traffic, + inc_rps, + read_rps, + write_rps, + updated_at + FROM public.stats; + ` + + rows, err := r.db.Query(ctx, query) + if err != nil { + return nil, fmt.Errorf("querying: %w", err) + } + defer rows.Close() + + for rows.Next() { + var stat deviceStatsDB + err = rows.Scan( + &stat.DeviceID, + &stat.IncomingTraffic, + &stat.OutgoingTraffic, + &stat.IncomingRPS, + &stat.ReadRPS, + &stat.WriteRPS, + &stat.UpdatedAt, + ) + if err != nil { + return nil, fmt.Errorf("scanning row: %w", err) + } + + out = append(out, stat.asDomain()) + } + if err = rows.Err(); err != nil { + return nil, fmt.Errorf("checking rows err: %w", err) + } + + return out, nil +} diff --git a/internal/store/store.go b/internal/store/store.go new file mode 100644 index 0000000..666fc8b --- /dev/null +++ b/internal/store/store.go @@ -0,0 +1,12 @@ +package store + +import ( + "context" + + "git.loyso.art/frx/devsim/internal/entities" +) + +type Stats interface { + Upsert(context.Context, entities.DeviceID, entities.DeviceStatistics) error + List(context.Context) ([]entities.DeviceStatistics, error) +}