From 0046755c7dd768c45c7428c3540addf0ca5b948a Mon Sep 17 00:00:00 2001 From: Aleksandr Trushkin Date: Sat, 10 Aug 2024 14:14:38 +0300 Subject: [PATCH] provide a way to run app --- Dockerfile | 17 +++ cmd/web/main.go | 182 +++++++++++++++++++++++++ internal/api/http/handlerbuilder.go | 42 ++++++ internal/api/http/middlewares.go | 76 +++++++++++ internal/api/http/server.go | 197 +++++----------------------- internal/api/http/server_test.go | 4 +- internal/api/http/statshandlers.go | 79 +++++++++++ internal/store/mongo/store.go | 7 + internal/store/pg/store.go | 7 +- pkg/collections/set.go | 25 ++++ 10 files changed, 468 insertions(+), 168 deletions(-) create mode 100644 Dockerfile create mode 100644 cmd/web/main.go create mode 100644 internal/api/http/handlerbuilder.go create mode 100644 internal/api/http/middlewares.go create mode 100644 internal/api/http/statshandlers.go create mode 100644 pkg/collections/set.go diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..e921d33 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,17 @@ +FROM golang:1.22-alpine as golang + +WORKDIR /app +COPY . . + +RUN go mod download && go mod verify && CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /server . + +FROM gcr.io/distroless/static-debian12@sha256:ce46866b3a5170db3b49364900fb3168dc0833dfb46c26da5c77f22abb01d8c3 + +WORKDIR /app +COPY --from=golang /server . + +EXPOSE 9123 +ENV DEVSIM_HTTP_ADDR=":9123" + +CMD ["/server"] + diff --git a/cmd/web/main.go b/cmd/web/main.go new file mode 100644 index 0000000..4bc59a5 --- /dev/null +++ b/cmd/web/main.go @@ -0,0 +1,182 @@ +package main + +import ( + "context" + "errors" + "fmt" + "io" + "log/slog" + "os" + "os/signal" + + "git.loyso.art/frx/devsim/internal/api/http" + "git.loyso.art/frx/devsim/internal/store" + "git.loyso.art/frx/devsim/internal/store/mongo" + "git.loyso.art/frx/devsim/internal/store/pg" + "git.loyso.art/frx/devsim/pkg/collections" + "golang.org/x/sync/errgroup" +) + +var availableStoreTypes = collections.NewSet([]string{ + "pg", "mongo", +}...) + +func main() { + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) + defer cancel() + + log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{})) + + var settings applicationSettings + settings.fromEnv() + + err := settings.validate() + if err != nil { + log.ErrorContext(ctx, "unable to validate settings", slog.Any("err", err)) + os.Exit(1) + } + + err = app(ctx, settings, log) + if err != nil { + log.ErrorContext(ctx, "unable to run app", slog.Any("err", err)) + os.Exit(1) + } +} + +type mongoSettings struct { + DSN string +} + +func (s *mongoSettings) fromEnv() { + *s = mongoSettings{ + DSN: os.Getenv("DEVSIM_MONGO_DSN"), + } +} + +type pgSettings struct { + DSN string +} + +func (s *pgSettings) fromEnv() { + *s = pgSettings{ + DSN: os.Getenv("DEVSIM_PG_DSN"), + } +} + +type applicationSettings struct { + listenAddr string + storeType string + + pg pgSettings + mongo mongoSettings +} + +func (s *applicationSettings) fromEnv() { + const webaddr = ":9123" + + *s = applicationSettings{ + listenAddr: valueOrDefault(os.Getenv("DEVSIM_HTTP_ADDR"), webaddr), + storeType: os.Getenv("DEVSIM_STORE_TYPE"), + } + + s.pg.fromEnv() + s.mongo.fromEnv() +} + +func (s *applicationSettings) validate() (err error) { + if !availableStoreTypes.Contains(s.storeType) { + err = errors.Join(err, errors.New("store_type value is unsupported")) + } + + switch s.storeType { + case "pg": + if s.pg.DSN == "" { + err = errors.Join(err, errors.New("no postgres dsn provided")) + } + case "mongo": + if s.mongo.DSN == "" { + err = errors.Join(err, errors.New("no mongo dsn provided")) + } + } + + if s.listenAddr == "" { + err = errors.Join(err, errors.New("no listen address provided")) + } + + return err +} + +type namedCloser struct { + closer io.Closer + name string +} + +func app(ctx context.Context, settings applicationSettings, log *slog.Logger) (err error) { + var repo store.Stats + var closers []namedCloser + + switch settings.storeType { + case "pg": + pgconn, err := pg.Dial(ctx, settings.pg.DSN) + if err != nil { + return fmt.Errorf("connecting to postgres: %w", err) + } + + repo = pgconn.StatsRepository() + closers = append(closers, namedCloser{ + name: "postgres", + closer: pgconn, + }) + case "mongo": + mongoconn, err := mongo.Dial(ctx, settings.mongo.DSN) + if err != nil { + return fmt.Errorf("connecting to mongo: %w", err) + } + + repo = mongoconn.StatsRepository() + closers = append(closers, namedCloser{ + name: "mongo", + closer: mongoconn, + }) + } + + hb := http.NewHandlersBuilder() + hb.MountStatsHandlers(repo, log) + + httpServer := http.NewServer(settings.listenAddr) + closers = append(closers, namedCloser{ + name: "http", + closer: httpServer, + }) + httpServer.RegisterHandler(hb.Build()) + + eg, _ := errgroup.WithContext(ctx) + eg.Go(func() error { + return httpServer.Run() + }) + + err = eg.Wait() + if err != nil { + log.ErrorContext(ctx, "unable to proceed the app", slog.Any("err", err)) + } + + for _, closer := range closers { + name := closer.name + closerUnit := closer.closer + errClose := closerUnit.Close() + if errClose != nil { + log.ErrorContext(ctx, "unable to close component", slog.String("component", name), slog.Any("err", errClose)) + } + } + + return nil +} + +func valueOrDefault[x comparable](value, fallback x) x { + var v x + if value == v { + return fallback + } + + return value +} diff --git a/internal/api/http/handlerbuilder.go b/internal/api/http/handlerbuilder.go new file mode 100644 index 0000000..f594d72 --- /dev/null +++ b/internal/api/http/handlerbuilder.go @@ -0,0 +1,42 @@ +package http + +import ( + "log/slog" + "net/http" + "net/http/pprof" + + "git.loyso.art/frx/devsim/internal/store" +) + +type handlersBuilder struct { + mux *http.ServeMux +} + +func NewHandlersBuilder() *handlersBuilder { + return &handlersBuilder{ + mux: http.NewServeMux(), + } +} + +// MountStatsHandlers mounts stats related handlers. +func (h *handlersBuilder) MountStatsHandlers(sr store.Stats, log *slog.Logger) { + log = log.With(slog.String("api", "http")) + + mws := multipleMiddlewares( + middlewarePanicRecovery(log), + middlewareLogger(log), + ) + + h.mux.Handle("/api/v1/stats/", mws(listStatsHandler(sr))) + h.mux.Handle("/api/v1/stats/{id}", mws(postStatsHandler(sr))) +} + +func (s *handlersBuilder) MountProfileHandlers() { + s.mux.HandleFunc("/debug/pprof", pprof.Index) + s.mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + s.mux.HandleFunc("/debug/pprof/trace", pprof.Trace) +} + +func (s *handlersBuilder) Build() http.Handler { + return s.mux +} diff --git a/internal/api/http/middlewares.go b/internal/api/http/middlewares.go new file mode 100644 index 0000000..17f3dda --- /dev/null +++ b/internal/api/http/middlewares.go @@ -0,0 +1,76 @@ +package http + +import ( + "log/slog" + "net/http" + "time" +) + +type middlewareFunc func(http.Handler) http.Handler + +func middlewarePanicRecovery(log *slog.Logger) middlewareFunc { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer func() { + rec := recover() + if rec == nil { + return + } + + log.ErrorContext( + r.Context(), "panic acquired during request", + slog.Any("panic", rec), + ) + }() + + next.ServeHTTP(w, r) + }) + } +} + +func middlewareLogger(log *slog.Logger) middlewareFunc { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + + requestID := r.Header.Get("x-request-id") + if requestID == "" { + requestID = randomID() + } + + w.Header().Set("x-request-id", requestID) + + method := r.Method + path := r.URL.Path + query := r.URL.Query().Encode() + + log.InfoContext( + r.Context(), "request processing", + slog.String("request_id", requestID), + slog.String("method", method), + slog.String("path", path), + slog.String("query", query), + ) + + next.ServeHTTP(w, r) + + elapsed := time.Since(start) + log.InfoContext( + r.Context(), "request finished", + slog.Duration("elapsed", elapsed.Truncate(time.Millisecond)), + ) + }) + } +} + +func multipleMiddlewares(mws ...middlewareFunc) middlewareFunc { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + for _, mw := range mws { + next = mw(next) + } + + next.ServeHTTP(w, r) + }) + } +} diff --git a/internal/api/http/server.go b/internal/api/http/server.go index a449a8f..7ff6a7d 100644 --- a/internal/api/http/server.go +++ b/internal/api/http/server.go @@ -1,188 +1,55 @@ package http import ( - "encoding/json" - "fmt" - "log/slog" + "context" + "crypto/rand" + "encoding/hex" + "errors" "net/http" - "net/http/pprof" "time" - - "git.loyso.art/frx/devsim/internal/entities" - "git.loyso.art/frx/devsim/internal/store" ) -type handlersBuilder struct { - mux *http.ServeMux +type Server struct { + srv *http.Server } -func NewHandlersBuilder() *handlersBuilder { - return &handlersBuilder{ - mux: http.NewServeMux(), +func NewServer(addr string) *Server { + srv := http.Server{ + ReadTimeout: time.Second * 3, + WriteTimeout: time.Second * 3, + IdleTimeout: time.Second * 2, + ReadHeaderTimeout: time.Second * 3, + Addr: addr, + } + + return &Server{ + srv: &srv, } } -// MountStatsHandlers mounts stats related handlers. -func (h *handlersBuilder) MountStatsHandlers(sr store.Stats, log *slog.Logger) { - log = log.With(slog.String("api", "http")) - - mws := multipleMiddlewares( - middlewarePanicRecovery(log), - middlewareLogger(log), - ) - - h.mux.Handle("/api/v1/stats/", mws(listStatsHandler(sr))) - h.mux.Handle("/api/v1/stats/{id}", mws(postStatsHandler(sr))) +func (s *Server) RegisterHandler(h http.Handler) { + s.srv.Handler = h } -func (s *handlersBuilder) MountProfileHandlers() { - s.mux.HandleFunc("/debug/pprof", pprof.Index) - s.mux.HandleFunc("/debug/pprof/profile", pprof.Profile) - s.mux.HandleFunc("/debug/pprof/trace", pprof.Trace) -} - -func (s *handlersBuilder) Build() http.Handler { - return s.mux -} - -// ListenAndServe runs server to accept incoming connections. This function blocks on -// handling connections. -func listStatsHandler(sr store.Stats) http.HandlerFunc { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) - return - } - - stats, err := sr.List(r.Context()) - if err != nil { - http.Error(w, fmt.Errorf("listing stats: %w", err).Error(), http.StatusInternalServerError) - return - } - - w.Header().Set("content-type", "application/json") - err = json.NewEncoder(w).Encode(stats) - if err != nil { - http.Error(w, fmt.Errorf("encoding payload: %w", err).Error(), http.StatusInternalServerError) - } - }) -} - -func postStatsHandler(sr store.Stats) http.HandlerFunc { - type request struct { - IncomingTraffic int `json:"incoming_traffic"` - OutgoingTraffic int `json:"outgoing_traffic"` - IncomingRPS int `json:"incoming_rps"` - ReadRPS int `json:"read_rps"` - WriteRPS int `json:"write_rps"` +func (s *Server) Run() error { + err := s.srv.ListenAndServe() + if errors.Is(err, http.ErrServerClosed) { + return nil } - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) - return - } - - id := r.PathValue("id") - if id == "" { - http.Error(w, "no id provided", http.StatusBadRequest) - return - } - - var reqbody request - err := json.NewDecoder(r.Body).Decode(&reqbody) - if err != nil { - http.Error(w, fmt.Errorf("decoding body: %w", err).Error(), http.StatusBadRequest) - return - } - - ctx := r.Context() - err = sr.Upsert(ctx, entities.DeviceStatistics{ - ID: entities.DeviceID(id), - IncomingTrafficBytes: reqbody.IncomingTraffic, - OutgoingTrafficBytes: reqbody.OutgoingTraffic, - IncomingRPS: reqbody.IncomingRPS, - ReadRPS: reqbody.ReadRPS, - WriteRPS: reqbody.WriteRPS, - }) - if err != nil { - http.Error(w, fmt.Errorf("upserting stat metric: %w", err).Error(), http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusOK) - }) + return err } -type middlewareFunc func(http.Handler) http.Handler +func (s *Server) Close() error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() -func middlewarePanicRecovery(log *slog.Logger) middlewareFunc { - return func(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - defer func() { - rec := recover() - if rec == nil { - return - } - - log.ErrorContext( - r.Context(), "panic acquired during request", - slog.Any("panic", rec), - ) - }() - - next.ServeHTTP(w, r) - }) - } -} - -func middlewareLogger(log *slog.Logger) middlewareFunc { - return func(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - start := time.Now() - - requestID := r.Header.Get("x-request-id") - if requestID == "" { - requestID = randomID() - } - - w.Header().Set("x-request-id", requestID) - - method := r.Method - path := r.URL.Path - query := r.URL.Query().Encode() - - log.InfoContext( - r.Context(), "request processing", - slog.String("request_id", requestID), - slog.String("method", method), - slog.String("path", path), - slog.String("query", query), - ) - - next.ServeHTTP(w, r) - - elapsed := time.Since(start) - log.InfoContext( - r.Context(), "request finished", - slog.Duration("elapsed", elapsed.Truncate(time.Millisecond)), - ) - }) - } -} - -func multipleMiddlewares(mws ...middlewareFunc) middlewareFunc { - return func(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - for _, mw := range mws { - next = mw(next) - } - - next.ServeHTTP(w, r) - }) - } + return s.srv.Shutdown(ctx) } func randomID() string { - return "" + var randombuffer [4]byte + _, _ = rand.Read(randombuffer[:]) + + return hex.EncodeToString(randombuffer[:]) } diff --git a/internal/api/http/server_test.go b/internal/api/http/server_test.go index b3ea060..33bd9ac 100644 --- a/internal/api/http/server_test.go +++ b/internal/api/http/server_test.go @@ -22,7 +22,7 @@ func prepareEssentials(t testing.TB) (*mock.MockedStore, *slog.Logger) { return mock.NewMock(), log } -func TestList(t *testing.T) { +func TestListSuccess(t *testing.T) { require := require.New(t) store, log := prepareEssentials(t) @@ -63,7 +63,7 @@ func TestList(t *testing.T) { require.ElementsMatch(stats, expectedStatistics) } -func TestUpsert(t *testing.T) { +func TestUpsertSuccess(t *testing.T) { require := require.New(t) store, log := prepareEssentials(t) diff --git a/internal/api/http/statshandlers.go b/internal/api/http/statshandlers.go new file mode 100644 index 0000000..657abec --- /dev/null +++ b/internal/api/http/statshandlers.go @@ -0,0 +1,79 @@ +package http + +import ( + "encoding/json" + "fmt" + "net/http" + + "git.loyso.art/frx/devsim/internal/entities" + "git.loyso.art/frx/devsim/internal/store" +) + +// ListenAndServe runs server to accept incoming connections. This function blocks on +// handling connections. +func listStatsHandler(sr store.Stats) http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + stats, err := sr.List(r.Context()) + if err != nil { + http.Error(w, fmt.Errorf("listing stats: %w", err).Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("content-type", "application/json") + err = json.NewEncoder(w).Encode(stats) + if err != nil { + http.Error(w, fmt.Errorf("encoding payload: %w", err).Error(), http.StatusInternalServerError) + } + }) +} + +func postStatsHandler(sr store.Stats) http.HandlerFunc { + type request struct { + IncomingTraffic int `json:"incoming_traffic"` + OutgoingTraffic int `json:"outgoing_traffic"` + IncomingRPS int `json:"incoming_rps"` + ReadRPS int `json:"read_rps"` + WriteRPS int `json:"write_rps"` + } + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + + id := r.PathValue("id") + if id == "" { + http.Error(w, "no id provided", http.StatusBadRequest) + return + } + + var reqbody request + err := json.NewDecoder(r.Body).Decode(&reqbody) + if err != nil { + http.Error(w, fmt.Errorf("decoding body: %w", err).Error(), http.StatusBadRequest) + return + } + + ctx := r.Context() + err = sr.Upsert(ctx, entities.DeviceStatistics{ + ID: entities.DeviceID(id), + IncomingTrafficBytes: reqbody.IncomingTraffic, + OutgoingTrafficBytes: reqbody.OutgoingTraffic, + IncomingRPS: reqbody.IncomingRPS, + ReadRPS: reqbody.ReadRPS, + WriteRPS: reqbody.WriteRPS, + }) + if err != nil { + http.Error(w, fmt.Errorf("upserting stat metric: %w", err).Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + }) +} diff --git a/internal/store/mongo/store.go b/internal/store/mongo/store.go index c221758..e826e03 100644 --- a/internal/store/mongo/store.go +++ b/internal/store/mongo/store.go @@ -37,6 +37,13 @@ type repository struct { client *mongo.Client } +func (r *repository) Close() error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + return r.client.Disconnect(ctx) +} + func (r *repository) StatsRepository() statsRepository { return statsRepository{ collection: r.client.Database("bench").Collection("device_stats"), diff --git a/internal/store/pg/store.go b/internal/store/pg/store.go index bd8fafa..4a3a097 100644 --- a/internal/store/pg/store.go +++ b/internal/store/pg/store.go @@ -34,18 +34,23 @@ func (r *repository) StatsRepository() statsRepository { } } +func (r *repository) Close() error { + r.db.Close() + return nil +} + type statsRepository struct { db *pgxpool.Pool } type deviceStatsDB struct { + UpdatedAt time.Time DeviceID string IncomingTraffic int OutgoingTraffic int IncomingRPS int ReadRPS int WriteRPS int - UpdatedAt time.Time } func (s deviceStatsDB) asDomain() entities.DeviceStatistics { diff --git a/pkg/collections/set.go b/pkg/collections/set.go new file mode 100644 index 0000000..08cf4c2 --- /dev/null +++ b/pkg/collections/set.go @@ -0,0 +1,25 @@ +package collections + +type Set[V comparable] map[V]struct{} + +func NewSet[V comparable](values ...V) Set[V] { + out := make(map[V]struct{}, len(values)) + for _, value := range values { + out[value] = struct{}{} + } + + return out +} + +func (s Set[V]) Array() []V { + out := make([]V, 0, len(s)) + for k := range s { + out = append(out, k) + } + return out +} + +func (s Set[V]) Contains(other V) bool { + _, ok := s[other] + return ok +}