diff --git a/cmd/simple/main.go b/cmd/simple/main.go index 2d732e6..214a237 100644 --- a/cmd/simple/main.go +++ b/cmd/simple/main.go @@ -109,7 +109,7 @@ func (r *rpsReporter) loop() { ticker := time.NewTicker(time.Second) for range ticker.C { rps := r.reset() - dbStats := r.db.GetStats() + dbStats := r.db.GetDBStats() r.log.Info( "tick", slog.Int64("rps", int64(rps)), diff --git a/internal/postgres/client.go b/internal/postgres/client.go index 2447dcd..db9b158 100644 --- a/internal/postgres/client.go +++ b/internal/postgres/client.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "sync/atomic" "github.com/jackc/pgx/v5/pgxpool" "golang.org/x/exp/slog" @@ -16,12 +17,40 @@ type DatabaseStats struct { IdleConnections int32 } +type Metrics struct { + EnqueuedMessages uint64 + DequeuedMessages uint64 + Acked uint64 + Nacked uint64 +} + +func (m *Metrics) Clone() Metrics { + return *m +} + +func (m *Metrics) Enqueue() { + atomic.AddUint64(&m.EnqueuedMessages, 1) +} + +func (m *Metrics) Dequeue() { + atomic.AddUint64(&m.DequeuedMessages, 1) +} + +func (m *Metrics) Ack() { + atomic.AddUint64(&m.Acked, 1) +} + +func (m *Metrics) Nack() { + atomic.AddUint64(&m.Nacked, 1) +} + type Client interface { io.Closer Queue() QueueClient - GetStats() DatabaseStats + GetMetrics() Metrics + GetDBStats() DatabaseStats } type Config struct { @@ -31,8 +60,9 @@ type Config struct { } type client struct { - pool *pgxpool.Pool - log *slog.Logger + pool *pgxpool.Pool + metrics *Metrics + log *slog.Logger } func New(ctx context.Context, config Config, logger *slog.Logger) (*client, error) { @@ -61,7 +91,7 @@ func (c *client) Close() error { return nil } -func (c *client) GetStats() DatabaseStats { +func (c *client) GetDBStats() DatabaseStats { stat := c.pool.Stat() return DatabaseStats{ @@ -71,3 +101,7 @@ func (c *client) GetStats() DatabaseStats { IdleConnections: stat.IdleConns(), } } + +func (c *client) GetMetrics() Metrics { + return c.metrics.Clone() +} diff --git a/internal/postgres/queue.go b/internal/postgres/queue.go index 06f449b..aa191df 100644 --- a/internal/postgres/queue.go +++ b/internal/postgres/queue.go @@ -62,14 +62,16 @@ type QueueClient interface { func (c *client) Queue() QueueClient { return &queueClient{ - pool: c.pool, - logger: c.log.WithGroup("queue"), + pool: c.pool, + metrics: c.metrics, + logger: c.log.WithGroup("queue"), } } type queueClient struct { - pool *pgxpool.Pool - logger *slog.Logger + pool *pgxpool.Pool + metrics *Metrics + logger *slog.Logger } const tableCreateQuery = ` @@ -88,7 +90,7 @@ CREATE INDEX queue_visible_at_idx ON smth.queue(visible_at ASC); CREATE INDEX queue_topic_idx ON smth.queue(topic);` func (c *queueClient) scanIntoMessage(row pgx.Row) (qm QueuedMessage, err error) { - err = row.Scan( + if err = row.Scan( &qm.ID, &qm.Topic, &qm.Payload, @@ -96,8 +98,7 @@ func (c *queueClient) scanIntoMessage(row pgx.Row) (qm QueuedMessage, err error) &qm.UpdatedAt, &qm.visibleAt, &qm.versionID, - ) - if err != nil { + ); err != nil { if errors.Is(err, pgx.ErrNoRows) { return qm, ErrNoMessage } @@ -109,13 +110,14 @@ func (c *queueClient) scanIntoMessage(row pgx.Row) (qm QueuedMessage, err error) } func (c *queueClient) Enqueue(ctx context.Context, params EnqueueParams) (qm QueuedMessage, err error) { + defer c.metrics.Enqueue() + const initialVersion = 1 - const query = `INSERT INTO smth.queue - (id, topic, payload, created_at, updated_at, visible_at, version_id) - VALUES - ($1, $2, $3, NOW(), NOW(), $4, $5) - RETURNING - id, topic, payload, created_at, updated_at, visible_at, version_id` + const query = `INSERT INTO smth.queue` + + ` (id, topic, payload, created_at, updated_at, visible_at, version_id)` + + ` VALUES` + + ` ($1, $2, $3, NOW(), NOW(), $4, $5)` + + ` RETURNING id, topic, payload, created_at, updated_at, visible_at, version_id` args := []any{ c.generateID(), @@ -139,37 +141,34 @@ func (c *queueClient) Enqueue(ctx context.Context, params EnqueueParams) (qm Que } func (c *queueClient) Dequeue(ctx context.Context, params DequeueParams) (qm QueuedMessage, err error) { + defer c.metrics.Dequeue() + const queryDeletePrefix = `DELETE FROM smth.queue` const queryUpdatePrefix = `UPDATE smth.queue SET updated_at = NOW(), version_id = version_id + 1, visible_at = $2` - const querySuffix = ` WHERE id = ( - SELECT id - FROM smth.queue - WHERE - topic = $1 - and visible_at < NOW() - ORDER BY visible_at ASC - LIMIT 1 - FOR UPDATE SKIP LOCKED - ) RETURNING id, topic, payload, created_at, updated_at, visible_at, version_id` - - var query string - args := append( - make([]any, 0, 2), - params.Topic, - ) - if params.Timeout == 0 { - query = queryDeletePrefix + querySuffix - } else { - query = queryUpdatePrefix + querySuffix - args = append(args, time.Now().UTC().Add(params.Timeout)) - } + const querySuffix = ` WHERE id = (` + + `SELECT id` + + ` FROM smth.queue` + + ` WHERE` + + ` topic = $1` + + ` and visible_at < NOW()` + + ` ORDER BY visible_at ASC` + + ` LIMIT 1` + + ` FOR UPDATE SKIP LOCKED` + + `) RETURNING id, topic, payload, created_at, updated_at, visible_at, version_id` qt := traceMethod(ctx, c.logger, "Dequeue") defer qt.finish(&err) - qt.query(query, args...) - - qm, err = c.scanIntoMessage(c.pool.QueryRow(ctx, query, args...)) + if params.Timeout == 0 { + query := queryDeletePrefix + querySuffix + qt.query(query, params.Topic) + qm, err = c.scanIntoMessage(c.pool.QueryRow(ctx, query, params.Topic)) + } else { + query := queryUpdatePrefix + querySuffix + visibleAt := time.Now().UTC().Add(params.Timeout) + qt.query(query, params.Topic, visibleAt) + qm, err = c.scanIntoMessage(c.pool.QueryRow(ctx, query, params.Topic, visibleAt)) + } if err != nil { return qm, fmt.Errorf("querying: %w", err) } @@ -178,6 +177,8 @@ func (c *queueClient) Dequeue(ctx context.Context, params DequeueParams) (qm Que } func (c *queueClient) Ack(ctx context.Context, qm QueuedMessage) (err error) { + defer c.metrics.Ack() + const query = `DELETE FROM smth.queue` + ` WHERE id = $1` + ` AND version_id = $2` @@ -186,9 +187,12 @@ func (c *queueClient) Ack(ctx context.Context, qm QueuedMessage) (err error) { } func (c *queueClient) Nack(ctx context.Context, qm QueuedMessage) error { - const query = `UPDATE smth.queue SET` + - ` visible_at = TO_TIMESTMAP(0)` + - `, updated_at = NOW()` + + defer c.metrics.Nack() + + const query = `UPDATE smth.queue` + + ` SET` + + ` visible_at = TO_TIMESTMAP(0),` + + ` updated_at = NOW()` + ` WHERE id = $1 AND version_id = $2` return c.modifyByMessage(ctx, qm, "Nack", query) @@ -196,20 +200,15 @@ func (c *queueClient) Nack(ctx context.Context, qm QueuedMessage) error { func (c *queueClient) modifyByMessage(ctx context.Context, qm QueuedMessage, method, query string) (err error) { if qm.versionID == 0 { - panic("queued message was not fetched") - } - - args := []any{ - &qm.ID, - &qm.versionID, + return errors.New("versionID cannot be 0") } qt := traceMethod(ctx, c.logger, method) defer qt.finish(&err) - qt.query(query, args...) + qt.query(query, qm.ID, qm.versionID) - tag, err := c.pool.Exec(ctx, query, args...) + tag, err := c.pool.Exec(ctx, query, qm.ID, qm.versionID) if err != nil { return fmt.Errorf("executing query: %w", err) }