Documentation
¶
Index ¶
- func NewBroadcaster(orm ORM, ethClient evmclient.Client, config Config, lggr logger.Logger, ...) *broadcaster
- func NewORM(ds sqlutil.DataSource, evmChainID big.Int) *orm
- type AbigenContract
- type Broadcast
- type Broadcaster
- type BroadcasterInTest
- type Config
- type Listener
- type ListenerOpts
- type LogBroadcast
- type LogBroadcastAsKey
- type NullBroadcaster
- func (n *NullBroadcaster) AddDependents(int)
- func (n *NullBroadcaster) AwaitDependents() <-chan struct{}
- func (n *NullBroadcaster) BackfillBlockNumber() sql.NullInt64
- func (n *NullBroadcaster) Close() error
- func (n *NullBroadcaster) DependentReady()
- func (n *NullBroadcaster) HealthReport() map[string]error
- func (n *NullBroadcaster) IsConnected() bool
- func (n *NullBroadcaster) LogsFromBlock(common.Hash) int
- func (n *NullBroadcaster) MarkConsumed(ctx context.Context, ds sqlutil.DataSource, lb Broadcast) error
- func (n *NullBroadcaster) Name() string
- func (n *NullBroadcaster) OnNewLongestChain(context.Context, *evmtypes.Head)
- func (n *NullBroadcaster) Pause()
- func (n *NullBroadcaster) Ready() error
- func (n *NullBroadcaster) Register(listener Listener, opts ListenerOpts) (unsubscribe func())
- func (n *NullBroadcaster) ReplayFromBlock(number int64, forceBroadcast bool)
- func (n *NullBroadcaster) Resume()
- func (n *NullBroadcaster) Start(context.Context) error
- func (n *NullBroadcaster) TrackedAddressesCount() uint32
- func (n *NullBroadcaster) WasAlreadyConsumed(ctx context.Context, lb Broadcast) (bool, error)
- type ORM
- type ParseLogFunc
- type Topic
- type Uint64
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type AbigenContract ¶
type Broadcast ¶
type Broadcast interface {
DecodedLog() interface{}
RawLog() types.Log
String() string
LatestBlockNumber() uint64
LatestBlockHash() common.Hash
ReceiptsRoot() common.Hash
TransactionsRoot() common.Hash
StateRoot() common.Hash
JobID() int32
EVMChainID() big.Int
}
The Broadcast type wraps a types.Log but provides additional functionality for determining whether or not the log has been consumed and for marking the log as consumed
type Broadcaster ¶
type Broadcaster interface {
utils.DependentAwaiter
services.Service
httypes.HeadTrackable
// ReplayFromBlock enqueues a replay from the provided block number. If forceBroadcast is
// set to true, the broadcaster will broadcast logs that were already marked consumed
// previously by any subscribers.
ReplayFromBlock(number int64, forceBroadcast bool)
IsConnected() bool
Register(listener Listener, opts ListenerOpts) (unsubscribe func())
WasAlreadyConsumed(ctx context.Context, lb Broadcast) (bool, error)
// ds is optional
MarkConsumed(ctx context.Context, ds sqlutil.DataSource, lb Broadcast) error
}
The Broadcaster manages log subscription requests for the Chainlink node. Instead of creating a new subscription for each request, it multiplexes all subscriptions to all of the relevant contracts over a single connection and forwards the logs to the relevant subscribers.
In case of node crash and/or restart, the logs will be backfilled for subscribers that are added before all dependents of LogBroadcaster are done.
The backfill starts from the earliest block of either:
- Latest DB head minus BlockBackfillDepth and the maximum number of confirmations.
- Earliest pending or unconsumed log broadcast from DB.
If a subscriber is added after the LogBroadcaster does the initial backfill, then it's possible/likely that the backfill fill only have depth: 1 (from latest head)
Of course, these backfilled logs + any new logs will only be sent after the NumConfirmations for given subscriber.
type BroadcasterInTest ¶
type ListenerOpts ¶
type ListenerOpts struct {
Contract common.Address
// Event types to receive, with value filter for each field in the event
// No filter or an empty filter for a given field position mean: all values allowed
// the key should be a result of AbigenLog.Topic() call
// topic => topicValueFilters
LogsWithTopics map[common.Hash][][]Topic
ParseLog ParseLogFunc
// Minimum number of block confirmations before the log is received
MinIncomingConfirmations uint32
// ReplayStartedCallback is called by the log broadcaster once a replay request is received.
ReplayStartedCallback func()
}
type LogBroadcast ¶
LogBroadcast - data from log_broadcasts table columns
func (LogBroadcast) AsKey ¶
func (b LogBroadcast) AsKey() LogBroadcastAsKey
type LogBroadcastAsKey ¶
LogBroadcastAsKey - used as key in a map to filter out already consumed logs
func NewLogBroadcastAsKey ¶
func NewLogBroadcastAsKey(log types.Log, listener Listener) LogBroadcastAsKey
type NullBroadcaster ¶
type NullBroadcaster struct{ ErrMsg string }
func (*NullBroadcaster) AddDependents ¶
func (n *NullBroadcaster) AddDependents(int)
func (*NullBroadcaster) AwaitDependents ¶
func (n *NullBroadcaster) AwaitDependents() <-chan struct{}
func (*NullBroadcaster) BackfillBlockNumber ¶
func (n *NullBroadcaster) BackfillBlockNumber() sql.NullInt64
func (*NullBroadcaster) Close ¶
func (n *NullBroadcaster) Close() error
func (*NullBroadcaster) DependentReady ¶
func (n *NullBroadcaster) DependentReady()
DependentReady does noop for NullBroadcaster.
func (*NullBroadcaster) HealthReport ¶
func (n *NullBroadcaster) HealthReport() map[string]error
func (*NullBroadcaster) IsConnected ¶
func (n *NullBroadcaster) IsConnected() bool
func (*NullBroadcaster) LogsFromBlock ¶
func (n *NullBroadcaster) LogsFromBlock(common.Hash) int
func (*NullBroadcaster) MarkConsumed ¶
func (n *NullBroadcaster) MarkConsumed(ctx context.Context, ds sqlutil.DataSource, lb Broadcast) error
func (*NullBroadcaster) Name ¶
func (n *NullBroadcaster) Name() string
func (*NullBroadcaster) OnNewLongestChain ¶
func (n *NullBroadcaster) OnNewLongestChain(context.Context, *evmtypes.Head)
func (*NullBroadcaster) Pause ¶
func (n *NullBroadcaster) Pause()
func (*NullBroadcaster) Ready ¶
func (n *NullBroadcaster) Ready() error
func (*NullBroadcaster) Register ¶
func (n *NullBroadcaster) Register(listener Listener, opts ListenerOpts) (unsubscribe func())
func (*NullBroadcaster) ReplayFromBlock ¶
func (n *NullBroadcaster) ReplayFromBlock(number int64, forceBroadcast bool)
ReplayFromBlock implements the Broadcaster interface.
func (*NullBroadcaster) Resume ¶
func (n *NullBroadcaster) Resume()
func (*NullBroadcaster) Start ¶
func (n *NullBroadcaster) Start(context.Context) error
Start does noop for NullBroadcaster.
func (*NullBroadcaster) TrackedAddressesCount ¶
func (n *NullBroadcaster) TrackedAddressesCount() uint32
func (*NullBroadcaster) WasAlreadyConsumed ¶
type ORM ¶
type ORM interface {
// FindBroadcasts returns broadcasts for a range of block numbers, both consumed and unconsumed.
FindBroadcasts(ctx context.Context, fromBlockNum int64, toBlockNum int64) ([]LogBroadcast, error)
// CreateBroadcast inserts an unconsumed log broadcast for jobID.
CreateBroadcast(ctx context.Context, blockHash common.Hash, blockNumber uint64, logIndex uint, jobID int32) error
// WasBroadcastConsumed returns true if jobID consumed the log broadcast.
WasBroadcastConsumed(ctx context.Context, blockHash common.Hash, logIndex uint, jobID int32) (bool, error)
// MarkBroadcastConsumed marks the log broadcast as consumed by jobID.
MarkBroadcastConsumed(ctx context.Context, blockHash common.Hash, blockNumber uint64, logIndex uint, jobID int32) error
// MarkBroadcastsUnconsumed marks all log broadcasts from all jobs on or after fromBlock as
// unconsumed.
MarkBroadcastsUnconsumed(ctx context.Context, fromBlock int64) error
// SetPendingMinBlock sets the minimum block number for which there are pending broadcasts in the pool, or nil if empty.
SetPendingMinBlock(ctx context.Context, blockNum *int64) error
// GetPendingMinBlock returns the minimum block number for which there were pending broadcasts in the pool, or nil if it was empty.
GetPendingMinBlock(ctx context.Context) (blockNumber *int64, err error)
// Reinitialize cleans up the database by removing any unconsumed broadcasts, then updating (if necessary) and
// returning the pending minimum block number.
Reinitialize(ctx context.Context) (blockNumber *int64, err error)
WithDataSource(sqlutil.DataSource) ORM
}
ORM is the interface for log broadcasts.
- Unconsumed broadcasts are created just before notifying subscribers, who are responsible for marking them consumed.
- Pending broadcast block numbers are synced to the min from the pool (or deleted when empty)
- On reboot, backfill considers the min block number from unconsumed and pending broadcasts. Additionally, unconsumed entries are removed and the pending broadcasts number updated.