add opentelemetry tracing

This commit is contained in:
Aleksandr Trushkin
2024-04-02 15:23:22 +03:00
parent e7c2832865
commit 68810d93a7
40 changed files with 1459 additions and 3048 deletions

View File

@ -9,9 +9,12 @@ import (
)
type Config struct {
Log config.Log `json:"log"`
YDB config.YDB `json:"ydb"`
HTTP config.HTTP `json:"http"`
Log config.Log `json:"log"`
YDB config.YDB `json:"ydb"`
Sqlite config.Sqlite `json:"sqlite"`
HTTP config.HTTP `json:"http"`
Tracing config.Trace `json:"tracing"`
DBEngine string `json:"db_engine"`
}
func readFromFile(path string, defaultConfigF func() Config) (Config, error) {

View File

@ -11,8 +11,12 @@ import (
"git.loyso.art/frx/kurious/internal/common/generator"
"git.loyso.art/frx/kurious/internal/common/xcontext"
xhttp "git.loyso.art/frx/kurious/internal/kurious/ports/http"
"git.loyso.art/frx/kurious/pkg/xdefault"
"github.com/gorilla/mux"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
func makePathTemplate(params ...string) string {
@ -29,50 +33,32 @@ func makePathTemplate(params ...string) string {
return sb.String()
}
func setupHTTPWithTempl(srv xhttp.Server, router *mux.Router, log *slog.Logger) {
coursesRouter := router.PathPrefix("/courses").Subrouter().StrictSlash(true)
func setupCoursesHTTP(srv xhttp.Server, router *mux.Router, _ *slog.Logger) {
coursesAPI := srv.Courses()
coursesAPI := srv.CoursesByTempl()
coursesRouter.HandleFunc("/", coursesAPI.List).Methods(http.MethodGet)
coursesListLearningOnlyPath := makePathTemplate(xhttp.LearningTypePathParam)
coursesRouter.HandleFunc(coursesListLearningOnlyPath, coursesAPI.List).Methods(http.MethodGet)
coursesListFullPath := makePathTemplate(xhttp.LearningTypePathParam, xhttp.ThematicTypePathParam)
coursesRouter.HandleFunc(coursesListFullPath, coursesAPI.List).Methods(http.MethodGet)
}
func setupHTTPWithGoTemplates(srv xhttp.Server, router *mux.Router, log *slog.Logger) {
coursesAPI := srv.Courses(true)
router.Handle("/", http.RedirectHandler("/courses", http.StatusPermanentRedirect))
coursesRouter := router.PathPrefix("/courses").Subrouter().StrictSlash(true)
coursesRouter.HandleFunc("/", coursesAPI.List).Methods(http.MethodGet)
coursesListLearningOnlyPath := makePathTemplate(xhttp.LearningTypePathParam)
coursesRouter.HandleFunc(coursesListLearningOnlyPath, coursesAPI.List).Methods(http.MethodGet)
coursesListFullPath := makePathTemplate(xhttp.LearningTypePathParam, xhttp.ThematicTypePathParam)
coursesRouter.HandleFunc(coursesListFullPath, coursesAPI.List).Methods(http.MethodGet)
courseRouter := router.PathPrefix("/course").PathPrefix("/{course_id}").Subrouter()
courseRouter.HandleFunc("/", coursesAPI.Get).Methods(http.MethodGet)
courseRouter.HandleFunc("/short", coursesAPI.GetShort).Methods(http.MethodGet)
courseRouter.HandleFunc("/editdesc", coursesAPI.RenderEditDescription).Methods(http.MethodGet)
courseRouter.HandleFunc("/description", coursesAPI.UpdateCourseDescription).Methods(http.MethodPut)
coursesListLearningOnlyPath := makePathTemplate(xhttp.LearningTypePathParam)
coursesListFullPath := makePathTemplate(xhttp.LearningTypePathParam, xhttp.ThematicTypePathParam)
muxHandleFunc(coursesRouter, "/", coursesAPI.Index).Methods(http.MethodGet)
muxHandleFunc(coursesRouter, coursesListLearningOnlyPath, coursesAPI.List).Methods(http.MethodGet)
muxHandleFunc(coursesRouter, coursesListFullPath, coursesAPI.List).Methods(http.MethodGet)
}
func setupHTTP(cfg config.HTTP, srv xhttp.Server, log *slog.Logger) *http.Server {
router := mux.NewRouter()
router.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
router.Use(
middlewareCustomWriterInjector(),
mux.CORSMethodMiddleware(router),
middlewareLogger(log),
middlewareTrace(),
)
router.Use(mux.CORSMethodMiddleware(router))
router.Use(middlewareLogger(log, cfg.Engine))
if cfg.Engine == "templ" {
setupHTTPWithTempl(srv, router, log)
} else {
setupHTTPWithGoTemplates(srv, router, log)
}
setupCoursesHTTP(srv, router, log)
if cfg.MountLive {
fs := http.FileServer(http.Dir("./assets/kurious/static/"))
@ -103,7 +89,7 @@ func setupHTTP(cfg config.HTTP, srv xhttp.Server, log *slog.Logger) *http.Server
}
} else {
fs := kurious.AsHTTPFileHandler()
router.PathPrefix("/").Handler(fs).Methods(http.MethodGet)
router.PathPrefix("/*").Handler(fs).Methods(http.MethodGet)
}
return &http.Server{
@ -112,7 +98,46 @@ func setupHTTP(cfg config.HTTP, srv xhttp.Server, log *slog.Logger) *http.Server
}
}
func middlewareLogger(log *slog.Logger, engine string) mux.MiddlewareFunc {
func middlewareCustomWriterInjector() mux.MiddlewareFunc {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
wr := wrapWithCustomWriter(w)
next.ServeHTTP(wr, r)
})
}
}
func middlewareTrace() mux.MiddlewareFunc {
reqidAttr := attribute.Key("http.request_id")
statusAttr := attribute.Key("http.status_code")
payloadAttr := attribute.Key("http.payload_size")
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
reqid := xcontext.GetRequestID(ctx)
var span trace.Span
ctx, span = webtracer.Start(ctx, r.URL.String(), trace.WithAttributes(reqidAttr.String(reqid)))
defer span.End()
next.ServeHTTP(w, r.WithContext(ctx))
if wr, ok := w.(*customResponseWriter); ok {
statusCode := xdefault.WithFallback(wr.statusCode, http.StatusOK)
span.SetAttributes(
statusAttr.Int(statusCode),
payloadAttr.Int(wr.wroteBytes),
)
if statusCode > 399 {
span.SetStatus(codes.Error, "error during request")
}
}
})
}
}
func middlewareLogger(log *slog.Logger) mux.MiddlewareFunc {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
@ -120,11 +145,12 @@ func middlewareLogger(log *slog.Logger, engine string) mux.MiddlewareFunc {
if requestID == "" {
requestID = generator.RandomInt64ID()
}
ctx = xcontext.WithLogFields(
ctx,
slog.String("request_id", requestID),
slog.String("engine", engine),
)
ctx = xcontext.WithRequestID(ctx, requestID)
xcontext.LogInfo(
ctx, log, "incoming request",
@ -134,12 +160,45 @@ func middlewareLogger(log *slog.Logger, engine string) mux.MiddlewareFunc {
start := time.Now()
next.ServeHTTP(w, r.WithContext(ctx))
elapsed := time.Since(start).Truncate(time.Millisecond)
elapsed := slog.Duration("elapsed", time.Since(start).Truncate(time.Millisecond))
xcontext.LogInfo(
ctx, log, "request processed",
slog.Duration("elapsed", elapsed),
)
logfields := make([]slog.Attr, 0, 3)
logfields = append(logfields, elapsed)
if wr, ok := w.(*customResponseWriter); ok {
statusCode := xdefault.WithFallback(wr.statusCode, http.StatusOK)
logfields = append(
logfields,
slog.Int("status_code", statusCode),
slog.Int("bytes_wrote", wr.wroteBytes),
)
}
xcontext.LogInfo(ctx, log, "request processed", logfields...)
})
}
}
func wrapWithCustomWriter(origin http.ResponseWriter) *customResponseWriter {
return &customResponseWriter{
ResponseWriter: origin,
}
}
type customResponseWriter struct {
http.ResponseWriter
statusCode int
wroteBytes int
}
func (w *customResponseWriter) WriteHeader(statusCode int) {
w.statusCode = statusCode
w.ResponseWriter.WriteHeader(statusCode)
}
func (w *customResponseWriter) Write(data []byte) (n int, err error) {
n, err = w.ResponseWriter.Write(data)
w.wroteBytes += n
return n, err
}

View File

@ -46,6 +46,17 @@ func app(ctx context.Context) error {
log := config.NewSLogger(cfg.Log)
shutdownOtel, err := setupOtelSDK(ctx, cfg.Tracing)
if err != nil {
return fmt.Errorf("setting up otel sdk: %w", err)
}
defer func() {
err := shutdownOtel(ctx)
if err != nil {
xcontext.LogWithError(ctx, log, err, "shutting down sdk")
}
}()
sravniClient, err := sravni.NewClient(ctx, log, false)
if err != nil {
return fmt.Errorf("unable to make new sravni client: %w", err)
@ -71,9 +82,20 @@ func app(ctx context.Context) error {
mapper := adapters.NewMemoryMapper(courseThematcisMapped, learningTypeMapped)
var dbengine service.RepositoryEngine
switch cfg.DBEngine {
case "ydb":
dbengine = service.RepositoryEngineYDB
case "sqlite":
dbengine = service.RepositoryEngineSqlite
default:
dbengine = service.RepositoryEngineUnknown
}
app, err := service.NewApplication(ctx, service.ApplicationConfig{
LogConfig: cfg.Log,
YDB: cfg.YDB,
Sqlite: cfg.Sqlite,
Engine: dbengine,
}, mapper)
if err != nil {
return fmt.Errorf("making new application: %w", err)
@ -113,6 +135,11 @@ func app(ctx context.Context) error {
xcontext.LogInfo(ctx, log, "server closed successfuly")
err = shutdownOtel(sdctx)
if err != nil {
return fmt.Errorf("shutting down sdk: %w", err)
}
return nil
})

132
cmd/kuriweb/trace.go Normal file
View File

@ -0,0 +1,132 @@
package main
import (
"context"
"errors"
"fmt"
"net/http"
"time"
"git.loyso.art/frx/kurious/internal/common/config"
"git.loyso.art/frx/kurious/pkg/xdefault"
"github.com/gorilla/mux"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
)
var webtracer = otel.Tracer("kuriweb")
type shutdownFunc func(context.Context) error
func setupOtelSDK(ctx context.Context, cfg config.Trace) (shutdown shutdownFunc, err error) {
var shutdownFuncs []shutdownFunc
shutdown = func(ctx context.Context) error {
var err error
for _, f := range shutdownFuncs {
err = errors.Join(err, f(ctx))
}
shutdownFuncs = nil
return err
}
handleError := func(inErr error) error {
err = errors.Join(inErr, shutdown(ctx))
return err
}
prop := newPropagator()
otel.SetTextMapPropagator(prop)
tracerProvider, err := newTraceProvider(ctx, cfg.Endpoint, cfg.LicenseKey)
if err != nil {
return nil, handleError(err)
}
shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown)
otel.SetTracerProvider(tracerProvider)
meterProvider, err := newMeterProvider()
if err != nil {
return nil, handleError(err)
}
shutdownFuncs = append(shutdownFuncs, meterProvider.Shutdown)
otel.SetMeterProvider(meterProvider)
return shutdown, nil
}
func newPropagator() propagation.TextMapPropagator {
return propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
)
}
const defaultNewRelicEndpoint = "otlp.eu01.nr-data.net:443"
func newTraceProvider(ctx context.Context, endpoint, licensekey string) (traceProvider *trace.TracerProvider, err error) {
opts := make([]trace.TracerProviderOption, 0, 2)
opts = append(
opts,
trace.WithSampler(trace.AlwaysSample()),
trace.WithResource(resource.Default()),
)
if licensekey != "" {
endpoint = xdefault.WithFallback(endpoint, defaultNewRelicEndpoint)
client, err := otlptracegrpc.New(
ctx,
otlptracegrpc.WithEndpoint(endpoint),
otlptracegrpc.WithHeaders(map[string]string{
"api-key": licensekey,
}),
)
if err != nil {
return nil, fmt.Errorf("making grpc client: %w", err)
}
opts = append(opts, trace.WithBatcher(client, trace.WithBatchTimeout(time.Second*10)))
} else {
traceExporter, err := stdouttrace.New(
stdouttrace.WithPrettyPrint())
if err != nil {
return nil, err
}
opts = append(
opts,
trace.WithBatcher(traceExporter, trace.WithBatchTimeout(time.Second*5)),
)
}
traceProvider = trace.NewTracerProvider(opts...)
return traceProvider, nil
}
func newMeterProvider() (*metric.MeterProvider, error) {
metricExporter, err := stdoutmetric.New()
if err != nil {
return nil, err
}
meterProvider := metric.NewMeterProvider(
metric.WithReader(metric.NewPeriodicReader(metricExporter,
// Default is 1m. Set to 3s for demonstrative purposes.
metric.WithInterval(60*time.Second))),
)
return meterProvider, nil
}
func muxHandleFunc(router *mux.Router, path string, hf http.HandlerFunc) *mux.Route {
h := otelhttp.WithRouteTag(path, hf)
return router.Handle(path, h)
}