add metrics and optimize query
This commit is contained in:
@ -109,7 +109,7 @@ func (r *rpsReporter) loop() {
|
||||
ticker := time.NewTicker(time.Second)
|
||||
for range ticker.C {
|
||||
rps := r.reset()
|
||||
dbStats := r.db.GetStats()
|
||||
dbStats := r.db.GetDBStats()
|
||||
r.log.Info(
|
||||
"tick",
|
||||
slog.Int64("rps", int64(rps)),
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"golang.org/x/exp/slog"
|
||||
@ -16,12 +17,40 @@ type DatabaseStats struct {
|
||||
IdleConnections int32
|
||||
}
|
||||
|
||||
type Metrics struct {
|
||||
EnqueuedMessages uint64
|
||||
DequeuedMessages uint64
|
||||
Acked uint64
|
||||
Nacked uint64
|
||||
}
|
||||
|
||||
func (m *Metrics) Clone() Metrics {
|
||||
return *m
|
||||
}
|
||||
|
||||
func (m *Metrics) Enqueue() {
|
||||
atomic.AddUint64(&m.EnqueuedMessages, 1)
|
||||
}
|
||||
|
||||
func (m *Metrics) Dequeue() {
|
||||
atomic.AddUint64(&m.DequeuedMessages, 1)
|
||||
}
|
||||
|
||||
func (m *Metrics) Ack() {
|
||||
atomic.AddUint64(&m.Acked, 1)
|
||||
}
|
||||
|
||||
func (m *Metrics) Nack() {
|
||||
atomic.AddUint64(&m.Nacked, 1)
|
||||
}
|
||||
|
||||
type Client interface {
|
||||
io.Closer
|
||||
|
||||
Queue() QueueClient
|
||||
|
||||
GetStats() DatabaseStats
|
||||
GetMetrics() Metrics
|
||||
GetDBStats() DatabaseStats
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
@ -32,6 +61,7 @@ type Config struct {
|
||||
|
||||
type client struct {
|
||||
pool *pgxpool.Pool
|
||||
metrics *Metrics
|
||||
log *slog.Logger
|
||||
}
|
||||
|
||||
@ -61,7 +91,7 @@ func (c *client) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *client) GetStats() DatabaseStats {
|
||||
func (c *client) GetDBStats() DatabaseStats {
|
||||
stat := c.pool.Stat()
|
||||
|
||||
return DatabaseStats{
|
||||
@ -71,3 +101,7 @@ func (c *client) GetStats() DatabaseStats {
|
||||
IdleConnections: stat.IdleConns(),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) GetMetrics() Metrics {
|
||||
return c.metrics.Clone()
|
||||
}
|
||||
|
||||
@ -63,12 +63,14 @@ type QueueClient interface {
|
||||
func (c *client) Queue() QueueClient {
|
||||
return &queueClient{
|
||||
pool: c.pool,
|
||||
metrics: c.metrics,
|
||||
logger: c.log.WithGroup("queue"),
|
||||
}
|
||||
}
|
||||
|
||||
type queueClient struct {
|
||||
pool *pgxpool.Pool
|
||||
metrics *Metrics
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
@ -88,7 +90,7 @@ CREATE INDEX queue_visible_at_idx ON smth.queue(visible_at ASC);
|
||||
CREATE INDEX queue_topic_idx ON smth.queue(topic);`
|
||||
|
||||
func (c *queueClient) scanIntoMessage(row pgx.Row) (qm QueuedMessage, err error) {
|
||||
err = row.Scan(
|
||||
if err = row.Scan(
|
||||
&qm.ID,
|
||||
&qm.Topic,
|
||||
&qm.Payload,
|
||||
@ -96,8 +98,7 @@ func (c *queueClient) scanIntoMessage(row pgx.Row) (qm QueuedMessage, err error)
|
||||
&qm.UpdatedAt,
|
||||
&qm.visibleAt,
|
||||
&qm.versionID,
|
||||
)
|
||||
if err != nil {
|
||||
); err != nil {
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
return qm, ErrNoMessage
|
||||
}
|
||||
@ -109,13 +110,14 @@ func (c *queueClient) scanIntoMessage(row pgx.Row) (qm QueuedMessage, err error)
|
||||
}
|
||||
|
||||
func (c *queueClient) Enqueue(ctx context.Context, params EnqueueParams) (qm QueuedMessage, err error) {
|
||||
defer c.metrics.Enqueue()
|
||||
|
||||
const initialVersion = 1
|
||||
const query = `INSERT INTO smth.queue
|
||||
(id, topic, payload, created_at, updated_at, visible_at, version_id)
|
||||
VALUES
|
||||
($1, $2, $3, NOW(), NOW(), $4, $5)
|
||||
RETURNING
|
||||
id, topic, payload, created_at, updated_at, visible_at, version_id`
|
||||
const query = `INSERT INTO smth.queue` +
|
||||
` (id, topic, payload, created_at, updated_at, visible_at, version_id)` +
|
||||
` VALUES` +
|
||||
` ($1, $2, $3, NOW(), NOW(), $4, $5)` +
|
||||
` RETURNING id, topic, payload, created_at, updated_at, visible_at, version_id`
|
||||
|
||||
args := []any{
|
||||
c.generateID(),
|
||||
@ -139,37 +141,34 @@ func (c *queueClient) Enqueue(ctx context.Context, params EnqueueParams) (qm Que
|
||||
}
|
||||
|
||||
func (c *queueClient) Dequeue(ctx context.Context, params DequeueParams) (qm QueuedMessage, err error) {
|
||||
defer c.metrics.Dequeue()
|
||||
|
||||
const queryDeletePrefix = `DELETE FROM smth.queue`
|
||||
const queryUpdatePrefix = `UPDATE smth.queue SET updated_at = NOW(), version_id = version_id + 1, visible_at = $2`
|
||||
const querySuffix = ` WHERE id = (
|
||||
SELECT id
|
||||
FROM smth.queue
|
||||
WHERE
|
||||
topic = $1
|
||||
and visible_at < NOW()
|
||||
ORDER BY visible_at ASC
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
) RETURNING id, topic, payload, created_at, updated_at, visible_at, version_id`
|
||||
|
||||
var query string
|
||||
args := append(
|
||||
make([]any, 0, 2),
|
||||
params.Topic,
|
||||
)
|
||||
if params.Timeout == 0 {
|
||||
query = queryDeletePrefix + querySuffix
|
||||
} else {
|
||||
query = queryUpdatePrefix + querySuffix
|
||||
args = append(args, time.Now().UTC().Add(params.Timeout))
|
||||
}
|
||||
const querySuffix = ` WHERE id = (` +
|
||||
`SELECT id` +
|
||||
` FROM smth.queue` +
|
||||
` WHERE` +
|
||||
` topic = $1` +
|
||||
` and visible_at < NOW()` +
|
||||
` ORDER BY visible_at ASC` +
|
||||
` LIMIT 1` +
|
||||
` FOR UPDATE SKIP LOCKED` +
|
||||
`) RETURNING id, topic, payload, created_at, updated_at, visible_at, version_id`
|
||||
|
||||
qt := traceMethod(ctx, c.logger, "Dequeue")
|
||||
defer qt.finish(&err)
|
||||
|
||||
qt.query(query, args...)
|
||||
|
||||
qm, err = c.scanIntoMessage(c.pool.QueryRow(ctx, query, args...))
|
||||
if params.Timeout == 0 {
|
||||
query := queryDeletePrefix + querySuffix
|
||||
qt.query(query, params.Topic)
|
||||
qm, err = c.scanIntoMessage(c.pool.QueryRow(ctx, query, params.Topic))
|
||||
} else {
|
||||
query := queryUpdatePrefix + querySuffix
|
||||
visibleAt := time.Now().UTC().Add(params.Timeout)
|
||||
qt.query(query, params.Topic, visibleAt)
|
||||
qm, err = c.scanIntoMessage(c.pool.QueryRow(ctx, query, params.Topic, visibleAt))
|
||||
}
|
||||
if err != nil {
|
||||
return qm, fmt.Errorf("querying: %w", err)
|
||||
}
|
||||
@ -178,6 +177,8 @@ func (c *queueClient) Dequeue(ctx context.Context, params DequeueParams) (qm Que
|
||||
}
|
||||
|
||||
func (c *queueClient) Ack(ctx context.Context, qm QueuedMessage) (err error) {
|
||||
defer c.metrics.Ack()
|
||||
|
||||
const query = `DELETE FROM smth.queue` +
|
||||
` WHERE id = $1` +
|
||||
` AND version_id = $2`
|
||||
@ -186,9 +187,12 @@ func (c *queueClient) Ack(ctx context.Context, qm QueuedMessage) (err error) {
|
||||
}
|
||||
|
||||
func (c *queueClient) Nack(ctx context.Context, qm QueuedMessage) error {
|
||||
const query = `UPDATE smth.queue SET` +
|
||||
` visible_at = TO_TIMESTMAP(0)` +
|
||||
`, updated_at = NOW()` +
|
||||
defer c.metrics.Nack()
|
||||
|
||||
const query = `UPDATE smth.queue` +
|
||||
` SET` +
|
||||
` visible_at = TO_TIMESTMAP(0),` +
|
||||
` updated_at = NOW()` +
|
||||
` WHERE id = $1 AND version_id = $2`
|
||||
|
||||
return c.modifyByMessage(ctx, qm, "Nack", query)
|
||||
@ -196,20 +200,15 @@ func (c *queueClient) Nack(ctx context.Context, qm QueuedMessage) error {
|
||||
|
||||
func (c *queueClient) modifyByMessage(ctx context.Context, qm QueuedMessage, method, query string) (err error) {
|
||||
if qm.versionID == 0 {
|
||||
panic("queued message was not fetched")
|
||||
}
|
||||
|
||||
args := []any{
|
||||
&qm.ID,
|
||||
&qm.versionID,
|
||||
return errors.New("versionID cannot be 0")
|
||||
}
|
||||
|
||||
qt := traceMethod(ctx, c.logger, method)
|
||||
defer qt.finish(&err)
|
||||
|
||||
qt.query(query, args...)
|
||||
qt.query(query, qm.ID, qm.versionID)
|
||||
|
||||
tag, err := c.pool.Exec(ctx, query, args...)
|
||||
tag, err := c.pool.Exec(ctx, query, qm.ID, qm.versionID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("executing query: %w", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user