alerting

package
v0.0.0-...-bd61e48 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 5, 2026 License: AGPL-3.0 Imports: 39 Imported by: 0

Documentation

Index

Constants

View Source
// Fixed message texts whose identifiers carry the Mongo prefix: a quiesce-mode shutdown
// notice, an interruption caused by a replica-set state change, and an incomplete read of a
// message header.
// NOTE(review): presumably these are compared against (or searched for in) error strings
// when classifying errors; the call sites are not visible here — confirm before editing the text.
const (
	MongoShutdownInProgress              = "(ShutdownInProgress) The server is in quiesce mode and will shut down"
	MongoInterruptedDueToReplStateChange = "(InterruptedDueToReplStateChange) operation was interrupted"
	MongoIncompleteReadOfMessageHeader   = "incomplete read of message header"
)

Variables

View Source
// Compiled patterns that recognize specific ClickHouse, Postgres, MySQL and MongoDB error
// messages. They are compiled once, at package initialization; regexp.MustCompile panics on
// an invalid pattern, so a malformed expression surfaces immediately at startup.
var (
	// ClickHouseDecimalParsingRe matches the "Cannot parse type Decimal(P, S) ..." message
	// reporting binary decimal data of an unexpected size.
	ClickHouseDecimalParsingRe = regexp.MustCompile(
		`Cannot parse type Decimal\(\d+, \d+\), expected non-empty binary data with size equal to or less than \d+, got \d+`,
	)
	// ClickHouseDecimalInsertRe matches the "Cannot insert Avro decimal ..." message reporting
	// a scale/precision mismatch against a ClickHouse Decimal column type.
	ClickHouseDecimalInsertRe = regexp.MustCompile(
		`Cannot insert Avro decimal with scale \d+ and precision \d+ to ClickHouse type Decimal\(\d+, \d+\) with scale \d+ and precision \d+`,
	)
	// ClickHouseNotFoundInUserDirsRe matches messages such as
	// "ID(a14c2a1c-edcd-5fcb-73be-bd04e09fccb7) not found in user directories".
	// The backticks around "user directories" are optional, which is why the pattern is
	// written as an interpreted string rather than a raw one.
	ClickHouseNotFoundInUserDirsRe = regexp.MustCompile("ID\\([a-z0-9-]+\\) not found in `?user directories`?")

	// Postgres messages: a missing publication, a missing snapshot, a WAL segment that has
	// already been removed, and changes for a transaction id that could not be restored.
	PostgresPublicationDoesNotExistRe = regexp.MustCompile(`publication ".*?" does not exist`)
	PostgresSnapshotDoesNotExistRe    = regexp.MustCompile(`snapshot ".*?" does not exist`)
	PostgresWalSegmentRemovedRe       = regexp.MustCompile(`requested WAL segment \w+ has already been removed`)
	PostgresSpillFileMissingRe        = regexp.MustCompile(`Unable to restore changes for xid \d+`)

	// MySqlRdsBinlogFileNotFoundRe matches the message for a binlog file missing under the
	// /rdsdbdata/log/binlog/ path. The dot before the numeric suffix is escaped so that only a
	// literal "." separates "mysql-bin-changelog" from the sequence number; an unescaped dot
	// would accept any character in that position.
	MySqlRdsBinlogFileNotFoundRe = regexp.MustCompile(`File '/rdsdbdata/log/binlog/mysql-bin-changelog\.\d+' not found`)

	// MongoPoolClearedErrorRe matches the message emitted when a connection pool was cleared
	// because another operation failed.
	MongoPoolClearedErrorRe = regexp.MustCompile(`connection pool for .+ was cleared because another operation failed with`)
)
View Source
var (
	ErrorNotifyDestinationModified = ErrorClass{
		Class: "NOTIFY_DESTINATION_MODIFIED",
		// contains filtered or unexported fields
	}
	ErrorNotifyOOM = ErrorClass{
		Class: "NOTIFY_OOM",
		// contains filtered or unexported fields
	}
	ErrorNotifyMVOrView = ErrorClass{
		Class: "NOTIFY_MV_OR_VIEW",
		// contains filtered or unexported fields
	}
	ErrorNotifyConnectivity = ErrorClass{
		Class: "NOTIFY_CONNECTIVITY",
		// contains filtered or unexported fields
	}
	ErrorNotifyOOMSource = ErrorClass{
		Class: "NOTIFY_OOM_SOURCE",
		// contains filtered or unexported fields
	}
	ErrorNotifySlotInvalid = ErrorClass{
		Class: "NOTIFY_SLOT_INVALID",
		// contains filtered or unexported fields
	}
	ErrorNotifyBinlogInvalid = ErrorClass{
		Class: "NOTIFY_BINLOG_INVALID",
		// contains filtered or unexported fields
	}
	ErrorNotifyBinlogRowMetadataInvalid = ErrorClass{
		Class: "NOTIFY_BINLOG_ROW_METADATA_INVALID",
		// contains filtered or unexported fields
	}
	ErrorNotifyBadGTIDSetup = ErrorClass{
		Class: "NOTIFY_BAD_MULTISOURCE_GTID_SETUP",
		// contains filtered or unexported fields
	}
	ErrorNotifySourceTableMissing = ErrorClass{
		Class: "NOTIFY_SOURCE_TABLE_MISSING",
		// contains filtered or unexported fields
	}
	ErrorNotifyBadSourceTableReplicaIdentity = ErrorClass{
		Class: "NOTIFY_BAD_POSTGRES_TABLE_REPLICA_IDENTITY",
		// contains filtered or unexported fields
	}
	ErrorNotifyPublicationMissing = ErrorClass{
		Class: "NOTIFY_PUBLICATION_MISSING",
		// contains filtered or unexported fields
	}
	ErrorNotifyTablesNotInPublication = ErrorClass{
		Class: "NOTIFY_TABLES_NOT_IN_PUBLICATION",
		// contains filtered or unexported fields
	}
	ErrorNotifyReplicationSlotMissing = ErrorClass{
		Class: "NOTIFY_REPLICATION_SLOT_MISSING",
		// contains filtered or unexported fields
	}
	ErrorNotifyIncreaseLogicalDecodingWorkMem = ErrorClass{
		Class: "NOTIFY_INCREASE_LOGICAL_DECODING_WORK_MEM",
		// contains filtered or unexported fields
	}
	ErrorUnsupportedDatatype = ErrorClass{
		Class: "NOTIFY_UNSUPPORTED_DATATYPE",
		// contains filtered or unexported fields
	}
	ErrorNotifyInvalidSortKey = ErrorClass{
		Class: "NOTIFY_INVALID_SORT_KEY",
		// contains filtered or unexported fields
	}
	ErrorNotifyInvalidSnapshotIdentifier = ErrorClass{
		Class: "NOTIFY_INVALID_SNAPSHOT_IDENTIFIER",
		// contains filtered or unexported fields
	}
	ErrorNotifyInvalidSynchronizedStandbySlots = ErrorClass{
		Class: "NOTIFY_INVALID_SYNCHRONIZED_STANDBY_SLOTS",
		// contains filtered or unexported fields
	}
	ErrorNotifyTerminate = ErrorClass{
		Class: "NOTIFY_TERMINATE",
		// contains filtered or unexported fields
	}
	ErrorNotifyReplicationStandbySetup = ErrorClass{
		Class: "NOTIFY_REPLICATION_STANDBY_SETUP",
		// contains filtered or unexported fields
	}
	ErrorInternal = ErrorClass{
		Class: "INTERNAL",
		// contains filtered or unexported fields
	}
	ErrorDropFlow = ErrorClass{
		Class: "DROP_FLOW",
		// contains filtered or unexported fields
	}
	ErrorIgnoreEOF = ErrorClass{
		Class: "IGNORE_EOF",
		// contains filtered or unexported fields
	}
	ErrorIgnoreConnTemporary = ErrorClass{
		Class: "IGNORE_CONN_TEMPORARY",
		// contains filtered or unexported fields
	}
	ErrorIgnoreContextCancelled = ErrorClass{
		Class: "IGNORE_CONTEXT_CANCELLED",
		// contains filtered or unexported fields
	}
	ErrorRetryRecoverable = ErrorClass{

		Class: "ERROR_RETRY_RECOVERABLE",
		// contains filtered or unexported fields
	}
	ErrorInternalClickHouse = ErrorClass{
		Class: "INTERNAL_CLICKHOUSE",
		// contains filtered or unexported fields
	}
	ErrorLossyConversion = ErrorClass{
		Class: "WARNING_LOSSY_CONVERSION",
		// contains filtered or unexported fields
	}
	ErrorUnsupportedSchemaChange = ErrorClass{
		Class: "NOTIFY_UNSUPPORTED_SCHEMA_CHANGE",
		// contains filtered or unexported fields
	}
	// Postgres 16.9/17.5 etc. introduced a bug where certain workloads can cause logical replication to
	// request a memory allocation of >1GB, which is not allowed by Postgres. Fixed already, but we need to handle this error
	// https://github.com/postgres/postgres/commit/d87d07b7ad3b782cb74566cd771ecdb2823adf6a
	ErrorNotifyPostgresSlotMemalloc = ErrorClass{
		Class: "NOTIFY_POSTGRES_SLOT_MEMALLOC",
		// contains filtered or unexported fields
	}
	// This RDS specific error is seen when we try to create a replication slot on a read-replica
	ErrNotifyPostgresCreatingSlotOnReader = ErrorClass{
		Class: "NOTIFY_POSTGRES_CREATING_SLOT_ON_READER",
		// contains filtered or unexported fields
	}
	// Mongo specific, equivalent to slot invalidation in Postgres
	ErrorNotifyChangeStreamHistoryLost = ErrorClass{
		Class: "NOTIFY_CHANGE_STREAM_HISTORY_LOST",
		// contains filtered or unexported fields
	}
	ErrorNotifyPostgresLogicalMessageProcessing = ErrorClass{
		Class: "NOTIFY_POSTGRES_LOGICAL_MESSAGE_PROCESSING_ERROR",
		// contains filtered or unexported fields
	}
	ErrorNotifyClickHouseSupportIsDisabledError = ErrorClass{
		Class: "NOTIFY_CLICKHOUSE_SUPPORT_IS_DISABLED_ERROR",
		// contains filtered or unexported fields
	}
	// Catch-all for unclassified errors
	ErrorOther = ErrorClass{

		Class: "OTHER",
		// contains filtered or unexported fields
	}
)

Functions

func GetErrorClass

func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo)

func GetTags

func GetTags(ctx context.Context, catalogPool shared.CatalogPool, flowName string) (map[string]string, error)

Types

type AdditionalErrorAttributeKey

// AdditionalErrorAttributeKey is the string-based key type of the
// ErrorInfo.AdditionalAttributes map.
type AdditionalErrorAttributeKey string
// Declared attribute keys: one for a table and one for a column.
// NOTE(review): presumably the mapped values are the table and column names involved in the
// error — confirm against the code that populates ErrorInfo.
const (
	ErrorAttributeKeyTable  AdditionalErrorAttributeKey = "errorAdditionalAttributeTable"
	ErrorAttributeKeyColumn AdditionalErrorAttributeKey = "errorAdditionalAttributeColumn"
)

func (AdditionalErrorAttributeKey) String

type AlertKeys

// AlertKeys bundles three string identifiers — a flow name, a peer name and a slot name.
// The Alerter.AlertIfOpenConnections, AlertIfSlotLag and AlertIfTooLongSinceLastNormalize
// methods each take a *AlertKeys argument.
type AlertKeys struct {
	FlowName string
	PeerName string
	SlotName string
}

type AlertSender

type AlertSender interface {
	// contains filtered or unexported methods
}

type AlertSenderConfig

// AlertSenderConfig pairs an AlertSender with a numeric Id and a list of mirror names.
// NOTE(review): presumably AlertForMirrors limits the sender to the listed mirrors; how an
// empty list is treated is not visible here — confirm against the code that reads it.
type AlertSenderConfig struct {
	Sender          AlertSender
	AlertForMirrors []string
	Id              int64
}

type Alerter

type Alerter struct {
	shared.CatalogPool
	// contains filtered or unexported fields
}

Alerter is the alerting service (no cool name :( ).

func NewAlerter

func NewAlerter(ctx context.Context, catalogPool shared.CatalogPool, otelManager *otel_metrics.OtelManager) *Alerter

NewAlerter does not take care of closing the catalog pool; the caller must close it externally.

func (*Alerter) AlertIfOpenConnections

func (a *Alerter) AlertIfOpenConnections(ctx context.Context, alertKeys *AlertKeys,
	openConnections *protos.GetOpenConnectionsForUserResult,
)

func (*Alerter) AlertIfSlotLag

func (a *Alerter) AlertIfSlotLag(ctx context.Context, alertKeys *AlertKeys, slotInfo *protos.SlotInfo)

func (*Alerter) AlertIfTooLongSinceLastNormalize

func (a *Alerter) AlertIfTooLongSinceLastNormalize(ctx context.Context, alertKeys *AlertKeys, intervalSinceLastNormalize time.Duration)

func (*Alerter) LogFlowError

func (a *Alerter) LogFlowError(ctx context.Context, flowName string, inErr error) error

func (*Alerter) LogFlowEvent

func (a *Alerter) LogFlowEvent(ctx context.Context, flowName string, info string)

func (*Alerter) LogFlowInfo

func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string)

func (*Alerter) LogFlowWarning

func (a *Alerter) LogFlowWarning(ctx context.Context, flowName string, inErr error)

func (*Alerter) LogFlowWrappedError

func (a *Alerter) LogFlowWrappedError(ctx context.Context, flowName string, s string, inErr error) error

func (*Alerter) LogNonFlowCritical

func (a *Alerter) LogNonFlowCritical(ctx context.Context, eventType telemetry.EventType, key string, message string)

func (*Alerter) LogNonFlowError

func (a *Alerter) LogNonFlowError(ctx context.Context, eventType telemetry.EventType, key string, message string)

func (*Alerter) LogNonFlowEvent

func (a *Alerter) LogNonFlowEvent(ctx context.Context, eventType telemetry.EventType, key string, message string, level telemetry.Level)

func (*Alerter) LogNonFlowInfo

func (a *Alerter) LogNonFlowInfo(ctx context.Context, eventType telemetry.EventType, key string, message string)

Wrapper for different telemetry levels for non-flow events

func (*Alerter) LogNonFlowWarning

func (a *Alerter) LogNonFlowWarning(ctx context.Context, eventType telemetry.EventType, key string, message string)

type EmailAlertSender

type EmailAlertSender struct {
	AlertSender
	// contains filtered or unexported fields
}

func NewEmailAlertSender

func NewEmailAlertSender(client *ses.Client, config *EmailAlertSenderConfig) *EmailAlertSender

func NewEmailAlertSenderWithNewClient

func NewEmailAlertSenderWithNewClient(ctx context.Context, region *string, config *EmailAlertSenderConfig) (*EmailAlertSender, error)

type EmailAlertSenderConfig

type EmailAlertSenderConfig struct {
	EmailAddresses                []string `json:"email_addresses"`
	SlotLagMBAlertThreshold       uint32   `json:"slot_lag_mb_alert_threshold"`
	OpenConnectionsAlertThreshold uint32   `json:"open_connections_alert_threshold"`
	// contains filtered or unexported fields
}

type ErrorAction

// ErrorAction is the string-based type returned by ErrorClass.ErrorAction.
type ErrorAction string
// Declared actions: notify the user, ignore, or notify telemetry.
// NOTE(review): which ErrorClass maps to which action lives in unexported fields that are
// not visible here.
const (
	NotifyUser      ErrorAction = "notify_user"
	Ignore          ErrorAction = "ignore"
	NotifyTelemetry ErrorAction = "notify_telemetry"
)

func (ErrorAction) String

func (e ErrorAction) String() string

type ErrorClass

type ErrorClass struct {
	Class string
	// contains filtered or unexported fields
}

func (ErrorClass) ErrorAction

func (e ErrorClass) ErrorAction() ErrorAction

func (ErrorClass) String

func (e ErrorClass) String() string

type ErrorInfo

// ErrorInfo is the second value returned by GetErrorClass, next to the ErrorClass itself.
// It holds an ErrorSource, a string Code, and an optional map of extra attributes keyed by
// AdditionalErrorAttributeKey.
// NOTE(review): the format of Code (e.g. a database-specific error code) is not visible
// here — confirm against GetErrorClass.
type ErrorInfo struct {
	AdditionalAttributes map[AdditionalErrorAttributeKey]string
	Source               ErrorSource
	Code                 string
}

type ErrorSource

// ErrorSource is the string-based type of the ErrorInfo.Source field.
type ErrorSource string
// Declared sources: ClickHouse, Postgres, MySQL, MongoDB, the Postgres catalog, an SSH
// tunnel, the network, Temporal, and "other" for everything else.
const (
	ErrorSourceClickHouse      ErrorSource = "clickhouse"
	ErrorSourcePostgres        ErrorSource = "postgres"
	ErrorSourceMySQL           ErrorSource = "mysql"
	ErrorSourceMongoDB         ErrorSource = "mongodb"
	ErrorSourcePostgresCatalog ErrorSource = "postgres_catalog"
	ErrorSourceSSH             ErrorSource = "ssh_tunnel"
	ErrorSourceNet             ErrorSource = "net"
	ErrorSourceTemporal        ErrorSource = "temporal"
	ErrorSourceOther           ErrorSource = "other"
)

func (ErrorSource) String

func (e ErrorSource) String() string

type ServiceType

// ServiceType is a string-based type with two declared values, "slack" and "email".
// NOTE(review): presumably it selects between the SlackAlertSender and EmailAlertSender
// implementations declared in this package — the code that switches on it is not visible here.
type ServiceType string
// The constant names are all-caps rather than Go's usual MixedCaps; they are exported, so
// renaming them would break callers.
const (
	SLACK ServiceType = "slack"
	EMAIL ServiceType = "email"
)

type SlackAlertSender

type SlackAlertSender struct {
	AlertSender
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL