package main import ( "context" "log" "os" "os/signal" "strconv" "sync/atomic" "time" "golang.org/x/sync/errgroup" "git.loyso.art/frx/devsim/internal/entities" "git.loyso.art/frx/devsim/internal/interconnect/collector" ) var requestsDone = atomic.Uint64{} func requestReporter(ctx context.Context) { ticker := time.Tick(time.Second) for { select { case <-ticker: case <-ctx.Done(): return } requests := requestsDone.Swap(0) log.Printf("rps: %d", requests) } } func main() { ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) defer cancel() dstAddr := os.Getenv("DEVSIM_HTTP_ADDR") deviceCountStr := os.Getenv("DEVSIM_DEVICE_COUNT") delayStr := os.Getenv("DEVSIM_REQUEST_DELAY") deviceCount, err := strconv.Atoi(deviceCountStr) if err != nil { log.Fatalf("parsing device count: %v", err) } if dstAddr == "" { log.Fatal("no destination address provided") } delay, err := time.ParseDuration(delayStr) if err != nil { log.Fatalf("parsing delay duration: %v", err) } log.Printf("running application with settings: destination=%s device_count=%d delay=%s", dstAddr, deviceCount, delay) client, err := collector.New(dstAddr) if err != nil { log.Fatalf("unable to create collector http client: %v", err) } eg, egctx := errgroup.WithContext(ctx) eg.Go(func() error { requestReporter(egctx) return nil }) for i := 0; i < deviceCount; i++ { dh := newDeviceHandler(i+1, delay, client) eg.Go(func() error { dh.loop(egctx) return nil }) } err = eg.Wait() if err != nil { log.Printf("error during execution: %v", err) cancel() } } type deviceHandler struct { stats entities.DeviceStatistics client collector.Client delay time.Duration } func newDeviceHandler(id int, delay time.Duration, client collector.Client) *deviceHandler { deviceID := entities.DeviceID(strconv.Itoa(id)) dh := deviceHandler{ delay: delay, client: client, } dh.stats.ID = deviceID return &dh } func (h *deviceHandler) loop(ctx context.Context) { failedCount := 0 for { start := time.Now() h.stats.IncomingTrafficBytes++ h.stats.OutgoingTrafficBytes++ h.stats.WriteRPS = (h.stats.WriteRPS + 2) % 255 h.stats.ReadRPS = (h.stats.ReadRPS + 1) % 255 h.stats.IncomingRPS = h.stats.WriteRPS + h.stats.ReadRPS err := h.client.Upsert(ctx, h.stats) if err != nil { log.Printf("%q: unable to upsert metrics: %v", h.stats.ID, err) failedCount++ if failedCount > 10 { log.Println("too much fails, exiting") return } continue } requestsDone.Add(1) failedCount = 0 elapsed := time.Since(start) left := h.delay - elapsed if left > 0 { select { case <-ctx.Done(): return case <-time.After(left): continue } } else { if ctx.Err() != nil { return } } } }