diff --git a/.gitignore b/.gitignore index f4d432a..0010f27 100644 --- a/.gitignore +++ b/.gitignore @@ -14,4 +14,4 @@ # Dependency directories (remove the comment below to include it) # vendor/ - +*.json diff --git a/cmd/simple/config.go b/cmd/simple/config.go new file mode 100644 index 0000000..4cec05a --- /dev/null +++ b/cmd/simple/config.go @@ -0,0 +1,37 @@ +package main + +import ( + "encoding/json" + "os" +) + +type config struct { + DSN string `json:"dsn"` + Topic string `json:"topic"` +} + +func parseEnvConfig() (cfg config) { + const defaultPath = "./cli.json" + + data, err := os.ReadFile(defaultPath) + if err != nil { + if !os.IsNotExist(err) { + panic(err.Error()) + } + } else { + err := json.Unmarshal(data, &cfg) + if err != nil { + panic(err.Error()) + } + } + + if dsn := os.Getenv("SMTH_DSN"); dsn != "" { + cfg.DSN = dsn + } + + if topic := os.Getenv("SMTH_TOPIC"); topic != "" { + cfg.Topic = topic + } + + return cfg +} diff --git a/cmd/simple/main.go b/cmd/simple/main.go new file mode 100644 index 0000000..2d732e6 --- /dev/null +++ b/cmd/simple/main.go @@ -0,0 +1,232 @@ +package main + +import ( + "context" + "errors" + "fmt" + "os" + "os/signal" + "runtime" + "sync/atomic" + "time" + + "git.loyso.art/frx/smthqueue/internal/postgres" + + "github.com/spf13/cobra" + "golang.org/x/exp/slog" + "golang.org/x/sync/errgroup" +) + +func main() { + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) + defer cancel() + + err := app(ctx) + if err != nil { + println("error running app: " + err.Error()) + os.Exit(1) + } +} + +func app(ctx context.Context) error { + log := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) + + cfg := parseEnvConfig() + + setupCommands(cfg, log) + + err := rootCommand.ExecuteContext(ctx) + if err != nil { + return err + } + + return nil +} + +func setupCommands(cfg config, log *slog.Logger) { + rootCommand.AddCommand( + clientCommand, + ) + + clientCommand.AddCommand( + clientConsumerCommand, + clientProducerCommand, + ) + + app := application{ + cfg: cfg, + log: log.With(slog.String("app", "cli")), + } + + clientConsumerCommand.RunE = app.handleConsumerCommand + clientProducerCommand.RunE = app.handleProducerCommand +} + +var rootCommand = &cobra.Command{ + Use: "smthqueue", + Short: "entry point for commands", +} + +var clientCommand = &cobra.Command{ + Use: "client", + Short: "a simple client for testing", +} + +var clientConsumerCommand = &cobra.Command{ + Use: "consumer [topic]", + Short: "runs application in consumer mode", + RunE: nil, +} + +var clientProducerCommand = &cobra.Command{ + Use: "producer [topic]", + Short: "runs application in producer mode", + RunE: nil, +} + +type application struct { + cfg config + log *slog.Logger +} + +type rpsReporter struct { + log *slog.Logger + accumulator int32 + db postgres.Client +} + +func (r *rpsReporter) inc() { + atomic.AddInt32(&r.accumulator, 1) +} + +func (r *rpsReporter) reset() int32 { + return atomic.SwapInt32(&r.accumulator, 0) +} + +func (r *rpsReporter) loop() { + ticker := time.NewTicker(time.Second) + for range ticker.C { + rps := r.reset() + dbStats := r.db.GetStats() + r.log.Info( + "tick", + slog.Int64("rps", int64(rps)), + slog.Int64("db_total_conns", int64(dbStats.TotalConnections)), + slog.Int64("db_acquried_conns", int64(dbStats.AcquiredConnections)), + slog.Int64("db_idle_conns", int64(dbStats.IdleConnections)), + ) + } +} + +func (a *application) handleConsumerCommand(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + pg, err := a.getDB(ctx) + if err != nil { + return err + } + + reporter := &rpsReporter{ + log: a.log, + db: pg, + } + + go reporter.loop() + + eg, egctx := errgroup.WithContext(ctx) + + queueClient := pg.Queue() + + workers := runtime.NumCPU() + a.log.InfoContext(egctx, "running workers", slog.Int("count", workers)) + for i := 0; i < workers; i++ { + eg.Go(func() error { + for { + message, err := queueClient.Dequeue(egctx, postgres.DequeueParams{ + Topic: a.cfg.Topic, + Timeout: time.Second, + }) + if err != nil { + if errors.Is(err, postgres.ErrNoMessage) { + time.Sleep(time.Second / 1) + continue + } + + a.log.Error("unable to dequeue message", slog.Any("err", err)) + + return nil + } + + a.log.DebugContext(egctx, "scheduling new message", slog.String("message_id", message.ID)) + + a.log.DebugContext(egctx, "handling new message", slog.Any("message", message)) + err = queueClient.Ack(egctx, message) + if err != nil { + a.log.ErrorContext(egctx, "unable to ack message", slog.Any("err", err)) + continue + } + + reporter.inc() + + a.log.DebugContext(egctx, "message acked", slog.String("message_id", message.ID)) + } + }) + + } + + return eg.Wait() +} + +func (a *application) handleProducerCommand(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + pg, err := a.getDB(ctx) + if err != nil { + return err + } + + queueClient := pg.Queue() + eg, egctx := errgroup.WithContext(ctx) + workers := runtime.NumCPU() + for i := 0; i < workers; i++ { + eg.Go(func() error { + for { + select { + case <-egctx.Done(): + return nil + default: + qm, err := queueClient.Enqueue(egctx, postgres.EnqueueParams{ + Topic: a.cfg.Topic, + Payload: []byte("simple message"), + VisibleTimeout: time.Second, + }) + if err != nil { + a.log.ErrorContext(egctx, "unable to enqueue message", slog.Any("err", err)) + time.Sleep(time.Second) + continue + } + + a.log.DebugContext(egctx, "message queued", slog.Any("message", qm)) + } + } + }) + } + + return eg.Wait() +} + +func (a *application) getDB(ctx context.Context) (postgres.Client, error) { + pgcfg := postgres.Config{ + MaxConns: 0, + MaxIdleConns: 0, + MasterDSN: a.cfg.DSN, + } + client, err := postgres.New(ctx, pgcfg, a.log) + if err != nil { + return nil, fmt.Errorf("making postgres client: %w", err) + } + + return client, nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..5538373 --- /dev/null +++ b/go.mod @@ -0,0 +1,33 @@ +module git.loyso.art/frx/smthqueue + +go 1.19 + +require ( + github.com/go-stack/stack v1.8.1 // indirect + github.com/gofrs/uuid v4.4.0+incompatible // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/pgx v3.6.2+incompatible // indirect + github.com/jackc/pgx/v5 v5.4.3 // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/lib/pq v1.10.9 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/rs/zerolog v1.31.0 // indirect + github.com/satori/go.uuid v1.2.0 // indirect + github.com/shopspring/decimal v1.3.1 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect + github.com/spf13/cobra v1.7.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect + go.uber.org/multierr v1.10.0 // indirect + go.uber.org/zap v1.26.0 // indirect + golang.org/x/crypto v0.14.0 // indirect + golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect + golang.org/x/sync v0.1.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/term v0.13.0 // indirect + golang.org/x/text v0.13.0 // indirect + gopkg.in/inconshreveable/log15.v2 v2.16.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..9dbed02 --- /dev/null +++ b/go.sum @@ -0,0 +1,73 @@ +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw= +github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA= +github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx v3.6.2+incompatible h1:2zP5OD7kiyR3xzRYMhOcXVvkDZsImVXfj+yIyTQf3/o= +github.com/jackc/pgx v3.6.2+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I= +github.com/jackc/pgx/v5 v5.4.3 h1:cxFyXhxlvAifxnkKKdlxv8XqUf59tDlYjnV5YYfsJJY= +github.com/jackc/pgx/v5 v5.4.3/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A= +github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= +github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= +github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= +github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= +golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/inconshreveable/log15.v2 v2.16.0 h1:LWHLVX8KbBMkQFSqfno4901Z4Wg8L3B7Cu0n4K/Q7MA= +gopkg.in/inconshreveable/log15.v2 v2.16.0/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/postgres/client.go b/internal/postgres/client.go new file mode 100644 index 0000000..2447dcd --- /dev/null +++ b/internal/postgres/client.go @@ -0,0 +1,73 @@ +package postgres + +import ( + "context" + "fmt" + "io" + + "github.com/jackc/pgx/v5/pgxpool" + "golang.org/x/exp/slog" +) + +type DatabaseStats struct { + MaxConnections int32 + TotalConnections int32 + AcquiredConnections int32 + IdleConnections int32 +} + +type Client interface { + io.Closer + + Queue() QueueClient + + GetStats() DatabaseStats +} + +type Config struct { + MaxConns int64 + MaxIdleConns int64 + MasterDSN string +} + +type client struct { + pool *pgxpool.Pool + log *slog.Logger +} + +func New(ctx context.Context, config Config, logger *slog.Logger) (*client, error) { + pgconfig, err := pgxpool.ParseConfig(config.MasterDSN) + if err != nil { + return nil, fmt.Errorf("parsing config: %w", err) + } + + pool, err := pgxpool.NewWithConfig(ctx, pgconfig) + if err != nil { + return nil, fmt.Errorf("making new connection: %w", err) + } + + return &client{ + pool: pool, + log: logger.With(slog.String("name", "db")), + }, nil +} + +func (c *client) Close() error { + if c.pool == nil { + return nil + } + + c.pool.Close() + return nil +} + +func (c *client) GetStats() DatabaseStats { + stat := c.pool.Stat() + + return DatabaseStats{ + MaxConnections: stat.MaxConns(), + TotalConnections: stat.TotalConns(), + AcquiredConnections: stat.AcquiredConns(), + IdleConnections: stat.IdleConns(), + } +} diff --git a/internal/postgres/error.go b/internal/postgres/error.go new file mode 100644 index 0000000..df4eb13 --- /dev/null +++ b/internal/postgres/error.go @@ -0,0 +1,11 @@ +package postgres + +type Error string + +func (err Error) Error() string { return string(err) } + +const ( + ErrVersionIDMismatch Error = "version ids does not match" + ErrNoMessage Error = "no messages in queue" + ErrNotImplemented Error = "not implemented" +) diff --git a/internal/postgres/queue.go b/internal/postgres/queue.go new file mode 100644 index 0000000..06f449b --- /dev/null +++ b/internal/postgres/queue.go @@ -0,0 +1,275 @@ +package postgres + +import ( + "context" + "crypto/rand" + "encoding/hex" + "errors" + "fmt" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "golang.org/x/exp/slog" +) + +type QueuedMessage struct { + // ID of the message. Unique. + ID string + Topic string + Payload []byte + CreatedAt time.Time + UpdatedAt time.Time + + // visibleAt utilizes visibility in the queue. This is used + // if message was not commited for a long time. (Acked or Nacked) + visibleAt time.Time + // versionID of the message. For future use to provide consistency. + // In case message is tried to be acked with different version id, + // this message will be discarded and error will be returned. + versionID uint64 +} + +type DequeueParams struct { + // Timeout sets visibleAfter value to the future. Used in case + // retry is needed, if this message should be handled for sure + // atleast once. + // If timeout is 0, this message will be deleted from queue. + Timeout time.Duration + Topic string +} + +type EnqueueParams struct { + // Topic to which this message belongs to. + Topic string + // Payload of the message. + Payload []byte + // VisibleTimeout + VisibleTimeout time.Duration +} + +type QueueClient interface { + // Enqueue a message into message bus. + Enqueue(context.Context, EnqueueParams) (QueuedMessage, error) + // Dequeue a message from message bus. + Dequeue(context.Context, DequeueParams) (QueuedMessage, error) + + // Ack removes message from queue in case versionID matches. + Ack(context.Context, QueuedMessage) error + // Nack sets visibilityAfter to now() and also updates versionID. + Nack(context.Context, QueuedMessage) error +} + +func (c *client) Queue() QueueClient { + return &queueClient{ + pool: c.pool, + logger: c.log.WithGroup("queue"), + } +} + +type queueClient struct { + pool *pgxpool.Pool + logger *slog.Logger +} + +const tableCreateQuery = ` +CREATE TABLE smth.queue ( + id TEXT NOT NULL, + version_id BIGINT NOT NULL, + topic TEXT NOT NULL DEFAULT '', + payload BYTEA NULL, + created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL, + updated_at TIMESTAMP WITHOUT TIME ZONE NOT NULL, + visible_at TIMESTAMP WITHOUT TIME ZONE NOT NULL, + + CONSTRAINT PRIMARY KEY (id) +); +CREATE INDEX queue_visible_at_idx ON smth.queue(visible_at ASC); +CREATE INDEX queue_topic_idx ON smth.queue(topic);` + +func (c *queueClient) scanIntoMessage(row pgx.Row) (qm QueuedMessage, err error) { + err = row.Scan( + &qm.ID, + &qm.Topic, + &qm.Payload, + &qm.CreatedAt, + &qm.UpdatedAt, + &qm.visibleAt, + &qm.versionID, + ) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return qm, ErrNoMessage + } + + return qm, fmt.Errorf("scanning row: %w", err) + } + + return qm, nil +} + +func (c *queueClient) Enqueue(ctx context.Context, params EnqueueParams) (qm QueuedMessage, err error) { + const initialVersion = 1 + const query = `INSERT INTO smth.queue + (id, topic, payload, created_at, updated_at, visible_at, version_id) + VALUES + ($1, $2, $3, NOW(), NOW(), $4, $5) + RETURNING + id, topic, payload, created_at, updated_at, visible_at, version_id` + + args := []any{ + c.generateID(), + params.Topic, + params.Payload, + time.Now().Add(params.VisibleTimeout).UTC(), + initialVersion, + } + + qt := traceMethod(ctx, c.logger, "Enqueue") + defer qt.finish(&err) + + qt.query(query, args...) + + qm, err = c.scanIntoMessage(c.pool.QueryRow(ctx, query, args...)) + if err != nil { + return qm, fmt.Errorf("scanning row: %w", err) + } + + return qm, nil +} + +func (c *queueClient) Dequeue(ctx context.Context, params DequeueParams) (qm QueuedMessage, err error) { + const queryDeletePrefix = `DELETE FROM smth.queue` + const queryUpdatePrefix = `UPDATE smth.queue SET updated_at = NOW(), version_id = version_id + 1, visible_at = $2` + const querySuffix = ` WHERE id = ( + SELECT id + FROM smth.queue + WHERE + topic = $1 + and visible_at < NOW() + ORDER BY visible_at ASC + LIMIT 1 + FOR UPDATE SKIP LOCKED + ) RETURNING id, topic, payload, created_at, updated_at, visible_at, version_id` + + var query string + args := append( + make([]any, 0, 2), + params.Topic, + ) + if params.Timeout == 0 { + query = queryDeletePrefix + querySuffix + } else { + query = queryUpdatePrefix + querySuffix + args = append(args, time.Now().UTC().Add(params.Timeout)) + } + + qt := traceMethod(ctx, c.logger, "Dequeue") + defer qt.finish(&err) + + qt.query(query, args...) + + qm, err = c.scanIntoMessage(c.pool.QueryRow(ctx, query, args...)) + if err != nil { + return qm, fmt.Errorf("querying: %w", err) + } + + return qm, nil +} + +func (c *queueClient) Ack(ctx context.Context, qm QueuedMessage) (err error) { + const query = `DELETE FROM smth.queue` + + ` WHERE id = $1` + + ` AND version_id = $2` + + return c.modifyByMessage(ctx, qm, "Ack", query) +} + +func (c *queueClient) Nack(ctx context.Context, qm QueuedMessage) error { + const query = `UPDATE smth.queue SET` + + ` visible_at = TO_TIMESTMAP(0)` + + `, updated_at = NOW()` + + ` WHERE id = $1 AND version_id = $2` + + return c.modifyByMessage(ctx, qm, "Nack", query) +} + +func (c *queueClient) modifyByMessage(ctx context.Context, qm QueuedMessage, method, query string) (err error) { + if qm.versionID == 0 { + panic("queued message was not fetched") + } + + args := []any{ + &qm.ID, + &qm.versionID, + } + + qt := traceMethod(ctx, c.logger, method) + defer qt.finish(&err) + + qt.query(query, args...) + + tag, err := c.pool.Exec(ctx, query, args...) + if err != nil { + return fmt.Errorf("executing query: %w", err) + } + + affected := tag.RowsAffected() + qt.setResultCount(affected) + + if affected == 0 { + return ErrVersionIDMismatch + } + + return nil +} + +func (c *queueClient) generateID() string { + var idByte [8]byte + _, _ = rand.Read(idByte[:]) + return hex.EncodeToString(idByte[:]) +} + +type queryTracer struct { + ctx context.Context + logger *slog.Logger + start time.Time + count int64 +} + +func (qt *queryTracer) query(query string, args ...any) { + qt.logger.DebugContext(qt.ctx, "executing query", slog.String("query", query), slog.Any("args", args)) +} + +func (qt *queryTracer) setResultCount(count int64) { + qt.count = count +} + +func (qt *queryTracer) finish(errptr *error) { + var err error + if errptr != nil { + err = *errptr + } + + var level slog.Level + if err == nil { + level = slog.LevelDebug + } else { + level = slog.LevelDebug + } + + qt.logger.Log(qt.ctx, level, + "query finished", + slog.Bool("success", err == nil), + slog.Duration("elapsed", time.Since(qt.start)), + slog.Int64("rows_count", qt.count), + ) +} + +func traceMethod(ctx context.Context, log *slog.Logger, method string) *queryTracer { + return &queryTracer{ + ctx: ctx, + logger: log.With(slog.String("method", method)), + start: time.Now(), + } +}