db

package
v0.0.0-...-65580d4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 10, 2025 License: Apache-2.0 Imports: 48 Imported by: 0

Documentation

Index

Constants

View Source
// Table name constants.
// NOTE(review): presumably the names of the database tables this package
// writes to — confirm against the migrations.
const (
	TableNameVisits                      = "visits"
	TableNameNeighbors                   = "neighbors"
	TableNameCrawls                      = "crawls"
	TableNameDiscoveryIDPrefixesXPeerIDs = "discovery_id_prefixes_x_peer_ids"
)

Variables

View Source
// Package-level errors whose messages report an empty agent version, an empty
// protocol, and an empty protocols set, respectively.
var (
	ErrEmptyAgentVersion = fmt.Errorf("empty agent version")
	ErrEmptyProtocol     = fmt.Errorf("empty protocol")
	ErrEmptyProtocolsSet = fmt.Errorf("empty protocols set")
)
View Source
// ErrNoPublicIP is the error for a node that is skipped because it has no
// public IP address. Its message contains the "no public IP address" key of
// the KnownErrors map, so update that map when changing the message.
var ErrNoPublicIP = fmt.Errorf("skipping node as it has no public IP address")
View Source
var ErrorStr = map[string]string{}
View Source
var KnownErrors = map[string]string{
	"i/o timeout":                                         pgmodels.NetErrorIoTimeout,
	"RPC timeout":                                         pgmodels.NetErrorIoTimeout,
	"no recent network activity":                          pgmodels.NetErrorIoTimeout,
	"handshake did not complete in time":                  pgmodels.NetErrorIoTimeout,
	"connection refused":                                  pgmodels.NetErrorConnectionRefused,
	"CONNECTION_REFUSED":                                  pgmodels.NetErrorConnectionRefused,
	"connection reset by peer":                            pgmodels.NetErrorConnectionResetByPeer,
	"protocol not supported":                              pgmodels.NetErrorProtocolNotSupported,
	"protocols not supported":                             pgmodels.NetErrorProtocolNotSupported,
	"peer id mismatch":                                    pgmodels.NetErrorPeerIDMismatch,
	"peer IDs don't match":                                pgmodels.NetErrorPeerIDMismatch,
	"no route to host":                                    pgmodels.NetErrorNoRouteToHost,
	"network is unreachable":                              pgmodels.NetErrorNetworkUnreachable,
	"no good addresses":                                   pgmodels.NetErrorNoGoodAddresses,
	"context deadline exceeded":                           pgmodels.NetErrorIoTimeout,
	"no public IP address":                                pgmodels.NetErrorNoIPAddress,
	"max dial attempts exceeded":                          pgmodels.NetErrorMaxDialAttemptsExceeded,
	"host is down":                                        pgmodels.NetErrorHostIsDown,
	"stream reset":                                        pgmodels.NetErrorStreamReset,
	"stream closed":                                       pgmodels.NetErrorStreamReset,
	"failed to negotiate security protocol: EOF":          pgmodels.NetErrorNegotiateSecurityProtocol,
	"failed to negotiate security protocol":               pgmodels.NetErrorNegotiateSecurityProtocol,
	"failed to negotiate stream multiplexer":              pgmodels.NetErrorNegotiateStreamMultiplexer,
	"resource limit exceeded":                             pgmodels.NetErrorResourceLimitExceeded,
	"Write on stream":                                     pgmodels.NetErrorWriteOnStream,
	"can't assign requested address":                      pgmodels.NetErrorCantAssignRequestedAddress,
	"cannot assign requested address":                     pgmodels.NetErrorCantAssignRequestedAddress,
	"connection gated":                                    pgmodels.NetErrorConnectionGated,
	"RESOURCE_LIMIT_EXCEEDED (201)":                       pgmodels.NetErrorCantConnectOverRelay,
	"opening relay circuit: CONNECTION_FAILED (203)":      pgmodels.NetErrorCantConnectOverRelay,
	"NO_RESERVATION (204)":                                pgmodels.NetErrorCantConnectOverRelay,
	"relay failed with a protocol error":                  pgmodels.NetErrorCantConnectOverRelay,
	"can't dial a p2p-circuit without specifying a relay": pgmodels.NetErrorNoIPAddress,
	"no transport for protocol":                           pgmodels.NetErrorNoTransportForProtocol,

	"no good ip address":                 pgmodels.NetErrorNoIPAddress,
	"disconnect requested":               pgmodels.NetErrorDevp2pDisconnectRequested,
	"network error":                      pgmodels.NetErrorDevp2pNetworkError,
	"breach of protocol":                 pgmodels.NetErrorDevp2pBreachOfProtocol,
	"useless peer":                       pgmodels.NetErrorDevp2pUselessPeer,
	"too many peers":                     pgmodels.NetErrorDevp2pTooManyPeers,
	"already connected":                  pgmodels.NetErrorDevp2pAlreadyConnected,
	"incompatible p2p protocol version":  pgmodels.NetErrorDevp2pIncompatibleP2PProtocolVersion,
	"invalid node identity":              pgmodels.NetErrorDevp2pInvalidNodeIdentity,
	"client quitting":                    pgmodels.NetErrorDevp2pClientQuitting,
	"unexpected identity":                pgmodels.NetErrorDevp2pUnexpectedIdentity,
	"connected to self":                  pgmodels.NetErrorDevp2pConnectedToSelf,
	"read timeout":                       pgmodels.NetErrorDevp2pReadTimeout,
	"subprotocol error":                  pgmodels.NetErrorDevp2pSubprotocolError,
	"could not negotiate eth protocol":   pgmodels.NetErrorDevp2pEthprotocolError,
	"handshake failed: EOF":              pgmodels.NetErrorDevp2pHandshakeEOF,
	"malformed disconnect message":       pgmodels.NetErrorDevp2pMalformedDisconnectMessage,
	"dial refused because of black hole": pgmodels.NetErrorBlackHoleRefused,
}

KnownErrors contains a list of known errors. Each key is the string to match for in an error message; each value is the corresponding error type.

Functions

func MaddrErrors

func MaddrErrors(maddrs []ma.Multiaddr, err error) []string

func NetError

func NetError(err error) string

NetError extracts the appropriate error type from the given error.

func Rollback

func Rollback(txn *sql.Tx)

Rollback calls rollback on the given transaction and logs the potential error.

Types

type ClickHouseClient

type ClickHouseClient struct {
	// contains filtered or unexported fields
}

ClickHouseClient is a client for interacting with a ClickHouse database. It implements the database Client interface.

func NewClickHouseClient

func NewClickHouseClient(ctx context.Context, cfg *ClickHouseClientConfig) (*ClickHouseClient, error)

NewClickHouseClient initializes and returns a new ClickHouseClient instance. It establishes a connection to the ClickHouse database and applies migrations if enabled in the provided configuration. The function starts a background flusher to manage batched writes and returns an error if any step fails. Always call [Close] when the client isn't needed anymore to clean up resources.

func (*ClickHouseClient) Close

func (c *ClickHouseClient) Close() error

Close releases resources associated with the ClickHouse client. Make sure that you don't call any other method after calling Close.

func (*ClickHouseClient) Flush

func (c *ClickHouseClient) Flush(ctx context.Context) error

func (*ClickHouseClient) InitCrawl

func (c *ClickHouseClient) InitCrawl(ctx context.Context, version string) error

func (*ClickHouseClient) InsertCrawlProperties

func (c *ClickHouseClient) InsertCrawlProperties(ctx context.Context, properties map[string]map[string]int) error

func (*ClickHouseClient) InsertNeighbors

func (c *ClickHouseClient) InsertNeighbors(ctx context.Context, peerID peer.ID, neighbors []peer.ID, errorBits uint16) error

func (*ClickHouseClient) InsertVisit

func (c *ClickHouseClient) InsertVisit(ctx context.Context, args *VisitArgs) error

func (*ClickHouseClient) QueryBootstrapPeers

func (c *ClickHouseClient) QueryBootstrapPeers(ctx context.Context, limit int) ([]peer.AddrInfo, error)

func (*ClickHouseClient) SealCrawl

func (c *ClickHouseClient) SealCrawl(ctx context.Context, args *SealCrawlArgs) (err error)

func (*ClickHouseClient) SelectPeersToProbe

func (c *ClickHouseClient) SelectPeersToProbe(ctx context.Context) ([]peer.AddrInfo, error)

type ClickHouseClientConfig

type ClickHouseClientConfig struct {
	// Database connection details. Options builds the address from
	// DatabaseHost and DatabasePort and includes authentication settings.
	DatabaseHost           string
	DatabasePort           int
	DatabaseName           string
	DatabaseUser           string
	DatabasePassword       string
	// NOTE(review): presumably toggles the optional TLS settings mentioned
	// in the Options documentation — confirm.
	DatabaseSSL            bool
	ClusterName            string
	MigrationsTableEngine  string
	ReplicatedTableEngines bool
	// NewClickHouseClient applies migrations if enabled in the
	// configuration. NOTE(review): presumably via this flag — confirm.
	ApplyMigrations        bool
	// NOTE(review): presumably bound the batched writes managed by the
	// background flusher that NewClickHouseClient starts — confirm.
	BatchSize              int
	BatchTimeout           time.Duration
	NetworkID              string
	PersistNeighbors       bool
	// Providers used for metric and tracing instrumentation.
	MeterProvider          metric.MeterProvider
	TracerProvider         trace.TracerProvider
}

ClickHouseClientConfig holds configuration for ClickHouse client connection. Enables setting up database connection details, migrations, batching, and tracing.

func (*ClickHouseClientConfig) Options

func (cfg *ClickHouseClientConfig) Options() *clickhouse.Options

Options returns a ClickHouse client options configuration. It includes address, authentication, and optional TLS settings. The address is built from the host and port in the configuration.

type ClickHouseCrawl

type ClickHouseCrawl struct {
	ID              uuid.UUID  `ch:"id"`
	State           string     `ch:"state"`
	FinishedAt      *time.Time `ch:"finished_at"`
	UpdatedAt       time.Time  `ch:"updated_at"`
	CreatedAt       time.Time  `ch:"created_at"`
	CrawledPeers    *int32     `ch:"crawled_peers"`
	DialablePeers   *int32     `ch:"dialable_peers"`
	UndialablePeers *int32     `ch:"undialable_peers"`
	RemainingPeers  *int32     `ch:"remaining_peers"`
	Version         string     `ch:"version"`
	NetworkID       string     `ch:"network_id"`
}

type ClickHouseVisit

type ClickHouseVisit struct {
	CrawlID        *uuid.UUID      `ch:"crawl_id"`
	PeerID         string          `ch:"peer_id"`
	AgentVersion   *string         `ch:"agent_version"`
	Protocols      []string        `ch:"protocols"`
	DialMaddrs     []string        `ch:"dial_maddrs"`
	FilteredMaddrs []string        `ch:"filtered_maddrs"`
	ExtraMaddrs    []string        `ch:"extra_maddrs"`
	ListenMaddrs   []string        `ch:"listen_maddrs"`
	DialErrors     []string        `ch:"dial_errors"`
	ConnectMaddr   *string         `ch:"connect_maddr"`
	CrawlError     *string         `ch:"crawl_error"`
	VisitStartedAt time.Time       `ch:"visit_started_at"`
	VisitEndedAt   time.Time       `ch:"visit_ended_at"`
	Properties     json.RawMessage `ch:"peer_properties"`
	// contains filtered or unexported fields
}

type ClickhouseDiscoveryIDPrefix

type ClickhouseDiscoveryIDPrefix struct {
	Prefix uint64 `ch:"discovery_id_prefix"`
	PeerID string `ch:"peer_id"`
}

type ClickhouseNeighbor

// ClickhouseNeighbor describes one peer-to-neighbor record. Each exported
// field carries a `ch` struct tag with its column name.
type ClickhouseNeighbor struct {
	CrawlID        uuid.UUID `ch:"crawl_id"`
	CrawlCreatedAt time.Time `ch:"crawl_created_at"`
	// PeerID and Neighbor are uint64 values tagged as discovery ID prefix
	// columns; they are not full peer.ID values.
	PeerID         uint64    `ch:"peer_discovery_id_prefix"`
	Neighbor       uint64    `ch:"neighbor_discovery_id_prefix"`
	ErrorBits      uint16    `ch:"error_bits"`
	// contains filtered or unexported fields
}

type Client

// Client is the interface implemented by the database clients in this
// package. It embeds io.Closer.
type Client interface {
	io.Closer
	// InitCrawl initializes a new crawl instance in the database.
	// The clients are responsible for tracking the crawl's ID and associating
	// later database queries with it. This is necessary because different
	// database engines have different types of IDs. ClickHouse commonly uses
	// string IDs and Postgres uses integers. Making the [Client] interface
	// generic on that ID would complicate the code a lot, so we require
	// Clients to keep state. This is added complexity traded for code
	// clarity; it is a trade-off, but arguably the lesser evil.
	InitCrawl(ctx context.Context, version string) error

	// SealCrawl marks the crawl (that the Client tracks internally) as done.
	SealCrawl(ctx context.Context, args *SealCrawlArgs) error

	// QueryBootstrapPeers fetches peers from the database that can be used
	// for bootstrapping into the network. The result will contain from zero
	// up to limit entries.
	QueryBootstrapPeers(ctx context.Context, limit int) ([]peer.AddrInfo, error)

	// InsertVisit TODO: document.
	InsertVisit(ctx context.Context, args *VisitArgs) error

	// InsertCrawlProperties TODO: document.
	InsertCrawlProperties(ctx context.Context, properties map[string]map[string]int) error

	// SelectPeersToProbe TODO: document.
	SelectPeersToProbe(ctx context.Context) ([]peer.AddrInfo, error)

	// Flush instructs the client to write all cached data to the database.
	// Client implementations may cache and batch inserts. Flush tells the
	// client to insert everything that's pending.
	Flush(ctx context.Context) error
}

type CrawlState

// CrawlState is the string-typed state of a crawl, as carried by the State
// field of SealCrawlArgs.
type CrawlState string
// The CrawlState values defined by this package.
const (
	CrawlStateStarted   CrawlState = "started"
	CrawlStateSucceeded CrawlState = "succeeded"
	CrawlStateCancelled CrawlState = "cancelled"
	CrawlStateFailed    CrawlState = "failed"
)

type JSONClient

type JSONClient struct {
	// contains filtered or unexported fields
}

func NewJSONClient

func NewJSONClient(out string) (*JSONClient, error)

func (*JSONClient) Close

func (c *JSONClient) Close() error

func (*JSONClient) Flush

func (c *JSONClient) Flush(ctx context.Context) error

func (*JSONClient) InitCrawl

func (c *JSONClient) InitCrawl(ctx context.Context, version string) (err error)

func (*JSONClient) InsertCrawlProperties

func (c *JSONClient) InsertCrawlProperties(ctx context.Context, properties map[string]map[string]int) error

func (*JSONClient) InsertNeighbors

func (c *JSONClient) InsertNeighbors(ctx context.Context, peerID peer.ID, neighbors []peer.ID, errorBits uint16) error

func (*JSONClient) InsertVisit

func (c *JSONClient) InsertVisit(ctx context.Context, args *VisitArgs) error

func (*JSONClient) QueryBootstrapPeers

func (c *JSONClient) QueryBootstrapPeers(ctx context.Context, limit int) ([]peer.AddrInfo, error)

func (*JSONClient) SealCrawl

func (c *JSONClient) SealCrawl(ctx context.Context, args *SealCrawlArgs) (err error)

func (*JSONClient) SelectPeersToProbe

func (c *JSONClient) SelectPeersToProbe(ctx context.Context) ([]peer.AddrInfo, error)

type JSONNeighbors

type JSONNeighbors struct {
	PeerID      peer.ID
	NeighborIDs []peer.ID
	ErrorBits   string
}

type JSONVisit

type JSONVisit struct {
	PeerID          peer.ID
	Maddrs          []ma.Multiaddr
	FilteredMaddrs  []ma.Multiaddr
	ListenMaddrs    []ma.Multiaddr
	ConnectMaddr    ma.Multiaddr
	Protocols       []string
	AgentVersion    string
	ConnectDuration string
	CrawlDuration   string
	VisitStartedAt  time.Time
	VisitEndedAt    time.Time
	ConnectErrorStr string
	CrawlErrorStr   string
	Properties      null.JSON
}

type NoopClient

type NoopClient struct{}

func NewNoopClient

func NewNoopClient() *NoopClient

func (*NoopClient) Close

func (n *NoopClient) Close() error

func (*NoopClient) Flush

func (n *NoopClient) Flush(ctx context.Context) error

func (*NoopClient) InitCrawl

func (n *NoopClient) InitCrawl(ctx context.Context, version string) error

func (*NoopClient) InsertCrawlProperties

func (n *NoopClient) InsertCrawlProperties(ctx context.Context, properties map[string]map[string]int) error

func (*NoopClient) InsertNeighbors

func (n *NoopClient) InsertNeighbors(ctx context.Context, peerID peer.ID, neighbors []peer.ID, errorBits uint16) error

func (*NoopClient) InsertVisit

func (n *NoopClient) InsertVisit(ctx context.Context, args *VisitArgs) error

func (*NoopClient) QueryBootstrapPeers

func (n *NoopClient) QueryBootstrapPeers(ctx context.Context, limit int) ([]peer.AddrInfo, error)

func (*NoopClient) SealCrawl

func (n *NoopClient) SealCrawl(ctx context.Context, args *SealCrawlArgs) error

func (*NoopClient) SelectPeersToProbe

func (n *NoopClient) SelectPeersToProbe(ctx context.Context) ([]peer.AddrInfo, error)

type PostgresClient

type PostgresClient struct {
	// contains filtered or unexported fields
}

func NewPostgresClient

func NewPostgresClient(ctx context.Context, cfg *PostgresClientConfig) (*PostgresClient, error)

NewPostgresClient establishes a database connection with the provided configuration.

func (*PostgresClient) Close

func (c *PostgresClient) Close() error

func (*PostgresClient) FetchUnresolvedMultiAddresses

func (c *PostgresClient) FetchUnresolvedMultiAddresses(ctx context.Context, limit int) (pgmodels.MultiAddressSlice, error)

FetchUnresolvedMultiAddresses fetches all multi addresses that were not resolved yet.

func (*PostgresClient) Flush

func (c *PostgresClient) Flush(ctx context.Context) error

Flush implements the Flush method of the Client interface.

func (*PostgresClient) GetOrCreateAgentVersionID

func (c *PostgresClient) GetOrCreateAgentVersionID(ctx context.Context, exec boil.ContextExecutor, agentVersion string) (*int, error)

func (*PostgresClient) GetOrCreateProtocol

func (c *PostgresClient) GetOrCreateProtocol(ctx context.Context, exec boil.ContextExecutor, protocol string) (*int, error)

func (*PostgresClient) GetOrCreateProtocolsSetID

func (c *PostgresClient) GetOrCreateProtocolsSetID(ctx context.Context, exec boil.ContextExecutor, protocols []string) (*int, error)

func (*PostgresClient) Handle

func (c *PostgresClient) Handle() *sql.DB

func (*PostgresClient) InitCrawl

func (c *PostgresClient) InitCrawl(ctx context.Context, version string) (err error)

InitCrawl inserts a crawl instance into the database in the state `started`. This is done to receive a database ID that all subsequent database entities can be linked to.

func (*PostgresClient) InsertCrawlProperties

func (c *PostgresClient) InsertCrawlProperties(ctx context.Context, properties map[string]map[string]int) error

func (*PostgresClient) InsertNeighbors

func (c *PostgresClient) InsertNeighbors(ctx context.Context, peerID peer.ID, neighbors []peer.ID, errorBits uint16) error

func (*PostgresClient) InsertVisit

func (c *PostgresClient) InsertVisit(ctx context.Context, args *VisitArgs) error

func (*PostgresClient) QueryBootstrapPeers

func (c *PostgresClient) QueryBootstrapPeers(ctx context.Context, limit int) ([]peer.AddrInfo, error)

func (*PostgresClient) SealCrawl

func (c *PostgresClient) SealCrawl(ctx context.Context, args *SealCrawlArgs) (err error)

func (*PostgresClient) SelectPeersToProbe

func (c *PostgresClient) SelectPeersToProbe(ctx context.Context) ([]peer.AddrInfo, error)

SelectPeersToProbe fetches all open sessions from the database that are due to be dialed/probed.

func (*PostgresClient) UpsertPeer

func (c *PostgresClient) UpsertPeer(mh string, agentVersionID null.Int, protocolSetID null.Int, properties null.JSON) (int, error)

type PostgresClientConfig

// PostgresClientConfig holds the configuration that NewPostgresClient uses to
// establish the database connection.
type PostgresClientConfig struct {
	// Determines the host address of the database.
	DatabaseHost string

	// Determines the port of the database.
	DatabasePort int

	// Determines the name of the database that should be used.
	DatabaseName string

	// Determines the password with which we access the database.
	DatabasePassword string

	// Determines the username with which we access the database.
	DatabaseUser string

	// The database SSL configuration. For Postgres, the SSL mode should be
	// one of the supported values listed here:
	// https://www.postgresql.org/docs/current/libpq-ssl.html
	DatabaseSSL string

	// Whether to apply migrations on startup.
	ApplyMigrations bool

	// The cache size to hold agent versions in memory to skip database queries.
	AgentVersionsCacheSize int

	// The cache size to hold protocols in memory to skip database queries.
	ProtocolsCacheSize int

	// The cache size to hold sets of protocols in memory to skip database queries.
	ProtocolsSetCacheSize int

	// Set the maximum idle connections for the database handler.
	MaxIdleConns int

	// Whether to persist the routing tables to disk.
	PersistNeighbors bool

	// MeterProvider is the meter provider to use when initialising metric instruments.
	MeterProvider metric.MeterProvider

	// TracerProvider is the tracer provider to use when initialising tracing.
	TracerProvider trace.TracerProvider
}

func (*PostgresClientConfig) DatabaseSourceName

func (c *PostgresClientConfig) DatabaseSourceName() string

DatabaseSourceName returns the data source name string to be put into the sql.Open method.

type SealCrawlArgs

// SealCrawlArgs is the argument struct accepted by the SealCrawl methods:
// four integer counters plus a CrawlState.
// NOTE(review): the counters presumably count peers per outcome — confirm.
type SealCrawlArgs struct {
	Crawled    int
	Dialable   int
	Undialable int
	Remaining  int
	State      CrawlState
}

type VisitArgs

type VisitArgs struct {
	PeerID           peer.ID
	DiscoveryPrefix  uint64
	AgentVersion     string
	Protocols        []string
	DialMaddrs       []ma.Multiaddr
	FilteredMaddrs   []ma.Multiaddr
	ExtraMaddrs      []ma.Multiaddr
	ListenMaddrs     []ma.Multiaddr
	DialErrors       []string
	ConnectMaddr     ma.Multiaddr
	DialDuration     time.Duration
	ConnectDuration  time.Duration
	CrawlDuration    time.Duration
	VisitStartedAt   time.Time
	VisitEndedAt     time.Time
	ConnectErrorStr  string
	CrawlErrorStr    string
	VisitType        VisitType
	Neighbors        []peer.ID
	NeighborPrefixes []uint64
	ErrorBits        uint16
	Properties       json.RawMessage
}

type VisitType

// VisitType is a string-typed discriminator with the values "dial" and
// "crawl". VisitArgs carries one in its VisitType field.
type VisitType string
// The VisitType values defined by this package.
const (
	VisitTypeDial  VisitType = "dial"
	VisitTypeCrawl VisitType = "crawl"
)

func (VisitType) String

func (v VisitType) String() string

Directories

Path Synopsis
models
pg

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL