From 25762cbae855af03d1cff4abe20107e83062ff8a Mon Sep 17 00:00:00 2001 From: Aleksandr Trushkin Date: Tue, 13 Aug 2024 23:39:44 +0300 Subject: [PATCH] improve upsert perfomance --- cmd/simulator/main.go | 26 +++++++++++---- dockers/both/Dockerfile | 49 +++++++++++++++++++++++++++ dockers/migrator/Dockerfile | 25 -------------- dockers/web/Dockerfile | 24 -------------- internal/api/http/handlerbuilder.go | 15 ++++----- internal/store/mongo/store.go | 51 +++++++++++++++++++++-------- internal/store/pg/store.go | 8 ++--- makefile | 12 +++++-- 8 files changed, 127 insertions(+), 83 deletions(-) create mode 100644 dockers/both/Dockerfile delete mode 100644 dockers/migrator/Dockerfile delete mode 100644 dockers/web/Dockerfile diff --git a/cmd/simulator/main.go b/cmd/simulator/main.go index 3322161..3deec38 100644 --- a/cmd/simulator/main.go +++ b/cmd/simulator/main.go @@ -2,9 +2,11 @@ package main import ( "context" + "errors" "log" "os" "os/signal" + "runtime" "strconv" "sync/atomic" "time" @@ -38,18 +40,26 @@ func main() { deviceCountStr := os.Getenv("DEVSIM_DEVICE_COUNT") delayStr := os.Getenv("DEVSIM_REQUEST_DELAY") - deviceCount, err := strconv.Atoi(deviceCountStr) - if err != nil { - log.Fatalf("parsing device count: %v", err) + deviceCount := runtime.GOMAXPROCS(0) + if deviceCountStr != "" { + var err error + deviceCount, err = strconv.Atoi(deviceCountStr) + if err != nil { + log.Fatalf("parsing device count: %v", err) + } } if dstAddr == "" { log.Fatal("no destination address provided") } - delay, err := time.ParseDuration(delayStr) - if err != nil { - log.Fatalf("parsing delay duration: %v", err) + delay := time.Millisecond * 20 + if delayStr != "" { + var err error + delay, err = time.ParseDuration(delayStr) + if err != nil { + log.Fatalf("parsing delay duration: %v", err) + } } log.Printf("running application with settings: destination=%s device_count=%d delay=%s", dstAddr, deviceCount, delay) @@ -113,6 +123,10 @@ func (h *deviceHandler) loop(ctx context.Context) { err := h.client.Upsert(ctx, h.stats) if err != nil { + if errors.Is(err, context.Canceled) { + return + } + log.Printf("%q: unable to upsert metrics: %v", h.stats.ID, err) failedCount++ if failedCount > 10 { diff --git a/dockers/both/Dockerfile b/dockers/both/Dockerfile new file mode 100644 index 0000000..c533d30 --- /dev/null +++ b/dockers/both/Dockerfile @@ -0,0 +1,49 @@ +FROM golang:1.22-alpine as base + +ARG VERSION="unknown" +ARG REVISION="unknown" +ARG BUILDTIME="" + +WORKDIR /go/src/git.loyso.art/frx/devsim + +ENV GOCACHE=/go/pkg/mod/ + +RUN --mount=type=cache,target=/go/pkg/mod/ \ + --mount=type=bind,source=go.sum,target=go.sum \ + --mount=type=bind,source=go.mod,target=go.mod \ + go mod download -x && go mod verify +COPY . . + +FROM base as build-web +RUN --mount=type=cache,target=/go/pkg/mod/ \ + go build \ + -ldflags "-w -s -X 'git.loyso.art/frx/devsim.Version=${VERSION}' -X 'git.loyso.art/frx/devsim.Revision=${REVISION}' -X 'git.loyso.art/frx/devsim.BuildTime=${BUILDTIME}'" \ + -o /go/bin/web \ + /go/src/git.loyso.art/frx/devsim/cmd/web/main.go + +FROM base as build-migrator +RUN --mount=type=cache,target=/go/pkg/mod/ \ + go build -o /go/bin/migrator \ + /go/src/git.loyso.art/frx/devsim/cmd/migrator/main.go + +FROM gcr.io/distroless/static-debian12@sha256:ce46866b3a5170db3b49364900fb3168dc0833dfb46c26da5c77f22abb01d8c3 as web + +WORKDIR /app +COPY --from=build-web /go/bin/web /app/web + +ENV DEVSIM_HTTP_ADDR=":80" +EXPOSE 80 + +ENTRYPOINT ["/app/web"] + +FROM gcr.io/distroless/static-debian12@sha256:ce46866b3a5170db3b49364900fb3168dc0833dfb46c26da5c77f22abb01d8c3 as migrator + +WORKDIR /app +COPY --from=build-migrator /go/bin/migrator /app/migrator + +COPY assets/db/migrations/ /app/db/migrations/ +ENV DEVSIM_PG_MIGRATOR="/app/migrations" + +ENTRYPOINT ["/app/migrator"] + + diff --git a/dockers/migrator/Dockerfile b/dockers/migrator/Dockerfile deleted file mode 100644 index 017e01c..0000000 --- a/dockers/migrator/Dockerfile +++ /dev/null @@ -1,25 +0,0 @@ -FROM golang:1.22-alpine as golang - -ARG VERSION="unknown" -ARG REVISION="unknown" -ARG BUILDTIME="" - -WORKDIR /go/src/git.loyso.art/frx/devsim -COPY . . - -RUN go mod download && \ - go mod verify && \ - CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \ - -o /go/bin/migrator /go/src/git.loyso.art/frx/devsim/cmd/migrator/main.go - -FROM gcr.io/distroless/static-debian12@sha256:ce46866b3a5170db3b49364900fb3168dc0833dfb46c26da5c77f22abb01d8c3 - -WORKDIR /app -COPY --from=golang /go/bin/migrator /app/migrator -COPY assets/db/migrations/ /app/db/migrations/ - -ENV DEVSIM_PG_MIGRATOR="/app/migrations" - -ENTRYPOINT ["/app/migrator"] - - diff --git a/dockers/web/Dockerfile b/dockers/web/Dockerfile deleted file mode 100644 index b6e9c41..0000000 --- a/dockers/web/Dockerfile +++ /dev/null @@ -1,24 +0,0 @@ -FROM golang:1.22-alpine as golang - -ARG VERSION="unknown" -ARG REVISION="unknown" -ARG BUILDTIME="" - -WORKDIR /go/src/git.loyso.art/frx/devsim -COPY . . - -RUN go mod download && \ - go mod verify && \ - CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags "-w -s -X 'git.loyso.art/frx/devsim.Version=${VERSION}' -X 'git.loyso.art/frx/devsim.Revision=${REVISION}' -X 'git.loyso.art/frx/devsim.BuildTime=${BUILDTIME}'" \ - -o /go/bin/app /go/src/git.loyso.art/frx/devsim/cmd/web/main.go - -FROM gcr.io/distroless/static-debian12@sha256:ce46866b3a5170db3b49364900fb3168dc0833dfb46c26da5c77f22abb01d8c3 - -WORKDIR /app -COPY --from=golang /go/bin/app /app/web - -ENV DEVSIM_HTTP_ADDR=":80" -EXPOSE 80 - -ENTRYPOINT ["/app/web"] - diff --git a/internal/api/http/handlerbuilder.go b/internal/api/http/handlerbuilder.go index f594d72..3368d18 100644 --- a/internal/api/http/handlerbuilder.go +++ b/internal/api/http/handlerbuilder.go @@ -20,15 +20,14 @@ func NewHandlersBuilder() *handlersBuilder { // MountStatsHandlers mounts stats related handlers. func (h *handlersBuilder) MountStatsHandlers(sr store.Stats, log *slog.Logger) { - log = log.With(slog.String("api", "http")) + // log = log.With(slog.String("api", "http")) + // mws := multipleMiddlewares( + // middlewarePanicRecovery(log), + // middlewareLogger(log), + // ) - mws := multipleMiddlewares( - middlewarePanicRecovery(log), - middlewareLogger(log), - ) - - h.mux.Handle("/api/v1/stats/", mws(listStatsHandler(sr))) - h.mux.Handle("/api/v1/stats/{id}", mws(postStatsHandler(sr))) + h.mux.Handle("/api/v1/stats/", listStatsHandler(sr)) + h.mux.Handle("/api/v1/stats/{id}", postStatsHandler(sr)) } func (s *handlersBuilder) MountProfileHandlers() { diff --git a/internal/store/mongo/store.go b/internal/store/mongo/store.go index 903b65b..850ff96 100644 --- a/internal/store/mongo/store.go +++ b/internal/store/mongo/store.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync" "time" "git.loyso.art/frx/devsim/internal/entities" @@ -30,11 +31,16 @@ func Dial(ctx context.Context, uri string) (*repository, error) { return nil, fmt.Errorf("pinging mongo database: %w", err) } - return &repository{client: client}, nil + updateOptions := options.Update().SetUpsert(true) + return &repository{ + client: client, + updateOptions: updateOptions, + }, nil } type repository struct { - client *mongo.Client + client *mongo.Client + updateOptions *options.UpdateOptions } func (r *repository) Close() error { @@ -46,22 +52,24 @@ func (r *repository) Close() error { func (r *repository) StatsRepository() statsRepository { return statsRepository{ - collection: r.client.Database("bench").Collection("device_stats"), + collection: r.client.Database("bench").Collection("device_stats"), + updateOptions: r.updateOptions, } } type statsRepository struct { - collection *mongo.Collection + collection *mongo.Collection + updateOptions *options.UpdateOptions } type deviceStatsDB struct { + UpdatedAt time.Time `bson:"updated_at"` DeviceID string `bson:"_id"` IncomingTraffic int `bson:"inc_traffic"` OutgoinfTraffic int `bson:"out_traffic"` IncomingRPS int `bson:"inc_rps"` ReadRPS int `bson:"read_rps"` WriteRPS int `bson:"write_rps"` - UpdatedAt time.Time `bson:"updated_at"` } func (s deviceStatsDB) asDomain() entities.DeviceStatistics { @@ -76,15 +84,26 @@ func (s deviceStatsDB) asDomain() entities.DeviceStatistics { } } -func (r statsRepository) Upsert(ctx context.Context, stats entities.DeviceStatistics) error { - opts := options.Update().SetUpsert(true) +var bsonSyncPool = sync.Pool{ + New: func() any { + m := make(bson.M, 1) + return &m + }, +} - filter := bson.D{ - { - Key: "_id", - Value: stats.ID, - }, - } +func getPoolD() *bson.M { + return bsonSyncPool.Get().(*bson.M) +} + +func putPoolD(m *bson.M) { + bsonSyncPool.Put(m) +} + +type filterByID struct { + ID entities.DeviceID `bson:"_id"` +} + +func (r statsRepository) Upsert(ctx context.Context, stats entities.DeviceStatistics) error { document := deviceStatsDB{ DeviceID: string(stats.ID), IncomingTraffic: stats.IncomingTrafficBytes, @@ -95,9 +114,13 @@ func (r statsRepository) Upsert(ctx context.Context, stats entities.DeviceStatis UpdatedAt: time.Now(), } + // updatePtr := getPoolD() + // update := *updatePtr + // defer putPoolD(updatePtr) + update := bson.M{"$set": document} - _, err := r.collection.UpdateOne(ctx, filter, update, opts) + _, err := r.collection.UpdateOne(ctx, filterByID{ID: stats.ID}, update, r.updateOptions) if err != nil { return fmt.Errorf("inserting: %w", err) } diff --git a/internal/store/pg/store.go b/internal/store/pg/store.go index ebeec84..bc2f652 100644 --- a/internal/store/pg/store.go +++ b/internal/store/pg/store.go @@ -31,7 +31,7 @@ type repository struct { func (r *repository) StatsRepository() statsRepository { return statsRepository{ - db: r.db, + q: queries.New(r.db), } } @@ -41,7 +41,7 @@ func (r *repository) Close() error { } type statsRepository struct { - db *pgxpool.Pool + q *queries.Queries } type deviceStatsDB struct { @@ -67,7 +67,7 @@ func (s deviceStatsDB) asDomain() entities.DeviceStatistics { } func (r statsRepository) Upsert(ctx context.Context, stats entities.DeviceStatistics) error { - err := queries.New(r.db).UpsertDeviceMetrics(ctx, queries.UpsertDeviceMetricsParams{ + err := r.q.UpsertDeviceMetrics(ctx, queries.UpsertDeviceMetricsParams{ DeviceID: string(stats.ID), IncTraffic: int32(stats.IncomingTrafficBytes), OutTraffic: int32(stats.OutgoingTrafficBytes), @@ -83,7 +83,7 @@ func (r statsRepository) Upsert(ctx context.Context, stats entities.DeviceStatis } func (r statsRepository) List(ctx context.Context) (out []entities.DeviceStatistics, err error) { - stats, err := queries.New(r.db).ListDeviceStats(ctx) + stats, err := r.q.ListDeviceStats(ctx) if err != nil { return nil, fmt.Errorf("listing device stats: %w", err) } diff --git a/makefile b/makefile index 19a9e93..a337bcf 100644 --- a/makefile +++ b/makefile @@ -2,7 +2,10 @@ VERSION=$(shell git tag --sort=v:refname 2>/dev/null | head -n1) REVISION=$(shell git rev-parse --short HEAD) BUILDTIME=$(shell date -u +%FT%T) +PHONY: build.docker.web build.docker.migrator build.docker + build.docker: build.docker.web build.docker.migrator + echo "finished" build.docker.web: docker build\ @@ -10,7 +13,8 @@ build.docker.web: --build-arg REVISION=${REVISION}\ --build-arg BUILDTIME=${BUILDTIME}\ -t git.loyso.art/devsim-web:latest\ - -f ./dockers/web/Dockerfile\ + -f ./dockers/both/Dockerfile\ + --target=web\ . build.docker.migrator: @@ -19,7 +23,8 @@ build.docker.migrator: --build-arg REVISION=${REVISION}\ --build-arg BUILDTIME=${BUILDTIME}\ -t git.loyso.art/devsim-migrator:latest\ - -f ./dockers/migrator/Dockerfile\ + -f ./dockers/both/Dockerfile\ + --target=migrator\ . build: @@ -27,3 +32,6 @@ build: -ldflags "-w -s -X 'git.loyso.art/frx/devsim.version=${VERSION}' -X 'git.loyso.art/frx/devsim.revision=${REVISION}' -X 'git.loyso.art/frx/devsim.buildTime=${BUILDTIME}Z'" \ -o bin/web ./cmd/web/*.go +build.client: + CGO_ENABALED=0 go build \ + -o bin/client ./cmd/simulator/main.go