execution

package
v1.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 25, 2025 License: GPL-3.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// SubNewPendingTransactions subscribes to new pending transactions.
	SubNewPendingTransactions SubscriptionType = "newPendingTransactions"
	// RPCMethodTxpoolContent is the RPC method for getting the content of the transaction pool.
	RPCMethodTxpoolContent = "txpool_content"
	// RPCMethodPendingTransactions is the RPC method for getting pending transactions.
	RPCMethodPendingTransactions = "eth_pendingTransactions"
	// RPCMethodGetTransactionByHash is the RPC method for getting a transaction by its hash.
	RPCMethodGetTransactionByHash = "eth_getTransactionByHash"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

ExecutionClient represents a unified execution client with both WebSocket and RPC capabilities.

func NewClient

func NewClient(ctx context.Context, log logrus.FieldLogger, config *Config) (*Client, error)

NewClient creates a new unified execution client.

func (*Client) BatchCallContext

func (c *Client) BatchCallContext(ctx context.Context, method string, params []interface{}) ([]json.RawMessage, error)

BatchCallContext performs a batch JSON-RPC call for multiple transactions.

func (*Client) BatchGetTransactionsByHash

func (c *Client) BatchGetTransactionsByHash(ctx context.Context, hashes []string) ([]json.RawMessage, error)

BatchGetTransactionsByHash retrieves transactions by their hashes.

func (*Client) CallContext

func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error

CallContext calls an RPC method with the given context.

func (*Client) GetClientInfo

func (c *Client) GetClientInfo(ctx context.Context, version *string) error

GetClientInfo gets the client version info.

func (*Client) GetPendingTransactions

func (c *Client) GetPendingTransactions(ctx context.Context) ([]json.RawMessage, error)

GetPendingTransactions retrieves pending transactions.

func (*Client) GetRPCClient

func (c *Client) GetRPCClient() *rpc.Client

GetRPCClient provides access to the RPC client directly.

func (*Client) GetSender

func (c *Client) GetSender(tx *types.Transaction) (common.Address, error)

GetSender retrieves the sender of a transaction.

func (*Client) GetSigner

func (c *Client) GetSigner() types.Signer

GetSigner returns the signer.

func (*Client) GetTxpoolContent

func (c *Client) GetTxpoolContent(ctx context.Context) (json.RawMessage, error)

GetTxpoolContent retrieves the full transaction pool content.

func (*Client) GetWebSocketClient

func (c *Client) GetWebSocketClient() *rpc.Client

GetWebSocketClient provides access to the WebSocket client directly.

func (*Client) InitSigner

func (c *Client) InitSigner(ctx context.Context)

InitSigner initialises the transaction signer. This is used to determine mempool tx senders.

func (*Client) Start

func (c *Client) Start(ctx context.Context) error

Start starts the execution client.

func (*Client) Stop

func (c *Client) Stop(ctx context.Context) error

Stop stops the execution client.

func (*Client) SubscribeToNewPendingTxs

func (c *Client) SubscribeToNewPendingTxs(ctx context.Context) (<-chan string, <-chan error, error)

SubscribeToNewPendingTxs subscribes to new pending transaction notifications

type ClientProvider

type ClientProvider interface {
	// GetTxpoolContent retrieves the full transaction pool content.
	GetTxpoolContent(ctx context.Context) (json.RawMessage, error)

	// GetPendingTransactions retrieves pending transactions.
	GetPendingTransactions(ctx context.Context) ([]json.RawMessage, error)

	// BatchGetTransactionsByHash retrieves transactions by their hashes.
	BatchGetTransactionsByHash(ctx context.Context, hashes []string) ([]json.RawMessage, error)

	// SubscribeToNewPendingTxs subscribes to new pending transaction notifications.
	SubscribeToNewPendingTxs(ctx context.Context) (<-chan string, <-chan error, error)

	// CallContext performs a JSON-RPC call with the given arguments.
	CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
}

ClientProvider defines the interface for unified execution client (WS and RPC) operations.

type Config

type Config struct {
	// Enabled is whether the execution client is enabled.
	Enabled bool `yaml:"enabled" default:"false"`
	// EthPendingTxsEnabled is whether we suppliment fetching tx's using eth_pendingTransactions periodically.
	EthPendingTxsEnabled bool `yaml:"ethPendingTxsEnabled" default:"false"`
	// TxPoolContentEnabled is whether we suppliment fetching tx's using txpool_content periodically.
	TxPoolContentEnabled bool `yaml:"txPoolContentEnabled" default:"true"`
	// WebsocketEnabled is whether the websocket is enabled.
	WebsocketEnabled bool `yaml:"websocketEnabled" default:"false"`
	// WSAddress is the WebSocket address of the execution client for subscriptions.
	WSAddress string `yaml:"wsAddress"`
	// RPCAddress is the RPC address of the execution client for txpool_content calls.
	RPCAddress string `yaml:"rpcAddress"`
	// Headers is a map of headers to send to the execution client.
	Headers map[string]string `yaml:"headers"`
	// FetchInterval is how often to fetch txpool_content (in seconds).
	FetchInterval int `yaml:"fetchInterval" default:"15"`
	// PruneDuration is how long to keep pending transactions in memory before pruning (in seconds).
	PruneDuration int `yaml:"pruneDuration" default:"300"`
	// ProcessorWorkerCount is the number of worker goroutines for processing transactions.
	ProcessorWorkerCount int `yaml:"processorWorkerCount" default:"50"`
	// RpcBatchSize is the number of transactions to include in a single RPC batch call.
	RpcBatchSize int `yaml:"rpcBatchSize" default:"40"`
	// QueueSize is the size of the transaction processing queue.
	QueueSize int `yaml:"queueSize" default:"5000"`
	// ProcessingInterval is the interval at which to process batches of transactions (in milliseconds).
	ProcessingInterval int `yaml:"processingInterval" default:"500"`
	// MaxConcurrency is the maximum number of concurrent batch RPC requests.
	MaxConcurrency int `yaml:"maxConcurrency" default:"5"`
	// CircuitBreakerFailureThreshold is the number of consecutive failures before opening the circuit breaker.
	CircuitBreakerFailureThreshold int `yaml:"circuitBreakerFailureThreshold" default:"5"`
	// CircuitBreakerResetTimeout is the time to wait before transitioning from open to half-open (in seconds).
	CircuitBreakerResetTimeout int `yaml:"circuitBreakerResetTimeout" default:"30"`
}

Config defines configuration for connecting to an execution client.

type EventCallback

type EventCallback func(ctx context.Context, event interface{}) error

EventCallback is a generic callback function for subscription events.

type MempoolWatcher

type MempoolWatcher struct {
	// contains filtered or unexported fields
}

MempoolWatcher captures and processes pending transactions from the Ethereum execution client. It uses three complementary methods to ensure comprehensive transaction coverage: 1. Websocket subscription to newPendingTransactions for real-time notification 2. Periodic polling of txpool_content for full transaction details 3. Periodic polling of eth_pendingTransactions as a fallback/supplementary source

Transactions flow through the system as follows: - Detected via any of the three sources above - Added to pendingTxs map (temporary storage until processed) - Queued for processing via txQueue - Processed by worker goroutines - Marked as processed in txQueue.processed map to prevent reprocessing - Removed from pendingTxs map

The pendingTxs map functions as a temporary workspace for in-flight processing, not as a permanent mirror of the mempool state.

func NewMempoolWatcher

func NewMempoolWatcher(
	client ClientProvider,
	log logrus.FieldLogger,
	config *Config,
	processTxCallback func(context.Context, *PendingTxRecord, json.RawMessage) error,
	metrics *Metrics,
) *MempoolWatcher

NewMempoolWatcher creates a new MempoolWatcher with configured circuit breakers for resilient RPC operations

func (*MempoolWatcher) Start

func (w *MempoolWatcher) Start(ctx context.Context) error

Start initializes the watcher's context and launches all background goroutines for transaction discovery and processing.

func (*MempoolWatcher) Stop

func (w *MempoolWatcher) Stop()

Stop cleanly shuts down the watcher, canceling the WebSocket subscription and waiting for all goroutines to complete.

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

func NewMetrics

func NewMetrics(namespace, networkID string) *Metrics

func (*Metrics) AddMempoolTxExpired

func (m *Metrics) AddMempoolTxExpired(count int)

AddMempoolTxExpired increments the counter for expired mempool transactions.

func (*Metrics) AddMempoolTxProcessed

func (m *Metrics) AddMempoolTxProcessed(count int)

AddMempoolTxProcessed increments the counter for processed mempool transactions.

func (*Metrics) AddMempoolTxReceived

func (m *Metrics) AddMempoolTxReceived(count int)

AddMempoolTxReceived increments the counter for received mempool transactions.

func (*Metrics) AddQueueRejections

func (m *Metrics) AddQueueRejections(count int)

AddQueueRejections increments the counter for queue rejections.

func (*Metrics) AddQueueThroughput

func (m *Metrics) AddQueueThroughput(count int)

AddQueueThroughput increments the counter for queue throughput.

func (*Metrics) AddTxBySource

func (m *Metrics) AddTxBySource(source string, count int)

AddTxBySource increments the counter for transactions by source.

func (*Metrics) AddTxProcessingOutcome

func (m *Metrics) AddTxProcessingOutcome(outcome string, count int)

AddTxProcessingOutcome increments the counter for transaction processing outcomes.

func (*Metrics) ObserveBatchProcessingDuration

func (m *Metrics) ObserveBatchProcessingDuration(duration float64)

ObserveBatchProcessingDuration observes the duration of batch processing.

func (*Metrics) ObserveRPCRequestDuration

func (m *Metrics) ObserveRPCRequestDuration(method string, duration float64)

ObserveRPCRequestDuration observes the duration of RPC requests.

func (*Metrics) SetCircuitBreakerState

func (m *Metrics) SetCircuitBreakerState(breakerName, state string)

SetCircuitBreakerState updates the circuit breaker state gauge

func (*Metrics) SetMempoolTxPending

func (m *Metrics) SetMempoolTxPending(count int)

SetMempoolTxPending sets the gauge for pending mempool transactions.

func (*Metrics) SetProcessedCacheSize

func (m *Metrics) SetProcessedCacheSize(size int)

SetProcessedCacheSize sets the gauge for processed cache size.

func (*Metrics) SetQueueSize

func (m *Metrics) SetQueueSize(size int)

SetQueueSize sets the queue size gauge.

func (*Metrics) SetWebsocketConnected

func (m *Metrics) SetWebsocketConnected(connected bool)

SetWebsocketConnected sets the gauge for websocket connection.

type PendingTxRecord

type PendingTxRecord struct {
	Hash             string
	FirstSeen        time.Time
	Attempts         int
	TxData           json.RawMessage // Raw tx data (when available).
	Source           string          // Eg: "websocket", "txpool_content", or "eth_pendingTransactions".
	MarkedForPruning bool
}

PendingTxRecord represents a transaction hash and when it was first seen.

type SubscriptionType

type SubscriptionType string

SubscriptionType represents a type of subscription to the execution client.

type TransactionCallback

type TransactionCallback func(ctx context.Context, tx string) error

TransactionCallback is a callback function for when a transaction is received.

Directories

Path Synopsis
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL