ampyobs

Version: v0.0.3

This package is not in the latest version of its module.

Published: Dec 8, 2025 License: MIT Imports: 27 Imported by: 0

Documentation

Constants

const (
	OutcomeOK     = "ok"
	OutcomeRetry  = "retry"
	OutcomeDLQ    = "dlq"
	OutcomeReject = "reject"
)

Public enums (bounded label values)
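Because these values are meant to stay bounded, callers may want to guard against free-form strings before using one as a metric label. The following is a minimal sketch, not part of ampyobs: `isKnownOutcome` is a hypothetical helper, and the constants are redeclared locally only so the snippet compiles on its own.

```go
package main

import "fmt"

// Outcome label values, copied from the package constants above.
const (
	OutcomeOK     = "ok"
	OutcomeRetry  = "retry"
	OutcomeDLQ    = "dlq"
	OutcomeReject = "reject"
)

// isKnownOutcome is a hypothetical helper (not an ampyobs function). It
// reports whether s is one of the four bounded outcome labels.
func isKnownOutcome(s string) bool {
	switch s {
	case OutcomeOK, OutcomeRetry, OutcomeDLQ, OutcomeReject:
		return true
	}
	return false
}

func main() {
	for _, s := range []string{OutcomeOK, OutcomeDLQ, "timeout: upstream 10.0.0.7"} {
		fmt.Printf("%q known=%v\n", s, isKnownOutcome(s))
	}
}
```

Rejecting unknown values at the call site keeps error text and other high-cardinality strings out of label sets.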

const (
	HeaderTraceParent = "traceparent"
	HeaderTraceState  = "tracestate"

	// Optional AmpyFin correlation headers (only if you choose to send them via ampy-bus):
	HeaderRunID      = "run_id"
	HeaderUniverseID = "universe_id"
	HeaderAsOf       = "as_of"
)

Variables

This section is empty.

Functions

func BusConsumedAdd

func BusConsumedAdd(ctx context.Context, topic string, n int64)

BusConsumedAdd adds n to the consumed-message counter for a topic.

func BusDeliveryLatencyMs

func BusDeliveryLatencyMs(ctx context.Context, topic string, ms float64)

BusDeliveryLatencyMs records bus delivery latency for a topic.

func BusProducedAdd

func BusProducedAdd(ctx context.Context, topic string, n int64)

BusProducedAdd adds n to the produced-message counter for a topic.

func C

func C(ctx context.Context) *slog.Logger

C returns a context-aware logger that enriches log records with trace/span fields when ctx carries them.

func ExtractTrace

func ExtractTrace(parent context.Context, headers map[string]string) context.Context

ExtractTrace extracts W3C trace context from headers and returns a child context.

func Init

func Init(cfg Config) error

func InitFromAmpyConfig added in v0.0.3

func InitFromAmpyConfig(cfg map[string]any) error

InitFromAmpyConfig initializes observability from the ampy-config effective configuration map.

Expected config paths:

  • observability.service_name
  • observability.service_version
  • observability.environment
  • observability.collector_endpoint
  • observability.trace_protocol (optional, defaults to "grpc")
  • observability.enable_logs (optional, defaults to true)
  • observability.enable_metrics (optional, defaults to true)
  • observability.enable_tracing (optional, defaults to true)
  • observability.sampler (optional, "parent" or "ratio", defaults to "parent")
  • observability.sample_ratio (optional, 0.0-1.0, defaults to 0.25)

Example:

import (
  "context"
  "log"

  ampyconfigpkg "github.com/AmpyFin/ampy-config/go/ampyconfig/pkg/config"
)

ctx := context.Background()
eff, _, err := ampyconfigpkg.LoadEffective(ctx, ampyconfigpkg.Options{
  ProfileYAML: "config/dev.yaml",
})
if err != nil {
  log.Fatal(err)
}
if err := ampyobs.InitFromAmpyConfig(eff); err != nil {
  log.Fatal(err)
}
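The config paths and defaults listed above can be read from a map[string]any roughly as follows. This is a sketch under assumptions, not the package's code: it assumes the effective map is nested (an "observability" key holding another map), and `lookup`, `stringOr`, `boolOr`, and `floatOr` are hypothetical helpers.

```go
package main

import "fmt"

// lookup walks nested map[string]any values along path.
// Assumption: the effective config is a nested map, not flat dotted keys.
func lookup(cfg map[string]any, path ...string) (any, bool) {
	var cur any = cfg
	for _, key := range path {
		m, ok := cur.(map[string]any)
		if !ok {
			return nil, false
		}
		if cur, ok = m[key]; !ok {
			return nil, false
		}
	}
	return cur, true
}

// stringOr returns the string at path, or def if it is missing or empty.
func stringOr(cfg map[string]any, def string, path ...string) string {
	if v, ok := lookup(cfg, path...); ok {
		if s, ok := v.(string); ok && s != "" {
			return s
		}
	}
	return def
}

// boolOr returns the bool at path, or def if it is missing or not a bool.
func boolOr(cfg map[string]any, def bool, path ...string) bool {
	if v, ok := lookup(cfg, path...); ok {
		if b, ok := v.(bool); ok {
			return b
		}
	}
	return def
}

// floatOr returns the number at path as float64, or def if it is missing.
// YAML decoders may yield int for whole numbers, so both types are accepted.
func floatOr(cfg map[string]any, def float64, path ...string) float64 {
	if v, ok := lookup(cfg, path...); ok {
		switch n := v.(type) {
		case float64:
			return n
		case int:
			return float64(n)
		}
	}
	return def
}

func main() {
	cfg := map[string]any{
		"observability": map[string]any{
			"service_name":   "example-service",
			"enable_tracing": false,
		},
	}
	fmt.Println(stringOr(cfg, "", "observability", "service_name"))       // example-service
	fmt.Println(stringOr(cfg, "grpc", "observability", "trace_protocol")) // grpc
	fmt.Println(boolOr(cfg, true, "observability", "enable_tracing"))     // false
	fmt.Println(floatOr(cfg, 0.25, "observability", "sample_ratio"))      // 0.25
}
```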

func InjectTrace

func InjectTrace(ctx context.Context, headers map[string]string)

InjectTrace injects W3C trace context into key/value headers.
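For orientation, the W3C `traceparent` header that InjectTrace writes and ExtractTrace reads has the shape `version-traceid-parentid-flags`. The sketch below shows that wire format with a plain header map. It is an illustration only: `spanContext`, `inject`, and `extract` are hypothetical names, the real functions work on context.Context, and this sketch handles version 00 only and skips the all-zero ID checks the specification requires.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
)

const HeaderTraceParent = "traceparent"

// spanContext is a hypothetical stand-in for an OpenTelemetry span context.
type spanContext struct {
	TraceID string // 32 lowercase hex characters
	SpanID  string // 16 lowercase hex characters
	Sampled bool
}

// inject writes sc into headers as a version-00 traceparent value.
func inject(sc spanContext, headers map[string]string) {
	flags := "00"
	if sc.Sampled {
		flags = "01"
	}
	headers[HeaderTraceParent] = fmt.Sprintf("00-%s-%s-%s", sc.TraceID, sc.SpanID, flags)
}

// isHex reports whether s is exactly n lowercase hex characters.
func isHex(s string, n int) bool {
	if len(s) != n || s != strings.ToLower(s) {
		return false
	}
	_, err := hex.DecodeString(s)
	return err == nil
}

// extract parses a version-00 traceparent value from headers.
func extract(headers map[string]string) (spanContext, error) {
	parts := strings.Split(headers[HeaderTraceParent], "-")
	if len(parts) != 4 || parts[0] != "00" {
		return spanContext{}, errors.New("missing or unsupported traceparent")
	}
	if !isHex(parts[1], 32) || !isHex(parts[2], 16) || !isHex(parts[3], 2) {
		return spanContext{}, errors.New("malformed traceparent")
	}
	flags, _ := hex.DecodeString(parts[3]) // already validated by isHex
	return spanContext{
		TraceID: parts[1],
		SpanID:  parts[2],
		Sampled: flags[0]&0x01 == 0x01,
	}, nil
}

func main() {
	headers := map[string]string{}
	inject(spanContext{
		TraceID: "4bf92f3577b34da6a3ce929d0e0e4736",
		SpanID:  "00f067aa0ba902b7",
		Sampled: true,
	}, headers)
	fmt.Println(headers[HeaderTraceParent])

	sc, err := extract(headers)
	fmt.Println(sc.TraceID, sc.SpanID, sc.Sampled, err)
}
```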

func L

func L() *slog.Logger

L returns a *slog.Logger without context.

func OMSOrderLatencyMs

func OMSOrderLatencyMs(ctx context.Context, broker string, ms float64)

OMSOrderLatencyMs records order latency for a broker.

func OMSOrderSubmitAdd

func OMSOrderSubmitAdd(ctx context.Context, broker string, outcome string)

OMSOrderSubmitAdd increments the order-submit counter for a broker and outcome.

func OMSRejectAdd

func OMSRejectAdd(ctx context.Context, broker string, reason string)

OMSRejectAdd increments the rejection counter for a broker and reason.

func SetErrorHandler

func SetErrorHandler(handler func(error))

SetErrorHandler sets a custom error handler for OTel errors.

func Shutdown

func Shutdown(ctx context.Context) error

func StartBusConsumeSpan

func StartBusConsumeSpan(parent context.Context, headers map[string]string, a BusAttrs) (context.Context, trace.Span)

StartBusConsumeSpan extracts W3C context from headers and starts `bus.consume` as a child of the upstream span. It also adds a span link to the upstream context.

func StartBusPublishSpan

func StartBusPublishSpan(ctx context.Context, a BusAttrs) (context.Context, trace.Span)

StartBusPublishSpan creates a `bus.publish` span with standardized attributes.

func StartSpan

func StartSpan(ctx context.Context, name string, kind trace.SpanKind, attrs ...attribute.KeyValue) (context.Context, trace.Span)

StartSpan creates a span with a conventional name and kind.

func ValidateProto added in v0.0.3

func ValidateProto(ctx context.Context, msg proto.Message) (context.Context, trace.Span, error)

ValidateProto validates a protobuf message and creates a trace span for the validation. This ensures messages are validated before processing and traces include schema info.

Example:

import (
  bars "github.com/AmpyFin/ampy-proto/v2/gen/go/ampy/bars/v1"
)

bar := &bars.Bar{...}
ctx, span, err := ampyobs.ValidateProto(ctx, bar)
if err != nil {
  return err
}
defer span.End()

Types

type BusAttrs

type BusAttrs struct {
	Topic        string
	SchemaFQDN   string
	MessageID    string
	PartitionKey string
	RunID        string
}

BusAttrs captures stable, low-cardinality attributes for bus spans.

type Config

type Config struct {
	ServiceName       string
	ServiceVersion    string
	Environment       string // dev | paper | prod
	CollectorEndpoint string // e.g. "http://localhost:4317" or "localhost:4317"
	TraceProtocol     string // "grpc" | "http" (default: "grpc")
	EnableLogs        bool   // JSON logs via slog (stdout)
	EnableMetrics     bool   // OTLP metrics to collector
	EnableTracing     bool   // OTLP traces to collector
	Sampler           string // "parent" | "ratio"
	SampleRatio       float64
}
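How a "ratio" sampler can use SampleRatio is sketched below. This is an assumption-heavy illustration, not the package's code: it assumes SampleRatio applies when Sampler is "ratio", and it shows one common technique (a deterministic decision derived from the trace ID, similar in spirit to OpenTelemetry's trace-ID ratio sampler) so that every service makes the same choice for a given trace. `shouldSample` is a hypothetical name.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// shouldSample makes a deterministic sampling decision from the low 8 bytes
// of a 16-byte trace ID. The same trace ID always yields the same answer.
func shouldSample(traceID [16]byte, ratio float64) bool {
	if ratio >= 1 {
		return true
	}
	if ratio <= 0 {
		return false
	}
	// Map ratio onto the 63-bit range and compare against the trace ID bits.
	bound := uint64(ratio * (1 << 63))
	x := binary.BigEndian.Uint64(traceID[8:16]) >> 1
	return x < bound
}

func main() {
	var low, high [16]byte
	for i := 8; i < 16; i++ {
		high[i] = 0xFF
	}
	fmt.Println(shouldSample(low, 0.25))  // true: low bits are all zero
	fmt.Println(shouldSample(high, 0.25)) // false: low bits are all ones
	fmt.Println(shouldSample(high, 1.0))  // true: ratio 1 samples everything
}
```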

type MessageHandler added in v0.0.3

type MessageHandler func(ctx context.Context, env ampybus.Envelope) error

MessageHandler is a function type for processing bus messages. It receives the context (with trace context extracted), the envelope, and should return an error if processing fails.

type TracedBus added in v0.0.3

type TracedBus struct {
	// contains filtered or unexported fields
}

TracedBus wraps an ampy-bus Bus to automatically inject trace context into messages. It provides methods that integrate with OpenTelemetry tracing.

func NewTracedBus added in v0.0.3

func NewTracedBus(bus *natsbinding.Bus) *TracedBus

NewTracedBus creates a new TracedBus that wraps an ampy-bus Bus instance. All messages published through this bus will automatically include trace context.

Example:

import (
  "github.com/AmpyFin/ampy-bus/pkg/ampybus/natsbinding"
)

bus, err := natsbinding.Connect(natsbinding.Config{
  URLs: "nats://localhost:4222",
})
if err != nil {
  log.Fatal(err)
}
defer bus.Close()

tracedBus := ampyobs.NewTracedBus(bus)

func (*TracedBus) ConsumePull added in v0.0.3

func (tb *TracedBus) ConsumePull(
	ctx context.Context,
	subject, durable string,
	handler MessageHandler,
) (func(), error)

ConsumePull subscribes to a subject using pull-based consumption and processes messages with automatic trace extraction. Each message gets its own trace span that links to the producer's trace.

Example:

cancel, err := tracedBus.ConsumePull(ctx, "ampy.dev.bars.>", "my-consumer",
  func(ctx context.Context, env ampybus.Envelope) error {
    var bar bars.Bar
    if err := proto.Unmarshal(env.Payload, &bar); err != nil {
      return err
    }
    // Process bar...
    return nil
  },
)
if err != nil {
  // Check the error before deferring: cancel may be nil when subscribing fails.
  log.Fatal(err)
}
defer cancel()

func (*TracedBus) PublishEnvelope added in v0.0.3

func (tb *TracedBus) PublishEnvelope(ctx context.Context, env ampybus.Envelope) (*natsbinding.PubAck, error)

PublishEnvelope publishes an ampy-bus Envelope with automatic trace context injection. The trace context from the provided context is automatically added to the envelope headers.

Example:

import (
  "github.com/AmpyFin/ampy-bus/pkg/ampybus"
  bars "github.com/AmpyFin/ampy-proto/v2/gen/go/ampy/bars/v1"
)

bar := &bars.Bar{...}
payload, err := proto.Marshal(bar)
if err != nil {
  return err
}

env := ampybus.Envelope{
  Topic: "ampy/dev/bars/v1",
  Headers: ampybus.NewHeaders("ampy.bars.v1.Bar", "my-service", "data-source", "AAPL"),
  Payload: payload,
}

ack, err := tracedBus.PublishEnvelope(ctx, env)

func (*TracedBus) PublishProto added in v0.0.3

func (tb *TracedBus) PublishProto(
	ctx context.Context,
	topic string,
	msg proto.Message,
	producer, source, partitionKey string,
) (*natsbinding.PubAck, error)

PublishProto publishes a protobuf message with automatic envelope creation and trace injection. This is a convenience method that creates the envelope and headers automatically.

Example:

import (
  bars "github.com/AmpyFin/ampy-proto/v2/gen/go/ampy/bars/v1"
)

bar := &bars.Bar{
  Security: &common.SecurityId{Symbol: "AAPL", Mic: "XNAS"},
  // ... other fields
}

ack, err := tracedBus.PublishProto(ctx, "ampy/dev/bars/v1", bar, "my-service", "data-source", "AAPL")
