From ba6ac26bac068e93dc6f01f038d922d324d367cb Mon Sep 17 00:00:00 2001 From: Aleksandr Trushkin Date: Sat, 10 Aug 2024 02:15:00 +0300 Subject: [PATCH] add http server --- go.mod | 13 ++- go.sum | 13 ++- internal/api/http/server.go | 188 +++++++++++++++++++++++++++++++ internal/api/http/server_test.go | 110 ++++++++++++++++++ internal/store/mock/store.go | 32 ++++++ internal/store/mongo/store.go | 6 +- internal/store/pg/store.go | 4 +- internal/store/store.go | 2 +- readme.md | 26 +++++ 9 files changed, 380 insertions(+), 14 deletions(-) create mode 100644 internal/api/http/server.go create mode 100644 internal/api/http/server_test.go create mode 100644 internal/store/mock/store.go diff --git a/go.mod b/go.mod index 88eaa94..572adcd 100644 --- a/go.mod +++ b/go.mod @@ -3,21 +3,26 @@ module git.loyso.art/frx/devsim go 1.22.2 require ( + github.com/jackc/pgx/v5 v5.6.0 + github.com/stretchr/testify v1.9.0 + go.mongodb.org/mongo-driver v1.16.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect - github.com/jackc/pgx v3.6.2+incompatible // indirect - github.com/jackc/pgx/v5 v5.6.0 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/klauspost/compress v1.13.6 // indirect github.com/montanaflynn/stats v0.7.1 // indirect - github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect - go.mongodb.org/mongo-driver v1.16.0 // indirect golang.org/x/crypto v0.26.0 // indirect golang.org/x/sync v0.8.0 // indirect golang.org/x/text v0.17.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index cb67008..977abe3 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,14 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx v3.6.2+incompatible h1:2zP5OD7kiyR3xzRYMhOcXVvkDZsImVXfj+yIyTQf3/o= -github.com/jackc/pgx v3.6.2+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I= github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= @@ -15,12 +17,13 @@ github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQ github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= @@ -63,3 +66,5 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/api/http/server.go b/internal/api/http/server.go new file mode 100644 index 0000000..a449a8f --- /dev/null +++ b/internal/api/http/server.go @@ -0,0 +1,188 @@ +package http + +import ( + "encoding/json" + "fmt" + "log/slog" + "net/http" + "net/http/pprof" + "time" + + "git.loyso.art/frx/devsim/internal/entities" + "git.loyso.art/frx/devsim/internal/store" +) + +type handlersBuilder struct { + mux *http.ServeMux +} + +func NewHandlersBuilder() *handlersBuilder { + return &handlersBuilder{ + mux: http.NewServeMux(), + } +} + +// MountStatsHandlers mounts stats related handlers. +func (h *handlersBuilder) MountStatsHandlers(sr store.Stats, log *slog.Logger) { + log = log.With(slog.String("api", "http")) + + mws := multipleMiddlewares( + middlewarePanicRecovery(log), + middlewareLogger(log), + ) + + h.mux.Handle("/api/v1/stats/", mws(listStatsHandler(sr))) + h.mux.Handle("/api/v1/stats/{id}", mws(postStatsHandler(sr))) +} + +func (s *handlersBuilder) MountProfileHandlers() { + s.mux.HandleFunc("/debug/pprof", pprof.Index) + s.mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + s.mux.HandleFunc("/debug/pprof/trace", pprof.Trace) +} + +func (s *handlersBuilder) Build() http.Handler { + return s.mux +} + +// ListenAndServe runs server to accept incoming connections. This function blocks on +// handling connections. +func listStatsHandler(sr store.Stats) http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + stats, err := sr.List(r.Context()) + if err != nil { + http.Error(w, fmt.Errorf("listing stats: %w", err).Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("content-type", "application/json") + err = json.NewEncoder(w).Encode(stats) + if err != nil { + http.Error(w, fmt.Errorf("encoding payload: %w", err).Error(), http.StatusInternalServerError) + } + }) +} + +func postStatsHandler(sr store.Stats) http.HandlerFunc { + type request struct { + IncomingTraffic int `json:"incoming_traffic"` + OutgoingTraffic int `json:"outgoing_traffic"` + IncomingRPS int `json:"incoming_rps"` + ReadRPS int `json:"read_rps"` + WriteRPS int `json:"write_rps"` + } + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + id := r.PathValue("id") + if id == "" { + http.Error(w, "no id provided", http.StatusBadRequest) + return + } + + var reqbody request + err := json.NewDecoder(r.Body).Decode(&reqbody) + if err != nil { + http.Error(w, fmt.Errorf("decoding body: %w", err).Error(), http.StatusBadRequest) + return + } + + ctx := r.Context() + err = sr.Upsert(ctx, entities.DeviceStatistics{ + ID: entities.DeviceID(id), + IncomingTrafficBytes: reqbody.IncomingTraffic, + OutgoingTrafficBytes: reqbody.OutgoingTraffic, + IncomingRPS: reqbody.IncomingRPS, + ReadRPS: reqbody.ReadRPS, + WriteRPS: reqbody.WriteRPS, + }) + if err != nil { + http.Error(w, fmt.Errorf("upserting stat metric: %w", err).Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + }) +} + +type middlewareFunc func(http.Handler) http.Handler + +func middlewarePanicRecovery(log *slog.Logger) middlewareFunc { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer func() { + rec := recover() + if rec == nil { + return + } + + log.ErrorContext( + r.Context(), "panic acquired during request", + slog.Any("panic", rec), + ) + }() + + next.ServeHTTP(w, r) + }) + } +} + +func middlewareLogger(log *slog.Logger) middlewareFunc { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + + requestID := r.Header.Get("x-request-id") + if requestID == "" { + requestID = randomID() + } + + w.Header().Set("x-request-id", requestID) + + method := r.Method + path := r.URL.Path + query := r.URL.Query().Encode() + + log.InfoContext( + r.Context(), "request processing", + slog.String("request_id", requestID), + slog.String("method", method), + slog.String("path", path), + slog.String("query", query), + ) + + next.ServeHTTP(w, r) + + elapsed := time.Since(start) + log.InfoContext( + r.Context(), "request finished", + slog.Duration("elapsed", elapsed.Truncate(time.Millisecond)), + ) + }) + } +} + +func multipleMiddlewares(mws ...middlewareFunc) middlewareFunc { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + for _, mw := range mws { + next = mw(next) + } + + next.ServeHTTP(w, r) + }) + } +} + +func randomID() string { + return "" +} diff --git a/internal/api/http/server_test.go b/internal/api/http/server_test.go new file mode 100644 index 0000000..b3ea060 --- /dev/null +++ b/internal/api/http/server_test.go @@ -0,0 +1,110 @@ +package http_test + +import ( + "bytes" + "encoding/json" + "log/slog" + stdhttp "net/http" + "net/http/httptest" + "os" + "testing" + + "git.loyso.art/frx/devsim/internal/api/http" + "git.loyso.art/frx/devsim/internal/entities" + "git.loyso.art/frx/devsim/internal/store/mock" + + "github.com/stretchr/testify/require" +) + +func prepareEssentials(t testing.TB) (*mock.MockedStore, *slog.Logger) { + t.Helper() + log := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{})) + return mock.NewMock(), log +} + +func TestList(t *testing.T) { + require := require.New(t) + store, log := prepareEssentials(t) + + expectedStatistics := []entities.DeviceStatistics{ + { + ID: entities.DeviceID("test-1"), + OutgoingTrafficBytes: 10, + }, + { + ID: entities.DeviceID("test-2"), + IncomingTrafficBytes: 20, + }, + } + + store.RegisterOnList(func() ([]entities.DeviceStatistics, error) { + return expectedStatistics, nil + }) + + hb := http.NewHandlersBuilder() + hb.MountStatsHandlers(store, log) + + handler := hb.Build() + + server := httptest.NewServer(handler) + defer server.Close() + + httpClient := server.Client() + req, err := stdhttp.NewRequest(stdhttp.MethodGet, server.URL+"/api/v1/stats", nil) + require.NoError(err) + + resp, err := httpClient.Do(req) + require.NoError(err) + + stats := make([]entities.DeviceStatistics, 0, 2) + err = json.NewDecoder(resp.Body).Decode(&stats) + require.NoError(err) + + require.ElementsMatch(stats, expectedStatistics) +} + +func TestUpsert(t *testing.T) { + require := require.New(t) + store, log := prepareEssentials(t) + + expectedStatistics := entities.DeviceStatistics{ + ID: entities.DeviceID("test-1"), + IncomingTrafficBytes: 5, + OutgoingTrafficBytes: 10, + IncomingRPS: 8, + WriteRPS: 6, + ReadRPS: 3, + } + + incomingStats := new(entities.DeviceStatistics) + store.RegisterOnUpsert(func(ds entities.DeviceStatistics) error { + *incomingStats = ds + return nil + }) + + hb := http.NewHandlersBuilder() + hb.MountStatsHandlers(store, log) + + handler := hb.Build() + + server := httptest.NewServer(handler) + defer server.Close() + + httpClient := server.Client() + + requestBody, _ := json.Marshal(map[string]any{ + "incoming_traffic": 5, + "outgoing_traffic": 10, + "incoming_rps": 8, + "write_rps": 6, + "read_rps": 3, + }) + + req, err := stdhttp.NewRequest(stdhttp.MethodPost, server.URL+"/api/v1/stats/"+string(expectedStatistics.ID), bytes.NewReader(requestBody)) + require.NoError(err) + + resp, err := httpClient.Do(req) + require.NoError(err) + require.Equal(resp.StatusCode, stdhttp.StatusOK) + require.Equal(*incomingStats, expectedStatistics) +} diff --git a/internal/store/mock/store.go b/internal/store/mock/store.go new file mode 100644 index 0000000..5b456f5 --- /dev/null +++ b/internal/store/mock/store.go @@ -0,0 +1,32 @@ +package mock + +import ( + "context" + + "git.loyso.art/frx/devsim/internal/entities" +) + +type MockedStore struct { + onList func() ([]entities.DeviceStatistics, error) + onUpsert func(stat entities.DeviceStatistics) (err error) +} + +func NewMock() *MockedStore { + return &MockedStore{} +} + +func (m *MockedStore) RegisterOnList(f func() ([]entities.DeviceStatistics, error)) { + m.onList = f +} + +func (m *MockedStore) RegisterOnUpsert(f func(entities.DeviceStatistics) error) { + m.onUpsert = f +} + +func (m *MockedStore) List(context.Context) ([]entities.DeviceStatistics, error) { + return m.onList() +} + +func (m *MockedStore) Upsert(_ context.Context, stats entities.DeviceStatistics) error { + return m.onUpsert(stats) +} diff --git a/internal/store/mongo/store.go b/internal/store/mongo/store.go index bbeb9d9..c221758 100644 --- a/internal/store/mongo/store.go +++ b/internal/store/mongo/store.go @@ -69,17 +69,17 @@ func (s deviceStatsDB) asDomain() entities.DeviceStatistics { } } -func (r statsRepository) Upsert(ctx context.Context, id entities.DeviceID, stats entities.DeviceStatistics) error { +func (r statsRepository) Upsert(ctx context.Context, stats entities.DeviceStatistics) error { opts := options.Update().SetUpsert(true) filter := bson.D{ { Key: "_id", - Value: id, + Value: stats.ID, }, } document := deviceStatsDB{ - DeviceID: string(id), + DeviceID: string(stats.ID), IncomingTraffic: stats.IncomingTrafficBytes, OutgoinfTraffic: stats.OutgoingTrafficBytes, IncomingRPS: stats.IncomingRPS, diff --git a/internal/store/pg/store.go b/internal/store/pg/store.go index 51e16e9..bd8fafa 100644 --- a/internal/store/pg/store.go +++ b/internal/store/pg/store.go @@ -60,7 +60,7 @@ func (s deviceStatsDB) asDomain() entities.DeviceStatistics { } } -func (r statsRepository) Upsert(ctx context.Context, id entities.DeviceID, stats entities.DeviceStatistics) error { +func (r statsRepository) Upsert(ctx context.Context, stats entities.DeviceStatistics) error { const query = `INSERT INTO public.stats ( device_id, inc_traffic, @@ -89,7 +89,7 @@ func (r statsRepository) Upsert(ctx context.Context, id entities.DeviceID, stats _, err := r.db.Exec( ctx, query, - id, + stats.ID, stats.IncomingTrafficBytes, stats.OutgoingTrafficBytes, stats.IncomingRPS, diff --git a/internal/store/store.go b/internal/store/store.go index 666fc8b..5846922 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -7,6 +7,6 @@ import ( ) type Stats interface { - Upsert(context.Context, entities.DeviceID, entities.DeviceStatistics) error List(context.Context) ([]entities.DeviceStatistics, error) + Upsert(context.Context, entities.DeviceStatistics) error } diff --git a/readme.md b/readme.md index 3aabef6..1f2cc06 100644 --- a/readme.md +++ b/readme.md @@ -16,3 +16,29 @@ Plans to add the following databases: TODO, but! All dependencies will be covered by docker. + +## Architecture + +This application contains just of two layers: + +- External Layer that accepts http connections +- Store Layer that stores entities inside some store hidden by implementation for specific database. + +During to simple logic, there is no need to add additional business-logic layer since its all about +saving incoming metrics into database. + +### External Layer + +Typically simple HTTP Server which allows to interact with application by the following handlers: + +| Handler | Request | Parameters | Description | +| ------------- | ------------------------- | ----------------------------------------------------------------------------------- | ----------------------------------- | +| Upsert metric | POST `/api/v1/stats/{id}` | {id} -- ID of the device. String. JSON-encoded body for stats (see below for model) | Upserts stats for the passed device | +| List metrics | GET `/api/v1/stats` | | Lists all available metrics | + +### Store Layer + +Provides two methods to interacts with statistics: + +- `List` - list available metrics; +- `Upsert` - update metric by device id.