Documentation ¶
Index ¶
- Constants
- func BusConsumedAdd(ctx context.Context, topic string, n int64)
- func BusDeliveryLatencyMs(ctx context.Context, topic string, ms float64)
- func BusProducedAdd(ctx context.Context, topic string, n int64)
- func C(ctx context.Context) *slog.Logger
- func ExtractTrace(parent context.Context, headers map[string]string) context.Context
- func Init(cfg Config) error
- func InitFromAmpyConfig(cfg map[string]any) error
- func InjectTrace(ctx context.Context, headers map[string]string)
- func L() *slog.Logger
- func OMSOrderLatencyMs(ctx context.Context, broker string, ms float64)
- func OMSOrderSubmitAdd(ctx context.Context, broker string, outcome string)
- func OMSRejectAdd(ctx context.Context, broker string, reason string)
- func SetErrorHandler(handler func(error))
- func Shutdown(ctx context.Context) error
- func StartBusConsumeSpan(parent context.Context, headers map[string]string, a BusAttrs) (context.Context, trace.Span)
- func StartBusPublishSpan(ctx context.Context, a BusAttrs) (context.Context, trace.Span)
- func StartSpan(ctx context.Context, name string, kind trace.SpanKind, ...) (context.Context, trace.Span)
- func ValidateProto(ctx context.Context, msg proto.Message) (context.Context, trace.Span, error)
- type BusAttrs
- type Config
- type MessageHandler
- type TracedBus
- func NewTracedBus(bus *natsbinding.Bus) *TracedBus
- func (tb *TracedBus) ConsumePull(ctx context.Context, subject, durable string, handler MessageHandler) (func(), error)
- func (tb *TracedBus) PublishEnvelope(ctx context.Context, env ampybus.Envelope) (*natsbinding.PubAck, error)
- func (tb *TracedBus) PublishProto(ctx context.Context, topic string, msg proto.Message, ...) (*natsbinding.PubAck, error)
Constants ¶
const (
	OutcomeOK     = "ok"
	OutcomeRetry  = "retry"
	OutcomeDLQ    = "dlq"
	OutcomeReject = "reject"
)
Public enums (bounded label values).
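Keeping label values bounded matters because every distinct value creates a new metric series. As a rough illustration, a caller could check an outcome against the set above before recording it. The `validOutcome` helper below is hypothetical and not part of this package; the constants are copied locally only so the sketch compiles on its own.

```go
package main

import "fmt"

// Local copies of the outcome constants documented above.
const (
	OutcomeOK     = "ok"
	OutcomeRetry  = "retry"
	OutcomeDLQ    = "dlq"
	OutcomeReject = "reject"
)

// validOutcome is a hypothetical helper (not part of the package).
// It reports whether s is one of the bounded label values.
func validOutcome(s string) bool {
	switch s {
	case OutcomeOK, OutcomeRetry, OutcomeDLQ, OutcomeReject:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(validOutcome(OutcomeRetry))       // a documented value
	fmt.Println(validOutcome("timeout after 3s")) // free-form text is not a valid label
}
```

Free-form strings such as error messages are better kept in logs or span events than in metric labels.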
const (
	HeaderTraceParent = "traceparent"
	HeaderTraceState  = "tracestate"

	// Optional AmpyFin correlation headers (only if you choose to send them via ampy-bus):
	HeaderRunID      = "run_id"
	HeaderUniverseID = "universe_id"
	HeaderAsOf       = "as_of"
)
Variables ¶
This section is empty.
Functions ¶
func BusConsumedAdd ¶
BusConsumedAdd adds n to the consumed-message counter for a topic.
func BusDeliveryLatencyMs ¶
BusDeliveryLatencyMs records the bus delivery latency, in milliseconds, for a topic.
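The latency argument is a float64, which the function name suggests is a value in milliseconds. Callers that measure with time.Duration need to convert first. A minimal sketch of that conversion, where `durationMs` is a hypothetical helper and not part of this package:

```go
package main

import (
	"fmt"
	"time"
)

// durationMs converts a time.Duration to fractional milliseconds.
// Hypothetical helper; dividing as float64 keeps sub-millisecond precision,
// which d.Milliseconds() would truncate.
func durationMs(d time.Duration) float64 {
	return float64(d) / float64(time.Millisecond)
}

func main() {
	fmt.Println(durationMs(1500 * time.Microsecond)) // prints 1.5
}
```

The result could then be passed as the ms argument, for example after computing time.Since on the message's publish timestamp.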
func BusProducedAdd ¶
BusProducedAdd adds n to the produced-message counter for a topic.
func ExtractTrace ¶
ExtractTrace extracts W3C trace context from headers and returns a child context.
func InitFromAmpyConfig ¶ added in v0.0.3
InitFromAmpyConfig initializes observability using ampy-config. It reads observability settings from the ampy-config effective configuration map.
Expected config paths:
- observability.service_name
- observability.service_version
- observability.environment
- observability.collector_endpoint
- observability.trace_protocol (optional, defaults to "grpc")
- observability.enable_logs (optional, defaults to true)
- observability.enable_metrics (optional, defaults to true)
- observability.enable_tracing (optional, defaults to true)
- observability.sampler (optional, "parent" or "ratio", defaults to "parent")
- observability.sample_ratio (optional, 0.0-1.0, defaults to 0.25)
Example:
import (
	"context"
	"log"

	ampyconfigpkg "github.com/AmpyFin/ampy-config/go/ampyconfig/pkg/config"
)

// Load the effective configuration for the chosen profile.
ctx := context.Background()
eff, _, err := ampyconfigpkg.LoadEffective(ctx, ampyconfigpkg.Options{
	ProfileYAML: "config/dev.yaml",
})
if err != nil {
	log.Fatal(err)
}

// Initialize observability from the observability.* settings.
if err := ampyobs.InitFromAmpyConfig(eff); err != nil {
	log.Fatal(err)
}
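How the optional paths listed above fall back to their defaults is not shown in this reference. One plausible way to resolve a dotted path with a default is sketched below. It assumes the effective configuration is a tree of nested map[string]any values; `lookup` and `getOr` are hypothetical helpers, not part of this package.

```go
package main

import "fmt"

// lookup walks nested map[string]any values along keys.
// Hypothetical helper; assumes the config is a tree of nested maps.
func lookup(cfg map[string]any, keys ...string) (any, bool) {
	var cur any = cfg
	for _, k := range keys {
		m, ok := cur.(map[string]any)
		if !ok {
			return nil, false
		}
		if cur, ok = m[k]; !ok {
			return nil, false
		}
	}
	return cur, true
}

// getOr returns the value at keys when it is present and has type T,
// and def otherwise. Note that a decoder may produce an int for a value
// such as 1, which this strict type check would treat as missing.
func getOr[T any](cfg map[string]any, def T, keys ...string) T {
	if v, ok := lookup(cfg, keys...); ok {
		if t, ok := v.(T); ok {
			return t
		}
	}
	return def
}

func main() {
	cfg := map[string]any{
		"observability": map[string]any{
			"service_name": "demo",
			"sampler":      "ratio",
		},
	}
	fmt.Println(getOr(cfg, "grpc", "observability", "trace_protocol")) // grpc (default)
	fmt.Println(getOr(cfg, "parent", "observability", "sampler"))      // ratio (set)
	fmt.Println(getOr(cfg, 0.25, "observability", "sample_ratio"))     // 0.25 (default)
	fmt.Println(getOr(cfg, true, "observability", "enable_tracing"))   // true (default)
}
```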
func InjectTrace ¶
InjectTrace injects W3C trace context into key/value headers.
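For orientation, the W3C `traceparent` header carried in these key/value maps has the form `version-traceid-parentid-flags`. The sketch below builds and parses that value using only the standard library. It is an illustration of the header format, not this package's implementation (which presumably delegates to an OpenTelemetry propagator); `traceParent`, `parseTraceParent` and `header` are hypothetical names, and only version `00` is handled.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// traceParent holds the fields of a version-00 traceparent header.
type traceParent struct {
	TraceID string // 32 lowercase hex characters
	SpanID  string // 16 lowercase hex characters
	Sampled bool   // lowest bit of the trace-flags byte
}

// isLowerHex reports whether s is exactly n lowercase hex characters.
func isLowerHex(s string, n int) bool {
	if len(s) != n {
		return false
	}
	for _, c := range s {
		if !((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f')) {
			return false
		}
	}
	return true
}

// parseTraceParent reads headers["traceparent"] (simplified: version 00 only).
func parseTraceParent(headers map[string]string) (traceParent, bool) {
	parts := strings.Split(headers["traceparent"], "-")
	if len(parts) != 4 || parts[0] != "00" {
		return traceParent{}, false
	}
	if !isLowerHex(parts[1], 32) || !isLowerHex(parts[2], 16) || !isLowerHex(parts[3], 2) {
		return traceParent{}, false
	}
	// All-zero trace and span IDs are invalid per the specification.
	if parts[1] == strings.Repeat("0", 32) || parts[2] == strings.Repeat("0", 16) {
		return traceParent{}, false
	}
	flags, err := strconv.ParseUint(parts[3], 16, 8)
	if err != nil {
		return traceParent{}, false
	}
	return traceParent{TraceID: parts[1], SpanID: parts[2], Sampled: flags&1 == 1}, true
}

// header renders the value that an injector would write.
func (tp traceParent) header() string {
	flags := "00"
	if tp.Sampled {
		flags = "01"
	}
	return "00-" + tp.TraceID + "-" + tp.SpanID + "-" + flags
}

func main() {
	headers := map[string]string{
		"traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
	}
	tp, ok := parseTraceParent(headers)
	fmt.Println(ok, tp.TraceID, tp.SpanID, tp.Sampled)
	fmt.Println(tp.header() == headers["traceparent"]) // the round trip preserves the value
}
```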
func OMSOrderLatencyMs ¶
OMSOrderLatencyMs records the order latency, in milliseconds, for a broker.
func OMSOrderSubmitAdd ¶
OMSOrderSubmitAdd increments the order-submit counter for a broker and outcome pair.
func OMSRejectAdd ¶
OMSRejectAdd increments the rejection counter for a broker and reason pair.
func SetErrorHandler ¶
func SetErrorHandler(handler func(error))
SetErrorHandler sets a custom handler for errors reported by OpenTelemetry (OTel).
func StartBusConsumeSpan ¶
func StartBusConsumeSpan(parent context.Context, headers map[string]string, a BusAttrs) (context.Context, trace.Span)
StartBusConsumeSpan extracts W3C context from headers and starts `bus.consume` as a child of the upstream span. It also adds a span link to the upstream context.
func StartBusPublishSpan ¶
StartBusPublishSpan creates a `bus.publish` span with standardized attributes.
func StartSpan ¶
func StartSpan(ctx context.Context, name string, kind trace.SpanKind, attrs ...attribute.KeyValue) (context.Context, trace.Span)
StartSpan creates a span with a conventional name and kind.
func ValidateProto ¶ added in v0.0.3
ValidateProto validates a protobuf message and creates a trace span for the validation. This ensures messages are validated before processing and traces include schema info.
Example:
import (
	bars "github.com/AmpyFin/ampy-proto/v2/gen/go/ampy/bars/v1"
)

bar := &bars.Bar{...}
ctx, span, err := ampyobs.ValidateProto(ctx, bar)
if err != nil {
	return err
}
defer span.End()
Types ¶
type BusAttrs ¶
type BusAttrs struct {
	Topic        string
	SchemaFQDN   string
	MessageID    string
	PartitionKey string
	RunID        string
}
BusAttrs captures stable, low-cardinality attributes for bus spans.
type Config ¶
type Config struct {
	ServiceName       string
	ServiceVersion    string
	Environment       string  // dev | paper | prod
	CollectorEndpoint string  // e.g. "http://localhost:4317" or "localhost:4317"
	TraceProtocol     string  // "grpc" | "http" (default: "grpc")
	EnableLogs        bool    // JSON logs via slog (stdout)
	EnableMetrics     bool    // OTLP metrics to collector
	EnableTracing     bool    // OTLP traces to collector
	Sampler           string  // "parent" | "ratio"
	SampleRatio       float64
}
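The CollectorEndpoint comment shows two forms, with and without a scheme. How the package reconciles them is not documented here. One plausible approach is to reduce both to a bare host:port, as in the sketch below; `hostPort` is a hypothetical helper, not part of this package.

```go
package main

import (
	"fmt"
	"strings"
)

// hostPort strips an optional http:// or https:// scheme and a trailing
// slash so that both documented forms reduce to "host:port".
// Hypothetical helper; the package's actual handling may differ.
func hostPort(endpoint string) string {
	endpoint = strings.TrimPrefix(endpoint, "http://")
	endpoint = strings.TrimPrefix(endpoint, "https://")
	return strings.TrimSuffix(endpoint, "/")
}

func main() {
	fmt.Println(hostPort("http://localhost:4317")) // localhost:4317
	fmt.Println(hostPort("localhost:4317"))        // localhost:4317
}
```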
type MessageHandler ¶ added in v0.0.3
MessageHandler is a function type for processing bus messages. It receives the context (with trace context extracted), the envelope, and should return an error if processing fails.
type TracedBus ¶ added in v0.0.3
type TracedBus struct {
// contains filtered or unexported fields
}
TracedBus wraps an ampy-bus Bus to automatically inject trace context into messages. It provides methods that integrate with OpenTelemetry tracing.
func NewTracedBus ¶ added in v0.0.3
func NewTracedBus(bus *natsbinding.Bus) *TracedBus
NewTracedBus creates a new TracedBus that wraps an ampy-bus Bus instance. All messages published through this bus will automatically include trace context.
Example:
import (
	"log"

	"github.com/AmpyFin/ampy-bus/pkg/ampybus/natsbinding"
)

// Connect to NATS, then wrap the bus so published messages carry trace context.
bus, err := natsbinding.Connect(natsbinding.Config{
	URLs: "nats://localhost:4222",
})
if err != nil {
	log.Fatal(err)
}
defer bus.Close()

tracedBus := ampyobs.NewTracedBus(bus)
func (*TracedBus) ConsumePull ¶ added in v0.0.3
func (tb *TracedBus) ConsumePull(ctx context.Context, subject, durable string, handler MessageHandler) (func(), error)
ConsumePull subscribes to a subject using pull-based consumption and processes messages with automatic trace extraction. Each message gets its own trace span that links to the producer's trace.
Example:
cancel, err := tracedBus.ConsumePull(ctx, "ampy.dev.bars.>", "my-consumer",
	func(ctx context.Context, env ampybus.Envelope) error {
		var bar bars.Bar
		if err := proto.Unmarshal(env.Payload, &bar); err != nil {
			return err
		}
		// Process bar...
		return nil
	},
)
if err != nil {
	// Check the error before deferring: cancel may be nil if the subscription failed.
	return err
}
defer cancel()
func (*TracedBus) PublishEnvelope ¶ added in v0.0.3
func (tb *TracedBus) PublishEnvelope(ctx context.Context, env ampybus.Envelope) (*natsbinding.PubAck, error)
PublishEnvelope publishes an ampy-bus Envelope with automatic trace context injection. The trace context from the provided context is automatically added to the envelope headers.
Example:
import (
	"github.com/AmpyFin/ampy-bus/pkg/ampybus"
	bars "github.com/AmpyFin/ampy-proto/v2/gen/go/ampy/bars/v1"
)

bar := &bars.Bar{...}
payload, err := proto.Marshal(bar)
if err != nil {
	return err
}
env := ampybus.Envelope{
	Topic:   "ampy/dev/bars/v1",
	Headers: ampybus.NewHeaders("ampy.bars.v1.Bar", "my-service", "data-source", "AAPL"),
	Payload: payload,
}
ack, err := tracedBus.PublishEnvelope(ctx, env)
func (*TracedBus) PublishProto ¶ added in v0.0.3
func (tb *TracedBus) PublishProto(ctx context.Context, topic string, msg proto.Message, producer, source, partitionKey string) (*natsbinding.PubAck, error)
PublishProto publishes a protobuf message with automatic envelope creation and trace injection. This is a convenience method that creates the envelope and headers automatically.
Example:
import (
	bars "github.com/AmpyFin/ampy-proto/v2/gen/go/ampy/bars/v1"
)

bar := &bars.Bar{
	// common refers to the ampy-proto common package; its import is omitted from this example.
	Security: &common.SecurityId{Symbol: "AAPL", Mic: "XNAS"},
	// ... other fields
}
ack, err := tracedBus.PublishProto(ctx, "ampy/dev/bars/v1", bar, "my-service", "data-source", "AAPL")