datafeeds

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 14, 2025 License: MIT Imports: 22 Imported by: 1

Documentation

Index

Examples

Constants

View Source
const (

	// The following constants are used in value maps to ensure consistent naming while the underlying
	// implementation is untyped.
	TopLevelListOutputFieldName = EVMEncoderKey("Reports")
	FeedIDOutputFieldName       = EVMEncoderKey("FeedID")
	RawReportOutputFieldName    = EVMEncoderKey("RawReport")
	PriceOutputFieldName        = EVMEncoderKey("Price")
	TimestampOutputFieldName    = EVMEncoderKey("Timestamp")
	RemappedIDOutputFieldName   = EVMEncoderKey("RemappedID")
	StreamIDOutputFieldName     = EVMEncoderKey("StreamID")
)

Variables

View Source
var (
	ErrInvalidConfig         = errors.New("invalid config")
	ErrInsufficientConsensus = errors.New("insufficient consensus")
	ErrEmptyObservation      = errors.New("empty observation")
)
View Source
var File_capabilities_consensus_ocr3_datafeeds_data_feeds_types_proto protoreflect.FileDescriptor

Functions

func NewDataFeedsAggregator

func NewDataFeedsAggregator(config values.Map, reportCodec datastreams.ReportCodec) (types.Aggregator, error)

func NewLLOAggregator added in v0.6.0

func NewLLOAggregator(config values.Map) (types.Aggregator, error)

NewLLOAggregator creates a new LLOAggregator instance based on the provided configuration. The config should be a values.Map that has represents from the LLOAggregatorConfig. See [LLOAggreagatorConfig.ToMap]

func ParseConfig

func ParseConfig(config values.Map) (aggregatorConfig, error)

Types

type DataFeedsMercuryReportInfo

type DataFeedsMercuryReportInfo struct {
	ObservationTimestamp int64  `protobuf:"varint,3,opt,name=observation_timestamp,json=observationTimestamp,proto3" json:"observation_timestamp,omitempty"`
	BenchmarkPrice       []byte `protobuf:"bytes,5,opt,name=benchmark_price,json=benchmarkPrice,proto3" json:"benchmark_price,omitempty"` // big.Int
	// contains filtered or unexported fields
}

func (*DataFeedsMercuryReportInfo) Descriptor deprecated

func (*DataFeedsMercuryReportInfo) Descriptor() ([]byte, []int)

Deprecated: Use DataFeedsMercuryReportInfo.ProtoReflect.Descriptor instead.

func (*DataFeedsMercuryReportInfo) GetBenchmarkPrice

func (x *DataFeedsMercuryReportInfo) GetBenchmarkPrice() []byte

func (*DataFeedsMercuryReportInfo) GetObservationTimestamp

func (x *DataFeedsMercuryReportInfo) GetObservationTimestamp() int64

func (*DataFeedsMercuryReportInfo) ProtoMessage

func (*DataFeedsMercuryReportInfo) ProtoMessage()

func (*DataFeedsMercuryReportInfo) ProtoReflect

func (*DataFeedsMercuryReportInfo) Reset

func (x *DataFeedsMercuryReportInfo) Reset()

func (*DataFeedsMercuryReportInfo) String

func (x *DataFeedsMercuryReportInfo) String() string

type DataFeedsOutcomeMetadata

type DataFeedsOutcomeMetadata struct {
	FeedInfo map[string]*DataFeedsMercuryReportInfo `` /* 143-byte string literal not displayed */
	// contains filtered or unexported fields
}

func (*DataFeedsOutcomeMetadata) Descriptor deprecated

func (*DataFeedsOutcomeMetadata) Descriptor() ([]byte, []int)

Deprecated: Use DataFeedsOutcomeMetadata.ProtoReflect.Descriptor instead.

func (*DataFeedsOutcomeMetadata) GetFeedInfo

func (*DataFeedsOutcomeMetadata) ProtoMessage

func (*DataFeedsOutcomeMetadata) ProtoMessage()

func (*DataFeedsOutcomeMetadata) ProtoReflect

func (x *DataFeedsOutcomeMetadata) ProtoReflect() protoreflect.Message

func (*DataFeedsOutcomeMetadata) Reset

func (x *DataFeedsOutcomeMetadata) Reset()

func (*DataFeedsOutcomeMetadata) String

func (x *DataFeedsOutcomeMetadata) String() string

type EVMEncodableStreamUpdate added in v0.6.0

type EVMEncodableStreamUpdate struct {
	StreamID   uint32
	Price      *big.Int
	Timestamp  uint32 // unix timestamp in seconds
	RemappedID []byte
}

EVMEncodableStreamUpdate is the EVM encodable representation of a stream update. The field name must match the field name in the EVM encoder, and must be a valid EVMEncoderKey.

type EVMEncoderKey added in v0.6.0

type EVMEncoderKey = string

type FeedConfig added in v0.7.0

type FeedConfig struct {
	Heartbeat     int    // seconds
	Deviation     string `mapstructure:"deviation"`
	RemappedIDHex string `mapstructure:"remappedId"` // DO NOT CHANGE THIS. It's user facing in existing DataFeeds configurations and should be kept consistent for backward compatibility.
	// contains filtered or unexported fields
}

FeedConfig defines the configuration for each individual feed used by the aggregator. It's map representation is used directly in user-defined workflows to specify the configuration for each feed.

func (FeedConfig) DeviationAsDecimal added in v0.7.0

func (c FeedConfig) DeviationAsDecimal() decimal.Decimal

func (FeedConfig) HeartbeatNanos added in v0.7.0

func (c FeedConfig) HeartbeatNanos() int64

func (FeedConfig) RemappedID added in v0.7.0

func (c FeedConfig) RemappedID() []byte

type LLOAggregator added in v0.6.0

type LLOAggregator struct {
	// contains filtered or unexported fields
}

func (*LLOAggregator) Aggregate added in v0.6.0

func (a *LLOAggregator) Aggregate(lggr logger.Logger, previousOutcome *types.AggregationOutcome, observations map[ocrcommon.OracleID][]values.Value, f int) (*types.AggregationOutcome, error)

Aggregate implements the Aggregator interface For this implementation, we expect the LLO events to be the same across all nodes. And we expect the every observation only contains a single LLO event, ie len(observations.["some-oracle-id"]) == 1.

Example

Example of using LLOAggregator.Aggregate with multiple oracles and price streams It constructs a LLOAggregator with two streams, simulates observations from three oracles, and demonstrates how to process the aggregation outcome. go test -run ExampleLLOAggregator_Aggregate

// Create a logger
lggr, err := logger.New()
if err != nil {
	panic(err)
}
// 1. Create aggregator with 2 stream configs
configMap, _ := values.NewMap(map[string]interface{}{
	"streams": map[string]interface{}{
		"1": map[string]interface{}{
			"deviation":  "0.01", // 1% deviation threshold
			"heartbeat":  3600,   // 1 hour heartbeat
			"remappedID": "0x680084f7347baFfb5C323c2982dfC90e04F9F918",
		},
		"2": map[string]interface{}{
			"deviation":  "0.02", // 2% deviation threshold
			"heartbeat":  1800,   // 30 min heartbeat
			"remappedID": "0x00001237347baFfb5C323c1112dfC90e0789FFFF",
		},
	},
	"allowedPartialStaleness": "0.2", // 20% partial staleness
})

aggregator, err := datafeeds.NewLLOAggregator(*configMap)
if err != nil {
	panic(err)
}

// 2. Create empty previous outcome (first round); empty previousOutcome will cause all streams to be updated
var previousOutcome *types.AggregationOutcome

// 3. Create observations from 3 oracles
observations := make(map[ocrcommon.OracleID][]values.Value)
timestamp := uint64(61116379204) //uint64(time.Now().UnixNano()) //nolint: gosec // G115

// Setup price data for 2 streams
prices := map[uint32]decimal.Decimal{
	1: decimal.NewFromFloat(1250.427975), // ETH/USD price
	2: decimal.NewFromFloat(39250.25),    // BTC/USD price
}

// Create the same observation for each oracle to ensure f+1 consensus
for i := ocrcommon.OracleID(1); i <= 3; i++ {
	// Create LLO event with price payload
	event := &datastreams.LLOStreamsTriggerEvent{
		ObservationTimestampNanoseconds: timestamp,
		Payload:                         make([]*datastreams.LLOStreamDecimal, 0, len(prices)),
	}

	// Add each price to the payload
	for streamID, price := range prices {
		// Convert decimal to binary representation
		priceBinary, _ := price.MarshalBinary()

		event.Payload = append(event.Payload, &datastreams.LLOStreamDecimal{
			StreamID: streamID,
			Decimal:  priceBinary,
		})
	}

	// Wrap the event in a values.Value
	val, err2 := values.Wrap(event)
	if err2 != nil {
		panic(err2)
	}
	observations[i] = []values.Value{val}
}

// 4. Call Aggregate with f=1
outcome, err := aggregator.Aggregate(lggr, previousOutcome, observations, 1)
if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

// 5. Print results
fmt.Printf("Should report: %v\n", outcome.ShouldReport)

// Decode the results to view updated streams
if outcome.ShouldReport {
	streamIDs, reports, err := processOutcome(outcome)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Updated streams: %d\n", len(streamIDs))

	// Print details of each updated stream
	for i, report := range reports {
		fmt.Printf("  Stream %d: ID=%d, Price=%s, Timestamp=%d, RemappedID=%x\n",
			i+1, report.StreamID, report.Price.String(), timestamp, report.RemappedID)
	}
}
Output:

Should report: true
Updated streams: 2
  Stream 1: ID=1, Price=1250427975000000000000, Timestamp=61116379204, RemappedID=680084f7347baffb5c323c2982dfc90e04f9f918
  Stream 2: ID=2, Price=39250250000000000000000, Timestamp=61116379204, RemappedID=00001237347baffb5c323c1112dfc90e0789ffff

type LLOAggregatorConfig added in v0.6.0

type LLOAggregatorConfig struct {
	// workaround for the fact that mapstructure doesn't support uint32 keys
	//streams    map[uint32]feedConfig `mapstructure:"-"`
	Streams map[string]FeedConfig `mapstructure:"streams"`
	// allowedPartialStaleness is an optional optimization that tries to maximize batching.
	// Once any deviation or heartbeat threshold hits, we will include all other feeds that are
	// within the allowedPartialStaleness range of their own heartbeat.
	// For example, setting 0.2 will include all feeds that are within 20% of their heartbeat.
	//allowedPartialStaleness float64 `mapstructure:"-"`
	// workaround for the fact that mapstructure doesn't support float64 keys
	AllowedPartialStaleness string `mapstructure:"allowedPartialStaleness"`
}

LLOAggregatorConfig is the config for the LLO aggregator. Example config: remappedID but a hex string streams:

	"1":
	  deviation: "0.1"
	  heartbeat: 10
   remappedID: "0x680084f7347baFfb5C323c2982dfC90e04F9F918"
	"2":
	  deviation: "0.2"
	  heartbeat: 20

allowedPartialStaleness: "0.2" The streams are the stream IDs that the aggregator will aggregate.

func NewLLOConfig added in v0.7.0

func NewLLOConfig(m values.Map) (LLOAggregatorConfig, error)

func (LLOAggregatorConfig) ToMap added in v0.7.0

func (c LLOAggregatorConfig) ToMap() (*values.Map, error)

ToMap converts the LLOAggregatorConfig to a values.Map, which is suitable for the [NewAggegator] function in the OCR3 Aggregator interface.

type LLOOutcomeMetadata added in v0.6.0

type LLOOutcomeMetadata struct {
	StreamInfo map[uint32]*LLOStreamInfo `` /* 148-byte string literal not displayed */
	// contains filtered or unexported fields
}

func (*LLOOutcomeMetadata) Descriptor deprecated added in v0.6.0

func (*LLOOutcomeMetadata) Descriptor() ([]byte, []int)

Deprecated: Use LLOOutcomeMetadata.ProtoReflect.Descriptor instead.

func (*LLOOutcomeMetadata) GetStreamInfo added in v0.6.0

func (x *LLOOutcomeMetadata) GetStreamInfo() map[uint32]*LLOStreamInfo

func (*LLOOutcomeMetadata) ProtoMessage added in v0.6.0

func (*LLOOutcomeMetadata) ProtoMessage()

func (*LLOOutcomeMetadata) ProtoReflect added in v0.6.0

func (x *LLOOutcomeMetadata) ProtoReflect() protoreflect.Message

func (*LLOOutcomeMetadata) Reset added in v0.6.0

func (x *LLOOutcomeMetadata) Reset()

func (*LLOOutcomeMetadata) String added in v0.6.0

func (x *LLOOutcomeMetadata) String() string

type LLOStreamInfo added in v0.6.0

type LLOStreamInfo struct {
	Timestamp int64  `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	Price     []byte `protobuf:"bytes,2,opt,name=price,proto3" json:"price,omitempty"` // Decimal
	// contains filtered or unexported fields
}

func (*LLOStreamInfo) Descriptor deprecated added in v0.6.0

func (*LLOStreamInfo) Descriptor() ([]byte, []int)

Deprecated: Use LLOStreamInfo.ProtoReflect.Descriptor instead.

func (*LLOStreamInfo) GetPrice added in v0.6.0

func (x *LLOStreamInfo) GetPrice() []byte

func (*LLOStreamInfo) GetTimestamp added in v0.6.0

func (x *LLOStreamInfo) GetTimestamp() int64

func (*LLOStreamInfo) ProtoMessage added in v0.6.0

func (*LLOStreamInfo) ProtoMessage()

func (*LLOStreamInfo) ProtoReflect added in v0.6.0

func (x *LLOStreamInfo) ProtoReflect() protoreflect.Message

func (*LLOStreamInfo) Reset added in v0.6.0

func (x *LLOStreamInfo) Reset()

func (*LLOStreamInfo) String added in v0.6.0

func (x *LLOStreamInfo) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL