Files
smthqueue/internal/postgres/client.go

108 lines
1.8 KiB
Go

package postgres
import (
"context"
"fmt"
"io"
"sync/atomic"
"github.com/jackc/pgx/v5/pgxpool"
"golang.org/x/exp/slog"
)
type DatabaseStats struct {
MaxConnections int32
TotalConnections int32
AcquiredConnections int32
IdleConnections int32
}
type Metrics struct {
EnqueuedMessages uint64
DequeuedMessages uint64
Acked uint64
Nacked uint64
}
func (m *Metrics) Clone() Metrics {
return *m
}
func (m *Metrics) Enqueue() {
atomic.AddUint64(&m.EnqueuedMessages, 1)
}
func (m *Metrics) Dequeue() {
atomic.AddUint64(&m.DequeuedMessages, 1)
}
func (m *Metrics) Ack() {
atomic.AddUint64(&m.Acked, 1)
}
func (m *Metrics) Nack() {
atomic.AddUint64(&m.Nacked, 1)
}
type Client interface {
io.Closer
Queue() QueueClient
GetMetrics() Metrics
GetDBStats() DatabaseStats
}
type Config struct {
MaxConns int64
MaxIdleConns int64
MasterDSN string
}
type client struct {
pool *pgxpool.Pool
metrics *Metrics
log *slog.Logger
}
func New(ctx context.Context, config Config, logger *slog.Logger) (*client, error) {
pgconfig, err := pgxpool.ParseConfig(config.MasterDSN)
if err != nil {
return nil, fmt.Errorf("parsing config: %w", err)
}
pool, err := pgxpool.NewWithConfig(ctx, pgconfig)
if err != nil {
return nil, fmt.Errorf("making new connection: %w", err)
}
return &client{
pool: pool,
log: logger.With(slog.String("name", "db")),
}, nil
}
func (c *client) Close() error {
if c.pool == nil {
return nil
}
c.pool.Close()
return nil
}
func (c *client) GetDBStats() DatabaseStats {
stat := c.pool.Stat()
return DatabaseStats{
MaxConnections: stat.MaxConns(),
TotalConnections: stat.TotalConns(),
AcquiredConnections: stat.AcquiredConns(),
IdleConnections: stat.IdleConns(),
}
}
func (c *client) GetMetrics() Metrics {
return c.metrics.Clone()
}