Documentation
¶
Index ¶
- Constants
- type Client
- func (c *Client) BatchCallContext(ctx context.Context, method string, params []interface{}) ([]json.RawMessage, error)
- func (c *Client) BatchGetTransactionsByHash(ctx context.Context, hashes []string) ([]json.RawMessage, error)
- func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
- func (c *Client) GetClientInfo(ctx context.Context, version *string) error
- func (c *Client) GetPendingTransactions(ctx context.Context) ([]json.RawMessage, error)
- func (c *Client) GetRPCClient() *rpc.Client
- func (c *Client) GetSender(tx *types.Transaction) (common.Address, error)
- func (c *Client) GetSigner() types.Signer
- func (c *Client) GetTxpoolContent(ctx context.Context) (json.RawMessage, error)
- func (c *Client) GetWebSocketClient() *rpc.Client
- func (c *Client) InitSigner(ctx context.Context)
- func (c *Client) Start(ctx context.Context) error
- func (c *Client) Stop(ctx context.Context) error
- func (c *Client) SubscribeToNewPendingTxs(ctx context.Context) (<-chan string, <-chan error, error)
- type ClientProvider
- type Config
- type EventCallback
- type MempoolWatcher
- type Metrics
- func (m *Metrics) AddMempoolTxExpired(count int)
- func (m *Metrics) AddMempoolTxProcessed(count int)
- func (m *Metrics) AddMempoolTxReceived(count int)
- func (m *Metrics) AddQueueRejections(count int)
- func (m *Metrics) AddQueueThroughput(count int)
- func (m *Metrics) AddTxBySource(source string, count int)
- func (m *Metrics) AddTxProcessingOutcome(outcome string, count int)
- func (m *Metrics) ObserveBatchProcessingDuration(duration float64)
- func (m *Metrics) ObserveRPCRequestDuration(method string, duration float64)
- func (m *Metrics) SetCircuitBreakerState(breakerName, state string)
- func (m *Metrics) SetMempoolTxPending(count int)
- func (m *Metrics) SetProcessedCacheSize(size int)
- func (m *Metrics) SetQueueSize(size int)
- func (m *Metrics) SetWebsocketConnected(connected bool)
- type PendingTxRecord
- type SubscriptionType
- type TransactionCallback
Constants ¶
const ( // SubNewPendingTransactions subscribes to new pending transactions. SubNewPendingTransactions SubscriptionType = "newPendingTransactions" // RPCMethodTxpoolContent is the RPC method for getting the content of the transaction pool. RPCMethodTxpoolContent = "txpool_content" // RPCMethodPendingTransactions is the RPC method for getting pending transactions. RPCMethodPendingTransactions = "eth_pendingTransactions" // RPCMethodGetTransactionByHash is the RPC method for getting a transaction by its hash. RPCMethodGetTransactionByHash = "eth_getTransactionByHash" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client represents a unified execution client with both WebSocket and RPC capabilities.
func (*Client) BatchCallContext ¶
func (c *Client) BatchCallContext(ctx context.Context, method string, params []interface{}) ([]json.RawMessage, error)
BatchCallContext performs a batch JSON-RPC call for multiple transactions.
func (*Client) BatchGetTransactionsByHash ¶
func (c *Client) BatchGetTransactionsByHash(ctx context.Context, hashes []string) ([]json.RawMessage, error)
BatchGetTransactionsByHash retrieves transactions by their hashes.
func (*Client) CallContext ¶
func (c *Client) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
CallContext calls an RPC method with the given context.
func (*Client) GetClientInfo ¶
GetClientInfo gets the client version info.
func (*Client) GetPendingTransactions ¶
GetPendingTransactions retrieves pending transactions.
func (*Client) GetRPCClient ¶
GetRPCClient provides access to the RPC client directly.
func (*Client) GetTxpoolContent ¶
GetTxpoolContent retrieves the full transaction pool content.
func (*Client) GetWebSocketClient ¶
GetWebSocketClient provides access to the WebSocket client directly.
func (*Client) InitSigner ¶
InitSigner initialises the transaction signer. This is used to determine mempool tx senders.
type ClientProvider ¶
type ClientProvider interface { // GetTxpoolContent retrieves the full transaction pool content. GetTxpoolContent(ctx context.Context) (json.RawMessage, error) // GetPendingTransactions retrieves pending transactions. GetPendingTransactions(ctx context.Context) ([]json.RawMessage, error) // BatchGetTransactionsByHash retrieves transactions by their hashes. BatchGetTransactionsByHash(ctx context.Context, hashes []string) ([]json.RawMessage, error) // SubscribeToNewPendingTxs subscribes to new pending transaction notifications. SubscribeToNewPendingTxs(ctx context.Context) (<-chan string, <-chan error, error) // CallContext performs a JSON-RPC call with the given arguments. CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error }
ClientProvider defines the interface for unified execution client (WS and RPC) operations.
type Config ¶
type Config struct { // Enabled is whether the execution client is enabled. Enabled bool `yaml:"enabled" default:"false"` // EthPendingTxsEnabled is whether we supplement fetching tx's using eth_pendingTransactions periodically. EthPendingTxsEnabled bool `yaml:"ethPendingTxsEnabled" default:"false"` // TxPoolContentEnabled is whether we supplement fetching tx's using txpool_content periodically. TxPoolContentEnabled bool `yaml:"txPoolContentEnabled" default:"true"` // WebsocketEnabled is whether the websocket is enabled. WebsocketEnabled bool `yaml:"websocketEnabled" default:"false"` // WSAddress is the WebSocket address of the execution client for subscriptions. WSAddress string `yaml:"wsAddress"` // RPCAddress is the RPC address of the execution client for txpool_content calls. RPCAddress string `yaml:"rpcAddress"` // Headers is a map of headers to send to the execution client. Headers map[string]string `yaml:"headers"` // FetchInterval is how often to fetch txpool_content (in seconds). FetchInterval int `yaml:"fetchInterval" default:"15"` // PruneDuration is how long to keep pending transactions in memory before pruning (in seconds). PruneDuration int `yaml:"pruneDuration" default:"300"` // ProcessorWorkerCount is the number of worker goroutines for processing transactions. ProcessorWorkerCount int `yaml:"processorWorkerCount" default:"50"` // RpcBatchSize is the number of transactions to include in a single RPC batch call. RpcBatchSize int `yaml:"rpcBatchSize" default:"40"` // QueueSize is the size of the transaction processing queue. QueueSize int `yaml:"queueSize" default:"5000"` // ProcessingInterval is the interval at which to process batches of transactions (in milliseconds). ProcessingInterval int `yaml:"processingInterval" default:"500"` // MaxConcurrency is the maximum number of concurrent batch RPC requests. 
MaxConcurrency int `yaml:"maxConcurrency" default:"5"` // CircuitBreakerFailureThreshold is the number of consecutive failures before opening the circuit breaker. CircuitBreakerFailureThreshold int `yaml:"circuitBreakerFailureThreshold" default:"5"` // CircuitBreakerResetTimeout is the time to wait before transitioning from open to half-open (in seconds). CircuitBreakerResetTimeout int `yaml:"circuitBreakerResetTimeout" default:"30"` }
Config defines configuration for connecting to an execution client.
type EventCallback ¶
EventCallback is a generic callback function for subscription events.
type MempoolWatcher ¶
type MempoolWatcher struct {
// contains filtered or unexported fields
}
MempoolWatcher captures and processes pending transactions from the Ethereum execution client. It uses three complementary methods to ensure comprehensive transaction coverage: 1. Websocket subscription to newPendingTransactions for real-time notification 2. Periodic polling of txpool_content for full transaction details 3. Periodic polling of eth_pendingTransactions as a fallback/supplementary source
Transactions flow through the system as follows: - Detected via any of the three sources above - Added to pendingTxs map (temporary storage until processed) - Queued for processing via txQueue - Processed by worker goroutines - Marked as processed in txQueue.processed map to prevent reprocessing - Removed from pendingTxs map
The pendingTxs map functions as a temporary workspace for in-flight processing, not as a permanent mirror of the mempool state.
func NewMempoolWatcher ¶
func NewMempoolWatcher( client ClientProvider, log logrus.FieldLogger, config *Config, processTxCallback func(context.Context, *PendingTxRecord, json.RawMessage) error, metrics *Metrics, ) *MempoolWatcher
NewMempoolWatcher creates a new MempoolWatcher with configured circuit breakers for resilient RPC operations.
func (*MempoolWatcher) Start ¶
func (w *MempoolWatcher) Start(ctx context.Context) error
Start initializes the watcher's context and launches all background goroutines for transaction discovery and processing.
func (*MempoolWatcher) Stop ¶
func (w *MempoolWatcher) Stop()
Stop cleanly shuts down the watcher, canceling the WebSocket subscription and waiting for all goroutines to complete.
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
func NewMetrics ¶
func (*Metrics) AddMempoolTxExpired ¶
AddMempoolTxExpired increments the counter for expired mempool transactions.
func (*Metrics) AddMempoolTxProcessed ¶
AddMempoolTxProcessed increments the counter for processed mempool transactions.
func (*Metrics) AddMempoolTxReceived ¶
AddMempoolTxReceived increments the counter for received mempool transactions.
func (*Metrics) AddQueueRejections ¶
AddQueueRejections increments the counter for queue rejections.
func (*Metrics) AddQueueThroughput ¶
AddQueueThroughput increments the counter for queue throughput.
func (*Metrics) AddTxBySource ¶
AddTxBySource increments the counter for transactions by source.
func (*Metrics) AddTxProcessingOutcome ¶
AddTxProcessingOutcome increments the counter for transaction processing outcomes.
func (*Metrics) ObserveBatchProcessingDuration ¶
ObserveBatchProcessingDuration observes the duration of batch processing.
func (*Metrics) ObserveRPCRequestDuration ¶
ObserveRPCRequestDuration observes the duration of RPC requests.
func (*Metrics) SetCircuitBreakerState ¶
SetCircuitBreakerState updates the circuit breaker state gauge.
func (*Metrics) SetMempoolTxPending ¶
SetMempoolTxPending sets the gauge for pending mempool transactions.
func (*Metrics) SetProcessedCacheSize ¶
SetProcessedCacheSize sets the gauge for processed cache size.
func (*Metrics) SetQueueSize ¶
SetQueueSize sets the queue size gauge.
func (*Metrics) SetWebsocketConnected ¶
SetWebsocketConnected sets the gauge for websocket connection.
type PendingTxRecord ¶
type PendingTxRecord struct { Hash string FirstSeen time.Time Attempts int TxData json.RawMessage // Raw tx data (when available). Source string // Eg: "websocket", "txpool_content", or "eth_pendingTransactions". MarkedForPruning bool }
PendingTxRecord represents a transaction hash and when it was first seen.
type SubscriptionType ¶
type SubscriptionType string
SubscriptionType represents a type of subscription to the execution client.