atmost working example
This commit is contained in:
@ -44,7 +44,7 @@ func middlewareLogger(log *slog.Logger) middlewareFunc {
|
||||
path := r.URL.Path
|
||||
query := r.URL.Query().Encode()
|
||||
|
||||
log.InfoContext(
|
||||
log.DebugContext(
|
||||
r.Context(), "request processing",
|
||||
slog.String("request_id", requestID),
|
||||
slog.String("method", method),
|
||||
@ -54,10 +54,9 @@ func middlewareLogger(log *slog.Logger) middlewareFunc {
|
||||
|
||||
next.ServeHTTP(w, r)
|
||||
|
||||
elapsed := time.Since(start)
|
||||
log.InfoContext(
|
||||
r.Context(), "request finished",
|
||||
slog.Duration("elapsed", elapsed.Truncate(time.Millisecond)),
|
||||
slog.Int64("elapsed", time.Since(start).Milliseconds()),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
86
internal/interconnect/collector/client.go
Normal file
86
internal/interconnect/collector/client.go
Normal file
@ -0,0 +1,86 @@
|
||||
package collector
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
"git.loyso.art/frx/devsim/internal/entities"
|
||||
)
|
||||
|
||||
type upsertRequest struct {
|
||||
IncomingTraffic int `json:"incoming_traffic"`
|
||||
OutgoingTraffic int `json:"outgoing_traffic"`
|
||||
IncomingRPS int `json:"incoming_rps"`
|
||||
ReadRPS int `json:"read_rps"`
|
||||
WriteRPS int `json:"write_rps"`
|
||||
}
|
||||
|
||||
type Client interface {
|
||||
Upsert(context.Context, entities.DeviceStatistics) error
|
||||
}
|
||||
|
||||
type client struct {
|
||||
httpClient *http.Client
|
||||
baseurl string
|
||||
}
|
||||
|
||||
func New(addr string) (*client, error) {
|
||||
hc := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: time.Second * 10,
|
||||
KeepAlive: time.Second * 30,
|
||||
}).DialContext,
|
||||
MaxIdleConns: 10,
|
||||
IdleConnTimeout: time.Second * 90,
|
||||
TLSHandshakeTimeout: time.Second * 5,
|
||||
ExpectContinueTimeout: time.Second * 1,
|
||||
ForceAttemptHTTP2: true,
|
||||
},
|
||||
Timeout: time.Second * 10,
|
||||
}
|
||||
|
||||
return &client{
|
||||
httpClient: hc,
|
||||
baseurl: addr,
|
||||
}, nil
|
||||
}
|
||||
|
||||
var upsertRequestTemplate = template.Must(template.New("request").Parse(`{"incoming_traffic":{{.IncomingTrafficBytes}},"outgoing_traffic":{{.OutgoingTrafficBytes}},"incoming_rps":{{.IncomingRPS}},"read_rps":{{.ReadRPS}},"write_rps":{{.WriteRPS}}}`))
|
||||
|
||||
func (c *client) Upsert(ctx context.Context, stat entities.DeviceStatistics) error {
|
||||
var buf bytes.Buffer
|
||||
err := upsertRequestTemplate.Lookup("request").Execute(&buf, stat)
|
||||
if err != nil {
|
||||
return fmt.Errorf("executing template: %w", err)
|
||||
}
|
||||
|
||||
path := c.baseurl + "/api/v1/stats/" + string(stat.ID)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, path, &buf)
|
||||
if err != nil {
|
||||
return fmt.Errorf("preparing http request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("executing request: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode == http.StatusOK {
|
||||
return nil
|
||||
}
|
||||
|
||||
data, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
|
||||
if err != nil {
|
||||
return fmt.Errorf("reading body by status code %d: %w", resp.StatusCode, err)
|
||||
}
|
||||
|
||||
return fmt.Errorf("expected status 200 got %d with body: %q", resp.StatusCode, data)
|
||||
}
|
||||
32
internal/store/pg/queries/db.go
Normal file
32
internal/store/pg/queries/db.go
Normal file
@ -0,0 +1,32 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.26.0
|
||||
|
||||
package queries
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgconn"
|
||||
)
|
||||
|
||||
type DBTX interface {
|
||||
Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
|
||||
Query(context.Context, string, ...interface{}) (pgx.Rows, error)
|
||||
QueryRow(context.Context, string, ...interface{}) pgx.Row
|
||||
}
|
||||
|
||||
func New(db DBTX) *Queries {
|
||||
return &Queries{db: db}
|
||||
}
|
||||
|
||||
type Queries struct {
|
||||
db DBTX
|
||||
}
|
||||
|
||||
func (q *Queries) WithTx(tx pgx.Tx) *Queries {
|
||||
return &Queries{
|
||||
db: tx,
|
||||
}
|
||||
}
|
||||
19
internal/store/pg/queries/models.go
Normal file
19
internal/store/pg/queries/models.go
Normal file
@ -0,0 +1,19 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.26.0
|
||||
|
||||
package queries
|
||||
|
||||
import (
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
)
|
||||
|
||||
type Stat struct {
|
||||
DeviceID string
|
||||
IncTraffic int32
|
||||
OutTraffic int32
|
||||
IncRps int32
|
||||
ReadRps int32
|
||||
WriteRps int32
|
||||
UpdatedAt pgtype.Timestamp
|
||||
}
|
||||
90
internal/store/pg/queries/queries.sql.go
Normal file
90
internal/store/pg/queries/queries.sql.go
Normal file
@ -0,0 +1,90 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.26.0
|
||||
// source: queries.sql
|
||||
|
||||
package queries
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
const listDeviceStats = `-- name: ListDeviceStats :many
|
||||
SELECT device_id, inc_traffic, out_traffic, inc_rps, read_rps, write_rps, updated_at FROM public.stats
|
||||
`
|
||||
|
||||
func (q *Queries) ListDeviceStats(ctx context.Context) ([]Stat, error) {
|
||||
rows, err := q.db.Query(ctx, listDeviceStats)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []Stat
|
||||
for rows.Next() {
|
||||
var i Stat
|
||||
if err := rows.Scan(
|
||||
&i.DeviceID,
|
||||
&i.IncTraffic,
|
||||
&i.OutTraffic,
|
||||
&i.IncRps,
|
||||
&i.ReadRps,
|
||||
&i.WriteRps,
|
||||
&i.UpdatedAt,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const upsertDeviceMetrics = `-- name: UpsertDeviceMetrics :exec
|
||||
INSERT INTO public.stats(
|
||||
device_id,
|
||||
inc_traffic,
|
||||
out_traffic,
|
||||
inc_rps,
|
||||
write_rps,
|
||||
read_rps,
|
||||
updated_at
|
||||
) VALUES (
|
||||
$1,
|
||||
$2,
|
||||
$3,
|
||||
$4,
|
||||
$5,
|
||||
$6,
|
||||
NOW()
|
||||
) ON CONFLICT(device_id) DO UPDATE SET
|
||||
device_id = EXCLUDED.device_id,
|
||||
inc_traffic = EXCLUDED.inc_traffic,
|
||||
out_traffic = EXCLUDED.out_traffic,
|
||||
inc_rps = EXCLUDED.inc_rps,
|
||||
write_rps = EXCLUDED.write_rps,
|
||||
read_rps = EXCLUDED.read_rps,
|
||||
updated_at = NOW()
|
||||
`
|
||||
|
||||
type UpsertDeviceMetricsParams struct {
|
||||
DeviceID string
|
||||
IncTraffic int32
|
||||
OutTraffic int32
|
||||
IncRps int32
|
||||
WriteRps int32
|
||||
ReadRps int32
|
||||
}
|
||||
|
||||
func (q *Queries) UpsertDeviceMetrics(ctx context.Context, arg UpsertDeviceMetricsParams) error {
|
||||
_, err := q.db.Exec(ctx, upsertDeviceMetrics,
|
||||
arg.DeviceID,
|
||||
arg.IncTraffic,
|
||||
arg.OutTraffic,
|
||||
arg.IncRps,
|
||||
arg.WriteRps,
|
||||
arg.ReadRps,
|
||||
)
|
||||
return err
|
||||
}
|
||||
@ -5,9 +5,10 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.loyso.art/frx/devsim/internal/entities"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
|
||||
"git.loyso.art/frx/devsim/internal/entities"
|
||||
"git.loyso.art/frx/devsim/internal/store/pg/queries"
|
||||
)
|
||||
|
||||
func Dial(ctx context.Context, addr string) (*repository, error) {
|
||||
@ -66,93 +67,38 @@ func (s deviceStatsDB) asDomain() entities.DeviceStatistics {
|
||||
}
|
||||
|
||||
func (r statsRepository) Upsert(ctx context.Context, stats entities.DeviceStatistics) error {
|
||||
const query = `INSERT INTO public.stats (
|
||||
device_id,
|
||||
inc_traffic,
|
||||
out_traffic,
|
||||
inc_rps,
|
||||
read_rps,
|
||||
write_rps,
|
||||
updated_at
|
||||
) VALUES (
|
||||
$1,
|
||||
$2,
|
||||
$3,
|
||||
$4,
|
||||
$5,
|
||||
$6,
|
||||
$7,
|
||||
) ON CONFLICT(device_id) DO UPDATE SET
|
||||
inc_traffic = EXCLUDED.inc_traffic,
|
||||
out_traffic = EXCLUDED.out_traffic,
|
||||
inc_rps = EXCLUDED.inc_rps,
|
||||
read_rps = EXCLUDED.read_rps,
|
||||
write_rps = EXCLUDED.write_rps,
|
||||
updated_at = EXCLUDED.updated_at
|
||||
RETURNING id;
|
||||
`
|
||||
|
||||
_, err := r.db.Exec(
|
||||
ctx, query,
|
||||
stats.ID,
|
||||
stats.IncomingTrafficBytes,
|
||||
stats.OutgoingTrafficBytes,
|
||||
stats.IncomingRPS,
|
||||
stats.ReadRPS,
|
||||
stats.WriteRPS,
|
||||
)
|
||||
err := queries.New(r.db).UpsertDeviceMetrics(ctx, queries.UpsertDeviceMetricsParams{
|
||||
DeviceID: string(stats.ID),
|
||||
IncTraffic: int32(stats.IncomingTrafficBytes),
|
||||
OutTraffic: int32(stats.OutgoingTrafficBytes),
|
||||
IncRps: int32(stats.IncomingRPS),
|
||||
WriteRps: int32(stats.WriteRPS),
|
||||
ReadRps: int32(stats.ReadRPS),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("executing query: %w", err)
|
||||
return fmt.Errorf("upserting device metrics: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r statsRepository) List(ctx context.Context) (out []entities.DeviceStatistics, err error) {
|
||||
var count int
|
||||
err = r.db.QueryRow(ctx, `SELECT COUNT(device_id) FROM public.stats`).Scan(&count)
|
||||
stats, err := queries.New(r.db).ListDeviceStats(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting count: %w", err)
|
||||
return nil, fmt.Errorf("listing device stats: %w", err)
|
||||
}
|
||||
|
||||
out = make([]entities.DeviceStatistics, 0, count)
|
||||
out = make([]entities.DeviceStatistics, len(stats))
|
||||
|
||||
const query = `SELECT
|
||||
device_id,
|
||||
inc_traffic,
|
||||
out_traffic,
|
||||
inc_rps,
|
||||
read_rps,
|
||||
write_rps,
|
||||
updated_at
|
||||
FROM public.stats;
|
||||
`
|
||||
|
||||
rows, err := r.db.Query(ctx, query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("querying: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var stat deviceStatsDB
|
||||
err = rows.Scan(
|
||||
&stat.DeviceID,
|
||||
&stat.IncomingTraffic,
|
||||
&stat.OutgoingTraffic,
|
||||
&stat.IncomingRPS,
|
||||
&stat.ReadRPS,
|
||||
&stat.WriteRPS,
|
||||
&stat.UpdatedAt,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("scanning row: %w", err)
|
||||
for i, stat := range stats {
|
||||
out[i] = entities.DeviceStatistics{
|
||||
IncomingTrafficBytes: int(stat.IncTraffic),
|
||||
OutgoingTrafficBytes: int(stat.OutTraffic),
|
||||
IncomingRPS: int(stat.IncRps),
|
||||
WriteRPS: int(stat.WriteRps),
|
||||
ReadRPS: int(stat.ReadRps),
|
||||
UpdatedAt: stat.UpdatedAt.Time,
|
||||
}
|
||||
|
||||
out = append(out, stat.asDomain())
|
||||
}
|
||||
if err = rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("checking rows err: %w", err)
|
||||
}
|
||||
|
||||
return out, nil
|
||||
|
||||
Reference in New Issue
Block a user