package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"time"

	"git.loyso.art/frx/kurious/internal/common/config"

	"github.com/gorilla/mux"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
	"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
	"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
	"go.opentelemetry.io/otel/sdk/resource"
	"go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
	"google.golang.org/grpc/encoding/gzip"
)

// Package-level tracer and meter for the web layer, obtained from the global
// otel providers under the "kuriweb.http" instrumentation name.
var (
	webtracer = otel.Tracer("kuriweb.http")
	webmetric = otel.Meter("kuriweb.http")
)

// shutdownFunc releases resources held by a telemetry component.
type shutdownFunc func(context.Context) error

// setupOtelSDK wires up the global OpenTelemetry state for the process: the
// text-map propagator, the tracer provider and, when cfg.ShowMetrics is set,
// the meter provider.
//
// The returned shutdown function is never nil, including on error. It runs
// the Shutdown method of every provider registered so far and joins their
// errors; after it has run once, further calls are no-ops. When setup fails,
// providers that were already created are shut down using ctx and any
// shutdown error is joined with the setup error.
func setupOtelSDK(ctx context.Context, cfg config.Trace) (shutdown shutdownFunc, err error) {
	var shutdownFuncs []shutdownFunc

	shutdown = func(ctx context.Context) error {
		var err error
		for _, f := range shutdownFuncs {
			err = errors.Join(err, f(ctx))
		}
		// Drop the registered functions so that a second call does nothing.
		shutdownFuncs = nil

		return err
	}

	// handleError tears down whatever has been set up so far and merges any
	// teardown failure into the reported error.
	handleError := func(inErr error) error {
		return errors.Join(inErr, shutdown(ctx))
	}

	res, err := makeServiceResource(ctx)
	if err != nil {
		return shutdown, fmt.Errorf("making service resource: %w", err)
	}

	otel.SetTextMapPropagator(newPropagator())

	tracerProvider, err := newCommonTraceProvider(ctx, TraceProviderParams{
		Endpoint:   cfg.Endpoint,
		Type:       cfg.Type,
		AuthHeader: cfg.APIHeader,
		APIKey:     cfg.APIKey,
	}, res)
	if err != nil {
		return shutdown, handleError(fmt.Errorf("making trace provider: %w", err))
	}
	shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown)
	otel.SetTracerProvider(tracerProvider)

	if !cfg.ShowMetrics {
		return shutdown, nil
	}

	meterProvider, err := newCommonMeterProvider(ctx, meterProviderParams{
		Endpoint:        cfg.Endpoint,
		Type:            cfg.Type,
		AuthHeaderKey:   cfg.APIHeader,
		AuthHeaderValue: cfg.APIKey,
		ReadInterval:    time.Second * 15,
	}, res)
	if err != nil {
		return shutdown, handleError(fmt.Errorf("making meter provider: %w", err))
	}

	// newCommonMeterProvider returns (nil, nil) for client types it has no
	// metric exporter for. Installing that typed-nil pointer globally, or
	// registering its Shutdown method, would lead to a nil dereference, so
	// the global meter provider is left untouched in that case.
	if meterProvider == nil {
		return shutdown, nil
	}
	shutdownFuncs = append(shutdownFuncs, meterProvider.Shutdown)
	otel.SetMeterProvider(meterProvider)

	return shutdown, nil
}

// newPropagator returns a composite propagator that handles both the
// TraceContext and Baggage propagation formats.
func newPropagator() propagation.TextMapPropagator {
	return propagation.NewCompositeTextMapPropagator(
		propagation.TraceContext{},
		propagation.Baggage{},
	)
}

// TraceProviderParams describes how the span exporter is configured.
type TraceProviderParams struct {
	// Endpoint is passed to the exporter as its endpoint URL.
	Endpoint string
	// APIKey is the value sent in the AuthHeader header.
	APIKey string
	// AuthHeader is the header name carrying APIKey; empty disables it.
	AuthHeader string
	// Type selects the exporter transport (gRPC, HTTP or stdout).
	Type config.TraceClientType
}

// otlpAuthHeaders builds the header map handed to the OTLP exporters.
// It returns nil when no header name is configured.
func otlpAuthHeaders(key, value string) map[string]string {
	if key == "" {
		return nil
	}

	return map[string]string{key: value}
}

// makeServiceResource builds the resource describing this service (service
// name and deployment environment) and merges it on top of resource.Default().
func makeServiceResource(ctx context.Context) (*resource.Resource, error) {
	const serviceName = "bigstats:kuriweb"

	r, err := resource.New(
		ctx,
		resource.WithDetectors(
			resource.StringDetector(semconv.SchemaURL, semconv.ServiceNameKey, func() (string, error) {
				return serviceName, nil
			}),
		),
		resource.WithAttributes(
			semconv.ServiceName(serviceName),
			semconv.DeploymentEnvironment("production"),
		),
	)
	if err != nil {
		return nil, fmt.Errorf("making new resource: %w", err)
	}

	return resource.Merge(resource.Default(), r)
}

// newCommonTraceProvider creates a tracer provider that samples every span
// and is tagged with resource r.
//
// When params.Type is config.TraceClientTypeUnset the provider is created
// without any exporter. Otherwise a batching span exporter is attached for
// the selected transport; an unknown type yields an error.
func newCommonTraceProvider(ctx context.Context, params TraceProviderParams, r *resource.Resource) (tp *trace.TracerProvider, err error) {
	opts := []trace.TracerProviderOption{
		trace.WithSampler(trace.AlwaysSample()),
		trace.WithResource(r),
	}

	if params.Type == config.TraceClientTypeUnset {
		return trace.NewTracerProvider(opts...), nil
	}

	headers := otlpAuthHeaders(params.AuthHeader, params.APIKey)

	var spanExporter trace.SpanExporter
	switch params.Type {
	case config.TraceClientTypeGRPC:
		spanExporter, err = otlptracegrpc.New(
			ctx,
			otlptracegrpc.WithEndpointURL(params.Endpoint),
			// NOTE(review): WithInsecure is applied after WithEndpointURL, so
			// transport security looks disabled regardless of the URL
			// scheme; confirm this is intended.
			otlptracegrpc.WithInsecure(),
			otlptracegrpc.WithHeaders(headers),
			otlptracegrpc.WithCompressor(gzip.Name),
		)
	case config.TraceClientTypeHTTP:
		httpClient := otlptracehttp.NewClient(
			otlptracehttp.WithEndpointURL(params.Endpoint),
			otlptracehttp.WithHeaders(headers),
			otlptracehttp.WithCompression(otlptracehttp.GzipCompression),
		)
		spanExporter, err = otlptrace.New(ctx, httpClient)
	case config.TraceClientTypeStdout:
		spanExporter, err = stdouttrace.New(stdouttrace.WithPrettyPrint())
	default:
		return nil, errors.New("unsupported provider type")
	}
	if err != nil {
		return nil, fmt.Errorf("making trace exporter: %w", err)
	}

	opts = append(opts, trace.WithBatcher(spanExporter))

	return trace.NewTracerProvider(opts...), nil
}

// meterProviderParams describes how the metric exporter is configured.
type meterProviderParams struct {
	// Endpoint is passed to the exporter as its endpoint URL.
	Endpoint string
	// AuthHeaderKey is the header name carrying AuthHeaderValue; empty
	// disables it.
	AuthHeaderKey string
	// AuthHeaderValue is the value sent in the AuthHeaderKey header.
	AuthHeaderValue string
	// ReadInterval is the interval of the periodic metric reader.
	ReadInterval time.Duration
	// Type selects the exporter transport.
	Type config.TraceClientType
}

// newCommonMeterProvider creates a meter provider with a periodic reader that
// exports through the transport selected by params.Type.
//
// Only the gRPC and stdout types have a metric exporter here. For every other
// type the function returns (nil, nil); callers must check the provider for
// nil before using it.
func newCommonMeterProvider(ctx context.Context, params meterProviderParams, r *resource.Resource) (*metric.MeterProvider, error) {
	var (
		exporter metric.Exporter
		err      error
	)

	switch params.Type {
	case config.TraceClientTypeGRPC:
		exporter, err = otlpmetricgrpc.New(
			ctx,
			otlpmetricgrpc.WithEndpointURL(params.Endpoint),
			otlpmetricgrpc.WithHeaders(otlpAuthHeaders(params.AuthHeaderKey, params.AuthHeaderValue)),
			otlpmetricgrpc.WithCompressor(gzip.Name),
			otlpmetricgrpc.WithTemporalitySelector(preferDeltaTemporalitySelector),
			otlpmetricgrpc.WithInsecure(),
		)
		if err != nil {
			return nil, fmt.Errorf("making grpc exporter: %w", err)
		}
	case config.TraceClientTypeStdout:
		exporter, err = stdoutmetric.New()
		if err != nil {
			return nil, fmt.Errorf("making stdout exporter: %w", err)
		}
	default:
		// No metric exporter for this client type.
		return nil, nil
	}

	reader := metric.NewPeriodicReader(
		exporter,
		metric.WithInterval(params.ReadInterval),
	)

	provider := metric.NewMeterProvider(
		metric.WithReader(reader),
		metric.WithResource(r),
	)

	return provider, nil
}

// muxHandleFunc registers hf on router under path and assigns the route the
// given name.
func muxHandleFunc(router *mux.Router, name, path string, hf http.HandlerFunc) *mux.Route {
	// Route tagging via otelhttp is currently disabled:
	// h := otelhttp.WithRouteTag(path, hf)
	return router.Handle(path, hf).Name(name)
}

// preferDeltaTemporalitySelector reports delta temporality for counters,
// observable counters and histograms, and cumulative temporality for every
// other instrument kind.
func preferDeltaTemporalitySelector(kind metric.InstrumentKind) metricdata.Temporality {
	switch kind {
	case metric.InstrumentKindCounter,
		metric.InstrumentKindObservableCounter,
		metric.InstrumentKindHistogram:
		return metricdata.DeltaTemporality
	default:
		return metricdata.CumulativeTemporality
	}
}