Documentation
¶
Index ¶
- Constants
- Variables
- func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo)
- func GetTags(ctx context.Context, catalogPool shared.CatalogPool, flowName string) (map[string]string, error)
- type AdditionalErrorAttributeKey
- type AlertKeys
- type AlertSender
- type AlertSenderConfig
- type Alerter
- func (a *Alerter) AlertIfOpenConnections(ctx context.Context, alertKeys *AlertKeys, ...)
- func (a *Alerter) AlertIfSlotLag(ctx context.Context, alertKeys *AlertKeys, slotInfo *protos.SlotInfo)
- func (a *Alerter) AlertIfTooLongSinceLastNormalize(ctx context.Context, alertKeys *AlertKeys, ...)
- func (a *Alerter) LogFlowError(ctx context.Context, flowName string, inErr error) error
- func (a *Alerter) LogFlowEvent(ctx context.Context, flowName string, info string)
- func (a *Alerter) LogFlowInfo(ctx context.Context, flowName string, info string)
- func (a *Alerter) LogFlowWarning(ctx context.Context, flowName string, inErr error)
- func (a *Alerter) LogFlowWrappedError(ctx context.Context, flowName string, s string, inErr error) error
- func (a *Alerter) LogNonFlowCritical(ctx context.Context, eventType telemetry.EventType, key string, message string)
- func (a *Alerter) LogNonFlowError(ctx context.Context, eventType telemetry.EventType, key string, message string)
- func (a *Alerter) LogNonFlowEvent(ctx context.Context, eventType telemetry.EventType, key string, message string, ...)
- func (a *Alerter) LogNonFlowInfo(ctx context.Context, eventType telemetry.EventType, key string, message string)
- func (a *Alerter) LogNonFlowWarning(ctx context.Context, eventType telemetry.EventType, key string, message string)
- type EmailAlertSender
- type EmailAlertSenderConfig
- type ErrorAction
- type ErrorClass
- type ErrorInfo
- type ErrorSource
- type ServiceType
- type SlackAlertSender
Constants ¶
View Source
const ( MongoShutdownInProgress = "(ShutdownInProgress) The server is in quiesce mode and will shut down" MongoInterruptedDueToReplStateChange = "(InterruptedDueToReplStateChange) operation was interrupted" MongoIncompleteReadOfMessageHeader = "incomplete read of message header" )
Variables ¶
View Source
var ( ClickHouseDecimalParsingRe = regexp.MustCompile( `Cannot parse type Decimal\(\d+, \d+\), expected non-empty binary data with size equal to or less than \d+, got \d+`, ) ClickHouseDecimalInsertRe = regexp.MustCompile( `Cannot insert Avro decimal with scale \d+ and precision \d+ to ClickHouse type Decimal\(\d+, \d+\) with scale \d+ and precision \d+`, ) // ID(a14c2a1c-edcd-5fcb-73be-bd04e09fccb7) not found in user directories ClickHouseNotFoundInUserDirsRe = regexp.MustCompile("ID\\([a-z0-9-]+\\) not found in `?user directories`?") PostgresPublicationDoesNotExistRe = regexp.MustCompile(`publication ".*?" does not exist`) PostgresSnapshotDoesNotExistRe = regexp.MustCompile(`snapshot ".*?" does not exist`) PostgresWalSegmentRemovedRe = regexp.MustCompile(`requested WAL segment \w+ has already been removed`) PostgresSpillFileMissingRe = regexp.MustCompile(`Unable to restore changes for xid \d+`) MySqlRdsBinlogFileNotFoundRe = regexp.MustCompile(`File '/rdsdbdata/log/binlog/mysql-bin-changelog.\d+' not found`) MongoPoolClearedErrorRe = regexp.MustCompile(`connection pool for .+ was cleared because another operation failed with`) )
View Source
var ( ErrorNotifyDestinationModified = ErrorClass{ Class: "NOTIFY_DESTINATION_MODIFIED", // contains filtered or unexported fields } ErrorNotifyOOM = ErrorClass{ Class: "NOTIFY_OOM", // contains filtered or unexported fields } ErrorNotifyMVOrView = ErrorClass{ Class: "NOTIFY_MV_OR_VIEW", // contains filtered or unexported fields } ErrorNotifyConnectivity = ErrorClass{ Class: "NOTIFY_CONNECTIVITY", // contains filtered or unexported fields } ErrorNotifyOOMSource = ErrorClass{ Class: "NOTIFY_OOM_SOURCE", // contains filtered or unexported fields } ErrorNotifySlotInvalid = ErrorClass{ Class: "NOTIFY_SLOT_INVALID", // contains filtered or unexported fields } ErrorNotifyBinlogInvalid = ErrorClass{ Class: "NOTIFY_BINLOG_INVALID", // contains filtered or unexported fields } ErrorNotifyBinlogRowMetadataInvalid = ErrorClass{ Class: "NOTIFY_BINLOG_ROW_METADATA_INVALID", // contains filtered or unexported fields } ErrorNotifyBadGTIDSetup = ErrorClass{ Class: "NOTIFY_BAD_MULTISOURCE_GTID_SETUP", // contains filtered or unexported fields } ErrorNotifySourceTableMissing = ErrorClass{ Class: "NOTIFY_SOURCE_TABLE_MISSING", // contains filtered or unexported fields } ErrorNotifyBadSourceTableReplicaIdentity = ErrorClass{ Class: "NOTIFY_BAD_POSTGRES_TABLE_REPLICA_IDENTITY", // contains filtered or unexported fields } ErrorNotifyPublicationMissing = ErrorClass{ Class: "NOTIFY_PUBLICATION_MISSING", // contains filtered or unexported fields } ErrorNotifyTablesNotInPublication = ErrorClass{ Class: "NOTIFY_TABLES_NOT_IN_PUBLICATION", // contains filtered or unexported fields } ErrorNotifyReplicationSlotMissing = ErrorClass{ Class: "NOTIFY_REPLICATION_SLOT_MISSING", // contains filtered or unexported fields } ErrorNotifyIncreaseLogicalDecodingWorkMem = ErrorClass{ Class: "NOTIFY_INCREASE_LOGICAL_DECODING_WORK_MEM", // contains filtered or unexported fields } ErrorUnsupportedDatatype = ErrorClass{ Class: "NOTIFY_UNSUPPORTED_DATATYPE", // contains filtered or unexported fields } 
ErrorNotifyInvalidSortKey = ErrorClass{ Class: "NOTIFY_INVALID_SORT_KEY", // contains filtered or unexported fields } ErrorNotifyInvalidSnapshotIdentifier = ErrorClass{ Class: "NOTIFY_INVALID_SNAPSHOT_IDENTIFIER", // contains filtered or unexported fields } ErrorNotifyInvalidSynchronizedStandbySlots = ErrorClass{ Class: "NOTIFY_INVALID_SYNCHRONIZED_STANDBY_SLOTS", // contains filtered or unexported fields } ErrorNotifyTerminate = ErrorClass{ Class: "NOTIFY_TERMINATE", // contains filtered or unexported fields } ErrorNotifyReplicationStandbySetup = ErrorClass{ Class: "NOTIFY_REPLICATION_STANDBY_SETUP", // contains filtered or unexported fields } ErrorInternal = ErrorClass{ Class: "INTERNAL", // contains filtered or unexported fields } ErrorDropFlow = ErrorClass{ Class: "DROP_FLOW", // contains filtered or unexported fields } ErrorIgnoreEOF = ErrorClass{ Class: "IGNORE_EOF", // contains filtered or unexported fields } ErrorIgnoreConnTemporary = ErrorClass{ Class: "IGNORE_CONN_TEMPORARY", // contains filtered or unexported fields } ErrorIgnoreContextCancelled = ErrorClass{ Class: "IGNORE_CONTEXT_CANCELLED", // contains filtered or unexported fields } ErrorRetryRecoverable = ErrorClass{ Class: "ERROR_RETRY_RECOVERABLE", // contains filtered or unexported fields } ErrorInternalClickHouse = ErrorClass{ Class: "INTERNAL_CLICKHOUSE", // contains filtered or unexported fields } ErrorLossyConversion = ErrorClass{ Class: "WARNING_LOSSY_CONVERSION", // contains filtered or unexported fields } ErrorUnsupportedSchemaChange = ErrorClass{ Class: "NOTIFY_UNSUPPORTED_SCHEMA_CHANGE", // contains filtered or unexported fields } // Postgres 16.9/17.5 etc. introduced a bug where certain workloads can cause logical replication to // request a memory allocation of >1GB, which is not allowed by Postgres. 
// This has already been fixed upstream, but we still need to handle this error // https://github.com/postgres/postgres/commit/d87d07b7ad3b782cb74566cd771ecdb2823adf6a ErrorNotifyPostgresSlotMemalloc = ErrorClass{ Class: "NOTIFY_POSTGRES_SLOT_MEMALLOC", // contains filtered or unexported fields } // This RDS-specific error is seen when we try to create a replication slot on a read-replica ErrNotifyPostgresCreatingSlotOnReader = ErrorClass{ Class: "NOTIFY_POSTGRES_CREATING_SLOT_ON_READER", // contains filtered or unexported fields } // MongoDB-specific, equivalent to slot invalidation in Postgres ErrorNotifyChangeStreamHistoryLost = ErrorClass{ Class: "NOTIFY_CHANGE_STREAM_HISTORY_LOST", // contains filtered or unexported fields } ErrorNotifyPostgresLogicalMessageProcessing = ErrorClass{ Class: "NOTIFY_POSTGRES_LOGICAL_MESSAGE_PROCESSING_ERROR", // contains filtered or unexported fields } ErrorNotifyClickHouseSupportIsDisabledError = ErrorClass{ Class: "NOTIFY_CLICKHOUSE_SUPPORT_IS_DISABLED_ERROR", // contains filtered or unexported fields } // Catch-all for unclassified errors ErrorOther = ErrorClass{ Class: "OTHER", // contains filtered or unexported fields } )
Functions ¶
func GetErrorClass ¶
func GetErrorClass(ctx context.Context, err error) (ErrorClass, ErrorInfo)
Types ¶
type AdditionalErrorAttributeKey ¶
type AdditionalErrorAttributeKey string
const ( ErrorAttributeKeyTable AdditionalErrorAttributeKey = "errorAdditionalAttributeTable" ErrorAttributeKeyColumn AdditionalErrorAttributeKey = "errorAdditionalAttributeColumn" )
func (AdditionalErrorAttributeKey) String ¶
func (e AdditionalErrorAttributeKey) String() string
type AlertSender ¶
type AlertSender interface {
// contains filtered or unexported methods
}
type AlertSenderConfig ¶
type AlertSenderConfig struct {
Sender AlertSender
AlertForMirrors []string
Id int64
}
type Alerter ¶
type Alerter struct {
shared.CatalogPool
// contains filtered or unexported fields
}
Alerter is the alerting service (no cool name, unfortunately).
func NewAlerter ¶
func NewAlerter(ctx context.Context, catalogPool shared.CatalogPool, otelManager *otel_metrics.OtelManager) *Alerter
NewAlerter does not take care of closing the pool; the caller must close it externally.
func (*Alerter) AlertIfOpenConnections ¶
func (*Alerter) AlertIfSlotLag ¶
func (*Alerter) AlertIfTooLongSinceLastNormalize ¶
func (*Alerter) LogFlowError ¶
func (*Alerter) LogFlowEvent ¶
func (*Alerter) LogFlowInfo ¶
func (*Alerter) LogFlowWarning ¶
func (*Alerter) LogFlowWrappedError ¶
func (*Alerter) LogNonFlowCritical ¶
func (*Alerter) LogNonFlowError ¶
func (*Alerter) LogNonFlowEvent ¶
type EmailAlertSender ¶
type EmailAlertSender struct {
AlertSender
// contains filtered or unexported fields
}
func NewEmailAlertSender ¶
func NewEmailAlertSender(client *ses.Client, config *EmailAlertSenderConfig) *EmailAlertSender
func NewEmailAlertSenderWithNewClient ¶
func NewEmailAlertSenderWithNewClient(ctx context.Context, region *string, config *EmailAlertSenderConfig) (*EmailAlertSender, error)
type EmailAlertSenderConfig ¶
type ErrorAction ¶
type ErrorAction string
const ( NotifyUser ErrorAction = "notify_user" Ignore ErrorAction = "ignore" NotifyTelemetry ErrorAction = "notify_telemetry" )
func (ErrorAction) String ¶
func (e ErrorAction) String() string
type ErrorClass ¶
type ErrorClass struct {
Class string
// contains filtered or unexported fields
}
func (ErrorClass) ErrorAction ¶
func (e ErrorClass) ErrorAction() ErrorAction
func (ErrorClass) String ¶
func (e ErrorClass) String() string
type ErrorInfo ¶
type ErrorInfo struct {
AdditionalAttributes map[AdditionalErrorAttributeKey]string
Source ErrorSource
Code string
}
type ErrorSource ¶
type ErrorSource string
const ( ErrorSourceClickHouse ErrorSource = "clickhouse" ErrorSourcePostgres ErrorSource = "postgres" ErrorSourceMySQL ErrorSource = "mysql" ErrorSourceMongoDB ErrorSource = "mongodb" ErrorSourcePostgresCatalog ErrorSource = "postgres_catalog" ErrorSourceSSH ErrorSource = "ssh_tunnel" ErrorSourceNet ErrorSource = "net" ErrorSourceTemporal ErrorSource = "temporal" ErrorSourceOther ErrorSource = "other" )
func (ErrorSource) String ¶
func (e ErrorSource) String() string
type ServiceType ¶
type ServiceType string
const ( SLACK ServiceType = "slack" EMAIL ServiceType = "email" )
type SlackAlertSender ¶
type SlackAlertSender struct {
AlertSender
// contains filtered or unexported fields
}
Click to show internal directories.
Click to hide internal directories.