Documentation
¶
Index ¶
- Constants
- Variables
- func NewDataFeedsAggregator(config values.Map, reportCodec datastreams.ReportCodec) (types.Aggregator, error)
- func NewLLOAggregator(config values.Map) (types.Aggregator, error)
- func ParseConfig(config values.Map) (aggregatorConfig, error)
- type DataFeedsMercuryReportInfo
- func (*DataFeedsMercuryReportInfo) Descriptor() ([]byte, []int)deprecated
- func (x *DataFeedsMercuryReportInfo) GetBenchmarkPrice() []byte
- func (x *DataFeedsMercuryReportInfo) GetObservationTimestamp() int64
- func (*DataFeedsMercuryReportInfo) ProtoMessage()
- func (x *DataFeedsMercuryReportInfo) ProtoReflect() protoreflect.Message
- func (x *DataFeedsMercuryReportInfo) Reset()
- func (x *DataFeedsMercuryReportInfo) String() string
- type DataFeedsOutcomeMetadata
- func (*DataFeedsOutcomeMetadata) Descriptor() ([]byte, []int)deprecated
- func (x *DataFeedsOutcomeMetadata) GetFeedInfo() map[string]*DataFeedsMercuryReportInfo
- func (*DataFeedsOutcomeMetadata) ProtoMessage()
- func (x *DataFeedsOutcomeMetadata) ProtoReflect() protoreflect.Message
- func (x *DataFeedsOutcomeMetadata) Reset()
- func (x *DataFeedsOutcomeMetadata) String() string
- type EVMEncodableStreamUpdate
- type EVMEncoderKey
- type FeedConfig
- type LLOAggregator
- type LLOAggregatorConfig
- type LLOOutcomeMetadata
- func (*LLOOutcomeMetadata) Descriptor() ([]byte, []int)deprecated
- func (x *LLOOutcomeMetadata) GetStreamInfo() map[uint32]*LLOStreamInfo
- func (*LLOOutcomeMetadata) ProtoMessage()
- func (x *LLOOutcomeMetadata) ProtoReflect() protoreflect.Message
- func (x *LLOOutcomeMetadata) Reset()
- func (x *LLOOutcomeMetadata) String() string
- type LLOStreamInfo
- func (*LLOStreamInfo) Descriptor() ([]byte, []int)deprecated
- func (x *LLOStreamInfo) GetPrice() []byte
- func (x *LLOStreamInfo) GetTimestamp() int64
- func (*LLOStreamInfo) ProtoMessage()
- func (x *LLOStreamInfo) ProtoReflect() protoreflect.Message
- func (x *LLOStreamInfo) Reset()
- func (x *LLOStreamInfo) String() string
Examples ¶
Constants ¶
const ( // The following constants are used in value maps to ensure consistent naming while the underlying // implementation is untyped. TopLevelListOutputFieldName = EVMEncoderKey("Reports") FeedIDOutputFieldName = EVMEncoderKey("FeedID") RawReportOutputFieldName = EVMEncoderKey("RawReport") PriceOutputFieldName = EVMEncoderKey("Price") TimestampOutputFieldName = EVMEncoderKey("Timestamp") RemappedIDOutputFieldName = EVMEncoderKey("RemappedID") StreamIDOutputFieldName = EVMEncoderKey("StreamID") )
Variables ¶
var ( ErrInvalidConfig = errors.New("invalid config") ErrInsufficientConsensus = errors.New("insufficient consensus") ErrEmptyObservation = errors.New("empty observation") )
var File_capabilities_consensus_ocr3_datafeeds_data_feeds_types_proto protoreflect.FileDescriptor
Functions ¶
func NewDataFeedsAggregator ¶
func NewDataFeedsAggregator(config values.Map, reportCodec datastreams.ReportCodec) (types.Aggregator, error)
func NewLLOAggregator ¶ added in v0.6.0
func NewLLOAggregator(config values.Map) (types.Aggregator, error)
NewLLOAggregator creates a new LLOAggregator instance based on the provided configuration. The config should be a values.Map representation of an LLOAggregatorConfig. See [LLOAggregatorConfig.ToMap].
func ParseConfig ¶
Types ¶
type DataFeedsMercuryReportInfo ¶
type DataFeedsMercuryReportInfo struct {
ObservationTimestamp int64 `protobuf:"varint,3,opt,name=observation_timestamp,json=observationTimestamp,proto3" json:"observation_timestamp,omitempty"`
BenchmarkPrice []byte `protobuf:"bytes,5,opt,name=benchmark_price,json=benchmarkPrice,proto3" json:"benchmark_price,omitempty"` // big.Int
// contains filtered or unexported fields
}
func (*DataFeedsMercuryReportInfo) Descriptor
deprecated
func (*DataFeedsMercuryReportInfo) Descriptor() ([]byte, []int)
Deprecated: Use DataFeedsMercuryReportInfo.ProtoReflect.Descriptor instead.
func (*DataFeedsMercuryReportInfo) GetBenchmarkPrice ¶
func (x *DataFeedsMercuryReportInfo) GetBenchmarkPrice() []byte
func (*DataFeedsMercuryReportInfo) GetObservationTimestamp ¶
func (x *DataFeedsMercuryReportInfo) GetObservationTimestamp() int64
func (*DataFeedsMercuryReportInfo) ProtoMessage ¶
func (*DataFeedsMercuryReportInfo) ProtoMessage()
func (*DataFeedsMercuryReportInfo) ProtoReflect ¶
func (x *DataFeedsMercuryReportInfo) ProtoReflect() protoreflect.Message
func (*DataFeedsMercuryReportInfo) Reset ¶
func (x *DataFeedsMercuryReportInfo) Reset()
func (*DataFeedsMercuryReportInfo) String ¶
func (x *DataFeedsMercuryReportInfo) String() string
type DataFeedsOutcomeMetadata ¶
type DataFeedsOutcomeMetadata struct {
FeedInfo map[string]*DataFeedsMercuryReportInfo `` /* 143-byte string literal not displayed */
// contains filtered or unexported fields
}
func (*DataFeedsOutcomeMetadata) Descriptor
deprecated
func (*DataFeedsOutcomeMetadata) Descriptor() ([]byte, []int)
Deprecated: Use DataFeedsOutcomeMetadata.ProtoReflect.Descriptor instead.
func (*DataFeedsOutcomeMetadata) GetFeedInfo ¶
func (x *DataFeedsOutcomeMetadata) GetFeedInfo() map[string]*DataFeedsMercuryReportInfo
func (*DataFeedsOutcomeMetadata) ProtoMessage ¶
func (*DataFeedsOutcomeMetadata) ProtoMessage()
func (*DataFeedsOutcomeMetadata) ProtoReflect ¶
func (x *DataFeedsOutcomeMetadata) ProtoReflect() protoreflect.Message
func (*DataFeedsOutcomeMetadata) Reset ¶
func (x *DataFeedsOutcomeMetadata) Reset()
func (*DataFeedsOutcomeMetadata) String ¶
func (x *DataFeedsOutcomeMetadata) String() string
type EVMEncodableStreamUpdate ¶ added in v0.6.0
type EVMEncodableStreamUpdate struct {
StreamID uint32
Price *big.Int
Timestamp uint32 // unix timestamp in seconds
RemappedID []byte
}
EVMEncodableStreamUpdate is the EVM encodable representation of a stream update. The field name must match the field name in the EVM encoder, and must be a valid EVMEncoderKey.
type EVMEncoderKey ¶ added in v0.6.0
type EVMEncoderKey = string
type FeedConfig ¶ added in v0.7.0
type FeedConfig struct {
Heartbeat int // seconds
Deviation string `mapstructure:"deviation"`
RemappedIDHex string `mapstructure:"remappedId"` // DO NOT CHANGE THIS. It's user facing in existing DataFeeds configurations and should be kept consistent for backward compatibility.
// contains filtered or unexported fields
}
FeedConfig defines the configuration for each individual feed used by the aggregator. Its map representation is used directly in user-defined workflows to specify the configuration for each feed.
func (FeedConfig) DeviationAsDecimal ¶ added in v0.7.0
func (c FeedConfig) DeviationAsDecimal() decimal.Decimal
func (FeedConfig) HeartbeatNanos ¶ added in v0.7.0
func (c FeedConfig) HeartbeatNanos() int64
func (FeedConfig) RemappedID ¶ added in v0.7.0
func (c FeedConfig) RemappedID() []byte
type LLOAggregator ¶ added in v0.6.0
type LLOAggregator struct {
// contains filtered or unexported fields
}
func (*LLOAggregator) Aggregate ¶ added in v0.6.0
func (a *LLOAggregator) Aggregate(lggr logger.Logger, previousOutcome *types.AggregationOutcome, observations map[ocrcommon.OracleID][]values.Value, f int) (*types.AggregationOutcome, error)
Aggregate implements the Aggregator interface. For this implementation, we expect the LLO events to be the same across all nodes, and we expect every observation to contain only a single LLO event, i.e. len(observations["some-oracle-id"]) == 1.
Example ¶
Example of using LLOAggregator.Aggregate with multiple oracles and price streams. It constructs an LLOAggregator with two streams, simulates observations from three oracles, and demonstrates how to process the aggregation outcome. Run it with: go test -run ExampleLLOAggregator_Aggregate
// Create a logger
lggr, err := logger.New()
if err != nil {
panic(err)
}
// 1. Create aggregator with 2 stream configs
configMap, _ := values.NewMap(map[string]interface{}{
"streams": map[string]interface{}{
"1": map[string]interface{}{
"deviation": "0.01", // 1% deviation threshold
"heartbeat": 3600, // 1 hour heartbeat
"remappedID": "0x680084f7347baFfb5C323c2982dfC90e04F9F918",
},
"2": map[string]interface{}{
"deviation": "0.02", // 2% deviation threshold
"heartbeat": 1800, // 30 min heartbeat
"remappedID": "0x00001237347baFfb5C323c1112dfC90e0789FFFF",
},
},
"allowedPartialStaleness": "0.2", // 20% partial staleness
})
aggregator, err := datafeeds.NewLLOAggregator(*configMap)
if err != nil {
panic(err)
}
// 2. Create empty previous outcome (first round); empty previousOutcome will cause all streams to be updated
var previousOutcome *types.AggregationOutcome
// 3. Create observations from 3 oracles
observations := make(map[ocrcommon.OracleID][]values.Value)
timestamp := uint64(61116379204) //uint64(time.Now().UnixNano()) //nolint: gosec // G115
// Setup price data for 2 streams
prices := map[uint32]decimal.Decimal{
1: decimal.NewFromFloat(1250.427975), // ETH/USD price
2: decimal.NewFromFloat(39250.25), // BTC/USD price
}
// Create the same observation for each oracle to ensure f+1 consensus
for i := ocrcommon.OracleID(1); i <= 3; i++ {
// Create LLO event with price payload
event := &datastreams.LLOStreamsTriggerEvent{
ObservationTimestampNanoseconds: timestamp,
Payload: make([]*datastreams.LLOStreamDecimal, 0, len(prices)),
}
// Add each price to the payload
for streamID, price := range prices {
// Convert decimal to binary representation
priceBinary, _ := price.MarshalBinary()
event.Payload = append(event.Payload, &datastreams.LLOStreamDecimal{
StreamID: streamID,
Decimal: priceBinary,
})
}
// Wrap the event in a values.Value
val, err2 := values.Wrap(event)
if err2 != nil {
panic(err2)
}
observations[i] = []values.Value{val}
}
// 4. Call Aggregate with f=1
outcome, err := aggregator.Aggregate(lggr, previousOutcome, observations, 1)
if err != nil {
fmt.Printf("Error: %v\n", err)
return
}
// 5. Print results
fmt.Printf("Should report: %v\n", outcome.ShouldReport)
// Decode the results to view updated streams
if outcome.ShouldReport {
streamIDs, reports, err := processOutcome(outcome)
if err != nil {
panic(err)
}
fmt.Printf("Updated streams: %d\n", len(streamIDs))
// Print details of each updated stream
for i, report := range reports {
fmt.Printf(" Stream %d: ID=%d, Price=%s, Timestamp=%d, RemappedID=%x\n",
i+1, report.StreamID, report.Price.String(), timestamp, report.RemappedID)
}
}
Output: Should report: true Updated streams: 2 Stream 1: ID=1, Price=1250427975000000000000, Timestamp=61116379204, RemappedID=680084f7347baffb5c323c2982dfc90e04f9f918 Stream 2: ID=2, Price=39250250000000000000000, Timestamp=61116379204, RemappedID=00001237347baffb5c323c1112dfc90e0789ffff
type LLOAggregatorConfig ¶ added in v0.6.0
type LLOAggregatorConfig struct {
// workaround for the fact that mapstructure doesn't support uint32 keys
//streams map[uint32]feedConfig `mapstructure:"-"`
Streams map[string]FeedConfig `mapstructure:"streams"`
// allowedPartialStaleness is an optional optimization that tries to maximize batching.
// Once any deviation or heartbeat threshold hits, we will include all other feeds that are
// within the allowedPartialStaleness range of their own heartbeat.
// For example, setting 0.2 will include all feeds that are within 20% of their heartbeat.
//allowedPartialStaleness float64 `mapstructure:"-"`
// workaround for the fact that mapstructure doesn't support float64 keys
AllowedPartialStaleness string `mapstructure:"allowedPartialStaleness"`
}
LLOAggregatorConfig is the config for the LLO aggregator. Example config (remappedID is given as a hex string):
streams:
  "1":
    deviation: "0.1"
    heartbeat: 10
    remappedID: "0x680084f7347baFfb5C323c2982dfC90e04F9F918"
  "2":
    deviation: "0.2"
    heartbeat: 20
allowedPartialStaleness: "0.2"
The keys of streams are the stream IDs that the aggregator will aggregate.
func NewLLOConfig ¶ added in v0.7.0
func NewLLOConfig(m values.Map) (LLOAggregatorConfig, error)
type LLOOutcomeMetadata ¶ added in v0.6.0
type LLOOutcomeMetadata struct {
StreamInfo map[uint32]*LLOStreamInfo `` /* 148-byte string literal not displayed */
// contains filtered or unexported fields
}
func (*LLOOutcomeMetadata) Descriptor
deprecated
added in
v0.6.0
func (*LLOOutcomeMetadata) Descriptor() ([]byte, []int)
Deprecated: Use LLOOutcomeMetadata.ProtoReflect.Descriptor instead.
func (*LLOOutcomeMetadata) GetStreamInfo ¶ added in v0.6.0
func (x *LLOOutcomeMetadata) GetStreamInfo() map[uint32]*LLOStreamInfo
func (*LLOOutcomeMetadata) ProtoMessage ¶ added in v0.6.0
func (*LLOOutcomeMetadata) ProtoMessage()
func (*LLOOutcomeMetadata) ProtoReflect ¶ added in v0.6.0
func (x *LLOOutcomeMetadata) ProtoReflect() protoreflect.Message
func (*LLOOutcomeMetadata) Reset ¶ added in v0.6.0
func (x *LLOOutcomeMetadata) Reset()
func (*LLOOutcomeMetadata) String ¶ added in v0.6.0
func (x *LLOOutcomeMetadata) String() string
type LLOStreamInfo ¶ added in v0.6.0
type LLOStreamInfo struct {
Timestamp int64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
Price []byte `protobuf:"bytes,2,opt,name=price,proto3" json:"price,omitempty"` // Decimal
// contains filtered or unexported fields
}
func (*LLOStreamInfo) Descriptor
deprecated
added in
v0.6.0
func (*LLOStreamInfo) Descriptor() ([]byte, []int)
Deprecated: Use LLOStreamInfo.ProtoReflect.Descriptor instead.
func (*LLOStreamInfo) GetPrice ¶ added in v0.6.0
func (x *LLOStreamInfo) GetPrice() []byte
func (*LLOStreamInfo) GetTimestamp ¶ added in v0.6.0
func (x *LLOStreamInfo) GetTimestamp() int64
func (*LLOStreamInfo) ProtoMessage ¶ added in v0.6.0
func (*LLOStreamInfo) ProtoMessage()
func (*LLOStreamInfo) ProtoReflect ¶ added in v0.6.0
func (x *LLOStreamInfo) ProtoReflect() protoreflect.Message
func (*LLOStreamInfo) Reset ¶ added in v0.6.0
func (x *LLOStreamInfo) Reset()
func (*LLOStreamInfo) String ¶ added in v0.6.0
func (x *LLOStreamInfo) String() string