diff --git a/cmd/utility/main.go b/cmd/utility/main.go
new file mode 100644
index 0000000..f363836
--- /dev/null
+++ b/cmd/utility/main.go
@@ -0,0 +1,81 @@
+// Command utility is a small probe client for container healthchecks. It
+// queries the /health or /ready endpoint of the monitor server addressed by
+// DEVSIM_MONITOR_ADDR and exits with a non-zero code when the check fails.
+package main
+
+import (
+	"errors"
+	"fmt"
+	"net"
+	"net/http"
+	"os"
+	"time"
+)
+
+const (
+	// requestTimeout bounds one probe request so that a hung server cannot
+	// block the healthcheck indefinitely.
+	requestTimeout = 5 * time.Second
+	// readinessStatusHeader is the response header the /ready endpoint uses
+	// to carry the textual readiness status.
+	readinessStatusHeader = "X-Readiness-Status"
+)
+
+func main() {
+	if err := run(os.Args[1:]); err != nil {
+		fmt.Fprintln(os.Stderr, err)
+		os.Exit(1)
+	}
+}
+
+// run performs the check named by args[0] ("health" or "ready") and returns
+// an error describing why the check failed. It is a no-op when
+// DEVSIM_MONITOR_ADDR is not set.
+func run(args []string) error {
+	monitorAddress := os.Getenv("DEVSIM_MONITOR_ADDR")
+	if monitorAddress == "" {
+		return nil
+	}
+
+	host, port, err := net.SplitHostPort(monitorAddress)
+	if err != nil {
+		return fmt.Errorf("provided address is invalid: %w", err)
+	}
+
+	if host == "" {
+		// An address such as ":8080" has no host part; probe via loopback.
+		host = "127.0.0.1"
+	}
+
+	if len(args) == 0 {
+		return errors.New("no command provided")
+	}
+
+	cmd := args[0]
+	switch cmd {
+	case "health", "ready":
+		// The command name doubles as the endpoint path.
+	default:
+		return fmt.Errorf("unable to check %s", cmd)
+	}
+
+	client := &http.Client{Timeout: requestTimeout}
+	resp, err := client.Get("http://" + net.JoinHostPort(host, port) + "/" + cmd)
+	if err != nil {
+		return fmt.Errorf("unable to proceed request: %w", err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return errors.New("code not ok")
+	}
+
+	// Only /ready sets the header; an absent header counts as success.
+	if status := resp.Header.Get(readinessStatusHeader); status != "" && status != "ok" {
+		return errors.New("status not ok")
+	}
+
+	fmt.Println("ok")
+
+	return nil
+}
diff --git a/cmd/web/main.go b/cmd/web/main.go
index 8de5a2e..282385c 100644
--- a/cmd/web/main.go
+++ b/cmd/web/main.go
@@ -8,27 +8,30 @@ import (
 	"log/slog"
 	"os"
 	"os/signal"
+	"time"
 
 	"golang.org/x/sync/errgroup"
 
 	"git.loyso.art/frx/devsim"
 	"git.loyso.art/frx/devsim/internal/api/http"
+	"git.loyso.art/frx/devsim/internal/probe"
 	"git.loyso.art/frx/devsim/internal/store"
+	"git.loyso.art/frx/devsim/internal/store/memory"
 	"git.loyso.art/frx/devsim/internal/store/mongo"
 	"git.loyso.art/frx/devsim/internal/store/pg"
 	"git.loyso.art/frx/devsim/pkg/collections"
 )
 
 var availableStoreTypes = collections.NewSet([]string{
-	"pg", "mongo",
+	"pg", "mongo", "memory",
 }...)
 
 func main() {
 	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
 	defer cancel()
 
-	log := slog.New(slog.NewJSONHandler(io.Discard, &slog.HandlerOptions{
-		Level: slog.LevelInfo,
+	log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
+		Level: slog.LevelDebug,
 	}))
 
 	log.InfoContext(
@@ -79,8 +82,9 @@ func (s *pgSettings) fromEnv() {
 }
 
 type applicationSettings struct {
-	listenAddr string
-	storeType  string
+	listenAddr  string
+	monitorAddr string
+	storeType   string
 
 	pg    pgSettings
 	mongo mongoSettings
@@ -88,9 +92,11 @@ type applicationSettings struct {
 
 func loadConfigFromEnv() applicationSettings {
 	const webaddr = ":9123"
+	const monitoraddr = ":9124"
 
 	var cfg applicationSettings
 	cfg.listenAddr = valueOrDefault(os.Getenv("DEVSIM_HTTP_ADDR"), webaddr)
+	cfg.monitorAddr = valueOrDefault(os.Getenv("DEVSIM_MONITOR_ADDR"), monitoraddr)
 	cfg.storeType = os.Getenv("DEVSIM_STORE_TYPE")
 
 	cfg.pg.fromEnv()
@@ -113,6 +119,8 @@ func (s applicationSettings) validate() (err error) {
 		if s.mongo.DSN == "" {
 			err = errors.Join(err, errors.New("no mongo dsn provided"))
 		}
+	case "memory":
+		// no things to validate
 	}
 
 	if s.listenAddr == "" {
@@ -131,6 +139,28 @@ func app(ctx context.Context, settings applicationSettings, log *slog.Logger) (e
 	var repo store.Stats
 	var closers []namedCloser
 
+	livenessBase, livenessToggle := probe.SimpleLivenessSwitcher()
+	readinessBase, readinessToggle := probe.SimpleReadinessSwitcher()
+
+	pr := probe.NewReporter(time.Second * 15)
+	pr.RegisterLiveness(livenessBase)
+	pr.RegisterReadiness(readinessBase)
+
+	mb := http.NewHandlersBuilder()
+	mb.MountProbeHandlers(pr)
+
+	monitorServer := http.NewServer(settings.monitorAddr)
+	closers = append(closers, namedCloser{
+		name:   "monitorhttp",
+		closer: monitorServer,
+	})
+	monitorServer.RegisterHandler(mb.Build())
+
+	eg, egctx := errgroup.WithContext(ctx)
+	eg.Go(func() error {
+		return monitorServer.Run()
+	})
+
 	switch settings.storeType {
 	case "pg":
 		pgconn, errDial := pg.Dial(ctx, settings.pg.DSN)
@@ -154,6 +184,8 @@ func app(ctx context.Context, settings applicationSettings, log *slog.Logger) (e
 			name:   "mongo",
 			closer: mongoconn,
 		})
+	case "memory":
+		repo = memory.NewStore()
 	}
 
 	hb := http.NewHandlersBuilder()
@@ -166,22 +198,33 @@ func app(ctx context.Context, settings applicationSettings, log *slog.Logger) (e
 	})
 	httpServer.RegisterHandler(hb.Build())
 
-	eg, _ := errgroup.WithContext(ctx)
 	eg.Go(func() error {
 		return httpServer.Run()
 	})
 
+	eg.Go(func() error {
+		// Wait on the group's context so a failed server also triggers shutdown.
+		<-egctx.Done()
+		log.InfoContext(ctx, "shutting down components")
+		for _, closer := range closers {
+			name := closer.name
+			closerUnit := closer.closer
+			errClose := closerUnit.Close()
+			if errClose != nil {
+				log.ErrorContext(ctx, "unable to close component", slog.String("component", name), slog.Any("err", errClose))
+			}
+		}
+		return nil
+	})
+
+	livenessToggle()
+	readinessToggle(probe.ReadinessOk)
 	err = eg.Wait()
 	if err != nil {
-		log.ErrorContext(ctx, "unable to proceed the app", slog.Any("err", err))
-	}
-
-	for _, closer := range closers {
-		name := closer.name
-		closerUnit := closer.closer
-		errClose := closerUnit.Close()
-		if errClose != nil {
-			log.ErrorContext(ctx, "unable to close component", slog.String("component", name), slog.Any("err", errClose))
+		if !errors.Is(err, context.Canceled) {
+			log.ErrorContext(ctx, "unable to proceed the app", slog.Any("err", err))
+		} else {
+			log.InfoContext(ctx, "finished processing apps")
 		}
 	}
 
diff --git a/compose.yaml b/compose.yaml
index 1ef6d3f..67c0511 100644
--- a/compose.yaml
+++ b/compose.yaml
@@ -1,4 +1,17 @@
 services:
+  web.mem:
+    image: git.loyso.art/devsim-web:latest
+    ports:
+      - 9125:80
+    environment:
+      DEVSIM_STORE_TYPE: memory
+    healthcheck:
+      test: ["CMD", "/app/utils", "ready"]
+      interval: 10s
+      retries: 5
+      start_period: 5s
+      timeout: 10s
+
   web.mongo:
     image: git.loyso.art/devsim-web:latest
     ports:
@@ -9,6 +22,12 @@ services:
     environment:
       DEVSIM_MONGO_DSN: mongodb://mongo
       DEVSIM_STORE_TYPE: mongo
+    healthcheck:
+      test: ["CMD", "/app/utils", "ready"]
+      interval: 10s
+      retries: 5
+      start_period: 5s
+      timeout: 10s
 
   web.pg:
     image: git.loyso.art/devsim-web:latest
@@ -21,6 +40,12 @@ services:
     environment:
       DEVSIM_PG_DSN: "postgres://devsim:devsim@postgres:5432/devsim?sslmode=disable"
       DEVSIM_STORE_TYPE: pg
+    healthcheck:
+      test: ["CMD", "/app/utils", "ready"]
+      interval: 10s
+      retries: 5
+      start_period: 5s
+      timeout: 10s
 
   postgres-migrator:
     image: git.loyso.art/devsim-migrator:latest
diff --git a/dockers/both/Dockerfile b/dockers/both/Dockerfile
index c533d30..b8c0883 100644
--- a/dockers/both/Dockerfile
+++ b/dockers/both/Dockerfile
@@ -14,6 +14,12 @@ RUN --mount=type=cache,target=/go/pkg/mod/ \
     go mod download -x && go mod verify
 COPY . .
 
+FROM base as build-utils
+RUN --mount=type=cache,target=/go/pkg/mod/ \
+    go build \
+    -o /go/bin/utils \
+    /go/src/git.loyso.art/frx/devsim/cmd/utility/main.go
+
 FROM base as build-web
 RUN --mount=type=cache,target=/go/pkg/mod/ \
     go build \
@@ -30,10 +36,17 @@ FROM gcr.io/distroless/static-debian12@sha256:ce46866b3a5170db3b49364900fb3168dc
 WORKDIR /app
 
 COPY --from=build-web /go/bin/web /app/web
+COPY --from=build-utils /go/bin/utils /app/utils
 
 ENV DEVSIM_HTTP_ADDR=":80"
 EXPOSE 80
 
+ENV DEVSIM_MONITOR_ADDR=":8080"
+EXPOSE 8080
+
+HEALTHCHECK --interval=10s --timeout=3s \
+    CMD ["/app/utils", "health"]
+
 ENTRYPOINT ["/app/web"]
 
 FROM gcr.io/distroless/static-debian12@sha256:ce46866b3a5170db3b49364900fb3168dc0833dfb46c26da5c77f22abb01d8c3 as migrator
diff --git a/internal/api/http/handlerbuilder.go b/internal/api/http/handlerbuilder.go
index 3368d18..526b6e6 100644
--- a/internal/api/http/handlerbuilder.go
+++ b/internal/api/http/handlerbuilder.go
@@ -5,6 +5,7 @@ import (
 	"net/http"
 	"net/http/pprof"
 
+	"git.loyso.art/frx/devsim/internal/probe"
 	"git.loyso.art/frx/devsim/internal/store"
 )
 
@@ -36,6 +37,12 @@ func (s *handlersBuilder) MountProfileHandlers() {
 	s.mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
 }
 
+// MountProbeHandlers registers the liveness (/health) and readiness (/ready) endpoints backed by r.
+func (s *handlersBuilder) MountProbeHandlers(r probe.Reporter) {
+	s.mux.HandleFunc("/health", livenessHandler(r))
+	s.mux.HandleFunc("/ready", readinessHandler(r))
+}
+
 func (s *handlersBuilder) Build() http.Handler {
 	return s.mux
 }
diff --git a/internal/api/http/healthhandlers.go b/internal/api/http/healthhandlers.go
new file mode 100644
index 0000000..2dcfcdc
--- /dev/null
+++ b/internal/api/http/healthhandlers.go
@@ -0,0 +1,63 @@
+package http
+
+import (
+	"net/http"
+
+	"git.loyso.art/frx/devsim/internal/probe"
+)
+
+// livenessHandler returns a GET-only handler that maps the reporter's liveness
+// status to an HTTP code: 500 for LivenessTimeout, 200 otherwise.
+func livenessHandler(reporter probe.Reporter) http.HandlerFunc {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodGet {
+			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+			return
+		}
+
+		switch reporter.ReportLiveness() {
+		case probe.LivenessOk:
+			w.WriteHeader(http.StatusOK)
+		case probe.LivenessTimeout:
+			w.WriteHeader(http.StatusInternalServerError)
+		case probe.LivenessUnknown:
+			w.WriteHeader(http.StatusOK)
+		}
+	})
+}
+
+// readinessHandler returns a GET-only handler that reports the reporter's
+// readiness. The textual status travels in the X-Readiness-Status header;
+// ReadinessFailed and unrecognized values answer 500, everything else 200.
+func readinessHandler(reporter probe.Reporter) http.HandlerFunc {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodGet {
+			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+			return
+		}
+
+		var status string
+		var code int
+		switch reporter.ReportReadiness() {
+		case probe.ReadinessOk:
+			status = "ok"
+			code = http.StatusOK
+		case probe.ReadinessFailed:
+			status = "failed"
+			code = http.StatusInternalServerError
+		case probe.ReadinessNotReady:
+			status = "not-ready"
+			code = http.StatusOK
+		case probe.ReadinessUnknown:
+			status = "unknown"
+			code = http.StatusOK
+		default:
+			// Without this branch code would stay 0 and WriteHeader would
+			// panic on the invalid status code.
+			status = "unknown"
+			code = http.StatusInternalServerError
+		}
+		w.Header().Set("X-Readiness-Status", status)
+		w.WriteHeader(code)
+	})
+}
diff --git a/internal/interconnect/collector/client.go b/internal/interconnect/collector/client.go
index 06a138b..3916e1a 100644
--- a/internal/interconnect/collector/client.go
+++ b/internal/interconnect/collector/client.go
@@ -39,6 +39,7 @@ func New(addr string) (*client, error) {
 			KeepAlive: time.Second * 30,
 		}).DialContext,
 		MaxIdleConns:          10,
+		MaxConnsPerHost:       100,
 		IdleConnTimeout:       time.Second * 90,
 		TLSHandshakeTimeout:   time.Second * 5,
 		ExpectContinueTimeout: time.Second * 1,
diff --git a/internal/probe/liveness.go b/internal/probe/liveness.go
new file mode 100644
index 0000000..efaa9e2
--- /dev/null
+++ b/internal/probe/liveness.go
@@ -0,0 +1,13 @@
+package probe
+
+// Liveness reports the application is alive.
+type Liveness int8
+
+const (
+	// LivenessUnknown reports nothing.
+	LivenessUnknown Liveness = iota
+	// LivenessOk reports service is alive.
+	LivenessOk
+	// LivenessTimeout reports service was unable to answer in time.
+	LivenessTimeout
+)
diff --git a/internal/probe/readiness.go b/internal/probe/readiness.go
new file mode 100644
index 0000000..ee15557
--- /dev/null
+++ b/internal/probe/readiness.go
@@ -0,0 +1,35 @@
+package probe
+
+// Readiness reports component's readiness.
+type Readiness int8
+
+const (
+	// ReadinessUnknown means readiness was unset.
+	ReadinessUnknown Readiness = iota
+	// ReadinessNotReady reports provided component is not ready.
+	ReadinessNotReady
+	// ReadinessFailed reports there was a problem with component.
+	ReadinessFailed
+	// ReadinessOk reports the component is ready to work.
+	ReadinessOk
+)
+
+// ReadinessAggregate is a list of readiness statuses collected from several
+// components.
+type ReadinessAggregate []Readiness
+
+// Status folds the aggregate into one value: the first status that is neither
+// ReadinessOk nor ReadinessUnknown is returned; if there is none (including
+// the empty aggregate) the result is ReadinessOk.
+func (a ReadinessAggregate) Status() Readiness {
+	for _, item := range a {
+		switch item {
+		case ReadinessOk, ReadinessUnknown:
+			continue
+		}
+
+		return item
+	}
+
+	return ReadinessOk
+}
diff --git a/internal/probe/reporter.go b/internal/probe/reporter.go
new file mode 100644
index 0000000..78c66f1
--- /dev/null
+++ b/internal/probe/reporter.go
@@ -0,0 +1,144 @@
+package probe
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+type (
+	// ReadinessFunc reports the readiness of a single component.
+	ReadinessFunc func() Readiness
+	// LivenessFunc reports the liveness of a single component. The context
+	// carries the deadline of the whole liveness check.
+	LivenessFunc func(context.Context) Liveness
+
+	// ReadinessAggregateFuncs is a list of readiness checks evaluated together.
+	ReadinessAggregateFuncs []ReadinessFunc
+)
+
+// check calls every function in order and collects the results.
+func (fs ReadinessAggregateFuncs) check() (a ReadinessAggregate) {
+	a = make(ReadinessAggregate, 0, len(fs))
+	for _, f := range fs {
+		a = append(a, f())
+	}
+
+	return a
+}
+
+// Reporter combines registered component checks into a single liveness and
+// a single readiness status.
+type Reporter interface {
+	ReportReadiness() Readiness
+	ReportLiveness() Liveness
+
+	RegisterReadiness(ReadinessFunc)
+	RegisterLiveness(LivenessFunc)
+}
+
+// NewReporter creates a reporter whose liveness checks share one deadline of
+// livenessTimeout per ReportLiveness call.
+func NewReporter(livenessTimeout time.Duration) *reporter {
+	return &reporter{
+		livenessTimeout: livenessTimeout,
+	}
+}
+
+type reporter struct {
+	readinessComponents ReadinessAggregateFuncs
+	livenessComponents  []LivenessFunc
+
+	livenessTimeout time.Duration
+
+	livemu sync.Mutex // guards livenessComponents
+	readmu sync.Mutex // guards readinessComponents
+}
+
+// ReportReadiness runs every registered readiness check and returns the
+// aggregated status.
+func (r *reporter) ReportReadiness() Readiness {
+	// Snapshot under the lock and run the checks outside of it, so a slow or
+	// re-entrant check cannot block or deadlock registration. The slice is
+	// append-only, so elements visible through the snapshot never change.
+	r.readmu.Lock()
+	components := r.readinessComponents
+	r.readmu.Unlock()
+
+	return components.check().Status()
+}
+
+// ReportLiveness runs the registered liveness checks in order under a shared
+// deadline. It returns LivenessTimeout as soon as a check reports it and
+// LivenessOk otherwise.
+func (r *reporter) ReportLiveness() Liveness {
+	// See ReportReadiness for why the checks run on a snapshot.
+	r.livemu.Lock()
+	components := r.livenessComponents
+	r.livemu.Unlock()
+
+	ctx, cancel := context.WithTimeout(context.Background(), r.livenessTimeout)
+	defer cancel()
+
+	for _, f := range components {
+		if f(ctx) == LivenessTimeout {
+			return LivenessTimeout
+		}
+	}
+
+	return LivenessOk
+}
+
+// RegisterReadiness adds f to the readiness checks.
+func (r *reporter) RegisterReadiness(f ReadinessFunc) {
+	r.readmu.Lock()
+	defer r.readmu.Unlock()
+
+	r.readinessComponents = append(r.readinessComponents, f)
+}
+
+// RegisterLiveness adds f to the liveness checks.
+func (r *reporter) RegisterLiveness(f LivenessFunc) {
+	r.livemu.Lock()
+	defer r.livemu.Unlock()
+
+	r.livenessComponents = append(r.livenessComponents, f)
+}
+
+// SimpleReadinessSwitcher returns a readiness check backed by an atomic value
+// and a toggle that sets the status the check reports. The check reports
+// ReadinessUnknown until toggle is first called.
+func SimpleReadinessSwitcher() (f ReadinessFunc, toggle func(newStatus Readiness)) {
+	var status atomic.Int32
+
+	f = func() Readiness {
+		return Readiness(status.Load())
+	}
+
+	toggle = func(newStatus Readiness) {
+		status.Store(int32(newStatus))
+	}
+
+	return f, toggle
+}
+
+// SimpleLivenessSwitcher returns a liveness check that reports LivenessUnknown
+// until toggle is called and LivenessOk afterwards.
+func SimpleLivenessSwitcher() (f LivenessFunc, toggle func()) {
+	var liveness atomic.Bool
+
+	f = func(context.Context) Liveness {
+		if liveness.Load() {
+			return LivenessOk
+		}
+
+		return LivenessUnknown
+	}
+
+	toggle = func() {
+		liveness.Store(true)
+	}
+
+	return f, toggle
+}
diff --git a/internal/store/memory/store.go b/internal/store/memory/store.go
new file mode 100644
index 0000000..2e4f93e
--- /dev/null
+++ b/internal/store/memory/store.go
@@ -0,0 +1,44 @@
+package memory
+
+import (
+	"context"
+	"sync"
+
+	"git.loyso.art/frx/devsim/internal/entities"
+)
+
+// store is an in-memory, mutex-guarded statistics storage keyed by device ID.
+type store struct {
+	stats map[entities.DeviceID]entities.DeviceStatistics
+	mu    sync.Mutex
+}
+
+// NewStore creates an empty in-memory store.
+func NewStore() *store {
+	return &store{
+		stats: make(map[entities.DeviceID]entities.DeviceStatistics),
+	}
+}
+
+// List returns a slice of all stored statistics in unspecified order.
+func (s *store) List(ctx context.Context) ([]entities.DeviceStatistics, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	out := make([]entities.DeviceStatistics, 0, len(s.stats))
+	for _, stat := range s.stats {
+		out = append(out, stat)
+	}
+
+	return out, nil
+}
+
+// Upsert stores stats under its ID, replacing any previous entry.
+func (s *store) Upsert(ctx context.Context, stats entities.DeviceStatistics) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.stats[stats.ID] = stats
+
+	return nil
+}