Documentation
¶
Index ¶
- Constants
- func NewPartitionFilterAfterDigest(namespace, digest string) (*a.PartitionFilter, error)
- func NewPartitionFilterAll() *a.PartitionFilter
- func NewPartitionFilterByDigest(namespace, digest string) (*a.PartitionFilter, error)
- func NewPartitionFilterByID(partitionID int) *a.PartitionFilter
- func NewPartitionFilterByRange(begin, count int) *a.PartitionFilter
- func ParseSecret(config *SecretAgentConfig, secret string) (string, error)
- func ReadPrivateKey(encPolicy *EncryptionPolicy, saConfig *SecretAgentConfig) ([]byte, error)
- type AerospikeClient
- type BackupConfig
- type BackupHandler
- type Client
- func (c *Client) AerospikeClient() AerospikeClient
- func (c *Client) Backup(ctx context.Context, config *BackupConfig, writer Writer, ...) (*BackupHandler, error)
- func (c *Client) Estimate(ctx context.Context, config *BackupConfig, estimateSamples int64) (uint64, error)
- func (c *Client) Restore(ctx context.Context, config *RestoreConfig, streamingReader StreamingReader) (*RestoreHandler, error)
- type ClientOpt
- type CompressionPolicy
- type Decoder
- type Encoder
- type EncoderType
- type EncryptionPolicy
- type RestoreConfig
- type RestoreHandler
- type RestoreNamespaceConfig
- type SecretAgentConfig
- type State
- type StreamingReader
- type Writer
Constants ¶
const ( // MinParallel is the minimum number of workers to use during an operation. MinParallel = 1 // MaxParallel is the maximum number of workers to use during an operation. MaxParallel = 1024 // MaxPartitions is the maximum number of partitions in an Aerospike cluster. MaxPartitions = 4096 )
const ( // CompressNone no compression. CompressNone = "NONE" // CompressZSTD compression using ZSTD. CompressZSTD = "ZSTD" )
Compression modes
const ( // EncryptNone no encryption. EncryptNone = "NONE" // EncryptAES128 encryption using AES128 algorithm. EncryptAES128 = "AES128" // EncryptAES256 encryption using AES256 algorithm. EncryptAES256 = "AES256" )
Encryption modes
Variables ¶
This section is empty.
Functions ¶
func NewPartitionFilterAfterDigest ¶ added in v0.2.0
func NewPartitionFilterAfterDigest(namespace, digest string) (*a.PartitionFilter, error)
NewPartitionFilterAfterDigest returns partition filter to scan call records after digest.
func NewPartitionFilterAll ¶ added in v0.2.0
func NewPartitionFilterAll() *a.PartitionFilter
NewPartitionFilterAll returns a partition range containing all partitions.
func NewPartitionFilterByDigest ¶ added in v0.2.0
func NewPartitionFilterByDigest(namespace, digest string) (*a.PartitionFilter, error)
NewPartitionFilterByDigest returns a partition filter by digest with specified value.
func NewPartitionFilterByID ¶ added in v0.2.0
func NewPartitionFilterByID(partitionID int) *a.PartitionFilter
NewPartitionFilterByID returns a partition filter by id with specified id.
func NewPartitionFilterByRange ¶ added in v0.2.0
func NewPartitionFilterByRange(begin, count int) *a.PartitionFilter
NewPartitionFilterByRange returns a partition range with boundaries specified by the provided values.
func ParseSecret ¶ added in v0.2.0
func ParseSecret(config *SecretAgentConfig, secret string) (string, error)
ParseSecret check if string contains secret and tries to load secret from secret agent.
func ReadPrivateKey ¶ added in v0.2.0
func ReadPrivateKey(encPolicy *EncryptionPolicy, saConfig *SecretAgentConfig) ([]byte, error)
ReadPrivateKey parses and loads a private key according to the EncryptionPolicy configuration. It can load the private key from a file, env variable or Secret Agent. A valid agent parameter is required to load the key from Aerospike Secret Agent. Pass in nil for any other option.
Types ¶
type AerospikeClient ¶
type AerospikeClient interface { GetDefaultScanPolicy() *a.ScanPolicy GetDefaultInfoPolicy() *a.InfoPolicy GetDefaultWritePolicy() *a.WritePolicy Put(policy *a.WritePolicy, key *a.Key, bins a.BinMap) a.Error CreateComplexIndex(policy *a.WritePolicy, namespace string, set string, indexName string, binName string, indexType a.IndexType, indexCollectionType a.IndexCollectionType, ctx ...*a.CDTContext, ) (*a.IndexTask, a.Error) DropIndex(policy *a.WritePolicy, namespace string, set string, indexName string) a.Error RegisterUDF(policy *a.WritePolicy, udfBody []byte, serverPath string, language a.Language, ) (*a.RegisterTask, a.Error) BatchOperate(policy *a.BatchPolicy, records []a.BatchRecordIfc) a.Error Cluster() *a.Cluster ScanPartitions(scanPolicy *a.ScanPolicy, partitionFilter *a.PartitionFilter, namespace string, setName string, binNames ...string) (*a.Recordset, a.Error) ScanNode(scanPolicy *a.ScanPolicy, node *a.Node, namespace string, setName string, binNames ...string, ) (*a.Recordset, a.Error) Close() GetNodes() []*a.Node }
AerospikeClient describes aerospike client interface for easy mocking.
type BackupConfig ¶
type BackupConfig struct { // InfoPolicy applies to Aerospike Info requests made during backup and // restore. If nil, the Aerospike client's default policy will be used. InfoPolicy *a.InfoPolicy // ScanPolicy applies to Aerospike scan operations made during backup and // restore. If nil, the Aerospike client's default policy will be used. ScanPolicy *a.ScanPolicy // Only include records that last changed before the given time (optional). ModBefore *time.Time // Only include records that last changed after the given time (optional). ModAfter *time.Time // Encryption details. EncryptionPolicy *EncryptionPolicy // Compression details. CompressionPolicy *CompressionPolicy // Secret agent config. SecretAgentConfig *SecretAgentConfig // PartitionFilters specifies the Aerospike partitions to back up. // Partition filters can be ranges, individual partitions, // or records after a specific digest within a single partition. // Note: // if not default partition filter NewPartitionFilterAll() is used, // each partition filter is an individual task which cannot be parallelized, // so you can only achieve as much parallelism as there are partition filters. // You may increase parallelism by dividing up partition ranges manually. // AfterDigest: // afterDigest filter can be applied with // NewPartitionFilterAfterDigest(namespace, digest string) (*a.PartitionFilter, error) // Backup records after record digest in record's partition plus all succeeding partitions. // Used to resume backup with last record received from previous incomplete backup. // This parameter will overwrite PartitionFilters.Begin value. // Can't be used in full backup mode. // This parameter is mutually exclusive to partition-list (not implemented). // Format: base64 encoded string. // Example: EjRWeJq83vEjRRI0VniavN7xI0U= PartitionFilters []*a.PartitionFilter // Namespace is the Aerospike namespace to back up. Namespace string // NodeList contains a list of nodes to back up. // <IP addr 1>:<port 1>[,<IP addr 2>:<port 2>[,...]] // <IP addr 1>:<TLS_NAME 1>:<port 1>[,<IP addr 2>:<TLS_NAME 2>:<port 2>[,...]] // Backup the given cluster nodes only. // If it is set, ParallelNodes automatically set to true. // This argument is mutually exclusive to partition-list/AfterDigest arguments. NodeList []string // SetList is the Aerospike set to back up (optional, given an empty list, // all sets will be backed up). SetList []string // The list of backup bin names // (optional, given an empty list, all bins will be backed up) BinList []string // ParallelNodes specifies how to perform scan. // If set to true, we launch parallel workers for nodes; otherwise workers run in parallel for partitions. // Excludes PartitionFilters param. ParallelNodes bool // EncoderType describes an Encoder type that will be used on backing up. // Default `EncoderTypeASB` = 0. EncoderType EncoderType // ParallelRead is the number of concurrent scans to run against the Aerospike cluster. ParallelRead int // ParallelWrite is the number of concurrent backup files writing. ParallelWrite int // Don't back up any records. NoRecords bool // Don't back up any secondary indexes. NoIndexes bool // Don't back up any UDFs. NoUDFs bool // RecordsPerSecond limits backup records per second (rps) rate. // Will not apply rps limit if RecordsPerSecond is zero (default). RecordsPerSecond int // Limits backup bandwidth (bytes per second). // Will not apply rps limit if Bandwidth is zero (default). Bandwidth int // File size limit (in bytes) for the backup. If a backup file exceeds this // size threshold, a new file will be created. 0 for no file size limit. FileLimit int64 // Do not apply base-64 encoding to BLOBs: Bytes, HLL, RawMap, RawList. // Results in smaller backup files. Compact bool // Only include records that have no ttl set (persistent records). NoTTLOnly bool // Name of a state file that will be saved in backup directory. // Works only with FileLimit parameter. // As we reach FileLimit and close file, the current state will be saved. // Works only for default and/or partition backup. // Not work with ParallelNodes or NodeList. StateFile string // Resumes an interrupted/failed backup from where it was left off, given the .state file // that was generated from the interrupted/failed run. // Works only for default and/or partition backup. Not work with ParallelNodes or NodeList. Continue bool // How many records will be read on one iteration for continuation backup. // Affects size if overlap on resuming backup after an error. // By default, it must be zero. If any value is set, reading from Aerospike will be paginated. // Which affects the performance and RAM usage. PageSize int64 // If set to true, the same number of workers will be created for each stage of the pipeline. // Each worker will be connected to the next stage worker with a separate unbuffered channel. SyncPipelines bool // When using directory parameter, prepend a prefix to the names of the generated files. OutputFilePrefix string }
BackupConfig contains configuration for the backup operation.
func NewDefaultBackupConfig ¶
func NewDefaultBackupConfig() *BackupConfig
NewDefaultBackupConfig returns a new BackupConfig with default values.
type BackupHandler ¶
type BackupHandler struct {
// contains filtered or unexported fields
}
BackupHandler handles a backup job.
func (*BackupHandler) GetStats ¶
func (bh *BackupHandler) GetStats() *models.BackupStats
GetStats returns the stats of the backup job
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is the main entry point for the backup package. It wraps an aerospike client and provides methods to start backup and restore operations. Example usage:
asc, aerr := a.NewClientWithPolicy(...) // create an aerospike client if aerr != nil { // handle error } backupClient, err := backup.NewClient(asc, backup.WithID("id")) // create a backup client if err != nil { // handle error } writers, err := backup.NewWriterLocalDir("backups_folder", false) if err != nil { // handle error } // use the backup client to start backup and restore operations ctx := context.Background() backupHandler, err := backupClient.Backup(ctx, writers, nil) if err != nil { // handle error } // optionally, check the stats of the backup operation stats := backupHandler.Stats() // use the backupHandler to wait for the backup operation to finish ctx := context.Background() if err = backupHandler.Wait(ctx); err != nil { // handle error }
func NewClient ¶
func NewClient(ac AerospikeClient, opts ...ClientOpt) (*Client, error)
NewClient creates a new backup client.
- ac is the aerospike client to use for backup and restore operations.
options:
- WithID to set an identifier for the client.
- WithLogger to set a logger that this client will log to.
- WithScanLimiter to set a semaphore that is used to limit number of concurrent scans.
func (*Client) AerospikeClient ¶
func (c *Client) AerospikeClient() AerospikeClient
AerospikeClient returns the underlying aerospike client.
func (*Client) Backup ¶
func (c *Client) Backup( ctx context.Context, config *BackupConfig, writer Writer, reader StreamingReader, ) (*BackupHandler, error)
Backup starts a backup operation that writes data to a provided writer.
- ctx can be used to cancel the backup operation.
- config is the configuration for the backup operation.
- writer creates new writers for the backup operation.
- reader is used only for reading a state file for continuation operations.
func (*Client) Estimate ¶ added in v0.2.0
func (c *Client) Estimate( ctx context.Context, config *BackupConfig, estimateSamples int64) (uint64, error)
Estimate calculates the backup size from random sample of estimateSamples records number.
- ctx can be used to cancel the calculation operation.
- config is the backup configuration for the calculation operation.
- estimateSamples is number of records to be scanned for calculations.
func (*Client) Restore ¶
func (c *Client) Restore( ctx context.Context, config *RestoreConfig, streamingReader StreamingReader, ) (*RestoreHandler, error)
Restore starts a restore operation that reads data from given readers. The backup data may be in a single file or multiple files.
- ctx can be used to cancel the restore operation.
- config is the configuration for the restore operation.
- streamingReader provides readers with access to backup data.
type ClientOpt ¶
type ClientOpt func(*Client)
ClientOpt is a functional option that allows configuring the Client.
func WithLogger ¶
WithLogger sets the logger for the Client.
type CompressionPolicy ¶
type CompressionPolicy struct { // The compression mode to be used (default is NONE). Mode string `yaml:"mode,omitempty" json:"mode,omitempty" default:"NONE" enums:"NONE,ZSTD"` // The compression level to use (or -1 if unspecified). Level int `yaml:"level,omitempty" json:"level,omitempty"` }
CompressionPolicy contains backup compression information.
func NewCompressionPolicy ¶ added in v0.2.0
func NewCompressionPolicy(mode string, level int) *CompressionPolicy
NewCompressionPolicy returns new compression policy for backup/restore operations.
type Decoder ¶
Decoder is an interface for reading backup data as tokens. It is used to support different data formats. While the return type is `any`, the actual types returned should only be the types exposed by the models package. e.g. *models.Record, *models.UDF and *models.SecondaryIndex
func NewDecoder ¶
func NewDecoder(eType EncoderType, src io.Reader) (Decoder, error)
NewDecoder returns a new Decoder according to `EncoderType`.
type Encoder ¶
type Encoder interface { EncodeToken(*models.Token) ([]byte, error) GetHeader() []byte GenerateFilename(prefix, suffix string) string }
Encoder is an interface for encoding the types from the models package. It is used to support different data formats.
func NewEncoder ¶
func NewEncoder(eType EncoderType, namespace string, compact bool) Encoder
NewEncoder returns a new Encoder according to `EncoderType`.
type EncoderType ¶
type EncoderType int
EncoderType custom type for Encoder types enum.
const ( // EncoderTypeASB matches ASB Encoder with id 0. EncoderTypeASB EncoderType = iota )
type EncryptionPolicy ¶
type EncryptionPolicy struct { // The path to the file containing the encryption key. KeyFile *string `yaml:"key-file,omitempty" json:"key-file,omitempty"` // The name of the environment variable containing the encryption key. KeyEnv *string `yaml:"key-env,omitempty" json:"key-env,omitempty"` // The secret keyword in Aerospike Secret Agent containing the encryption key. KeySecret *string `yaml:"key-secret,omitempty" json:"key-secret,omitempty"` // The encryption mode to be used (NONE, AES128, AES256) Mode string `yaml:"mode,omitempty" json:"mode,omitempty" default:"NONE" enums:"NONE,AES128,AES256"` }
EncryptionPolicy contains backup encryption information.
type RestoreConfig ¶
type RestoreConfig struct { // InfoPolicy applies to Aerospike Info requests made during backup and restore // If nil, the Aerospike client's default policy will be used. InfoPolicy *a.InfoPolicy // WritePolicy applies to Aerospike write operations made during backup and restore // If nil, the Aerospike client's default policy will be used. WritePolicy *a.WritePolicy // Namespace details for the restore operation. // By default, the data is restored to the namespace from which it was taken. Namespace *RestoreNamespaceConfig `json:"namespace,omitempty"` // Encryption details. EncryptionPolicy *EncryptionPolicy // Compression details. CompressionPolicy *CompressionPolicy // Configuration of retries for each restore write operation. // If nil, no retries will be performed. RetryPolicy *models.RetryPolicy // Secret agent config. SecretAgentConfig *SecretAgentConfig // The sets to restore (optional, given an empty list, all sets will be restored). SetList []string // The bins to restore (optional, given an empty list, all bins will be restored). BinList []string // EncoderType describes an Encoder type that will be used on restoring. // Default `EncoderTypeASB` = 0. EncoderType EncoderType // Parallel is the number of concurrent record readers from backup files. Parallel int // RecordsPerSecond limits restore records per second (rps) rate. // Will not apply rps limit if RecordsPerSecond is zero (default). RecordsPerSecond int // Limits restore bandwidth (bytes per second). // Will not apply rps limit if Bandwidth is zero (default). Bandwidth int // Don't restore any records. NoRecords bool // Don't restore any secondary indexes. NoIndexes bool // Don't restore any UDFs. NoUDFs bool // Disables the use of batch writes when restoring records to the Aerospike cluster. DisableBatchWrites bool // The max allowed number of records per batch write call. BatchSize int // Max number of parallel writers to target AS cluster. MaxAsyncBatches int // Amount of extra time-to-live to add to records that have expirable void-times. // Must be set in seconds. ExtraTTL int64 // Ignore permanent record-specific error. // E.g.: AEROSPIKE_RECORD_TOO_BIG. // By default, such errors are not ignored and restore terminates. IgnoreRecordError bool }
RestoreConfig contains configuration for the restore operation.
func NewDefaultRestoreConfig ¶
func NewDefaultRestoreConfig() *RestoreConfig
NewDefaultRestoreConfig returns a new RestoreConfig with default values.
type RestoreHandler ¶
type RestoreHandler struct {
// contains filtered or unexported fields
}
RestoreHandler handles a restore job using the given reader.
func (*RestoreHandler) GetStats ¶
func (rh *RestoreHandler) GetStats() *models.RestoreStats
GetStats returns the stats of the restore job.
type RestoreNamespaceConfig ¶
type RestoreNamespaceConfig struct { // Original namespace name. Source *string `json:"source,omitempty" example:"source-ns" validate:"required"` // Destination namespace name. Destination *string `json:"destination,omitempty" example:"destination-ns" validate:"required"` }
RestoreNamespaceConfig specifies an alternative namespace name for the restore operation, where Source is the original namespace name and Destination is the namespace name to which the backup data is to be restored.
type SecretAgentConfig ¶
type SecretAgentConfig struct { // Connection type: tcp, unix. // Use constants form `secret-agent`: `ConnectionTypeTCP` or `ConnectionTypeUDS` ConnectionType *string `yaml:"sa-connection-type,omitempty" json:"sa-connection-type,omitempty"` // Secret agent host for TCP connection or socket file path for UDS connection. Address *string `yaml:"sa-address,omitempty" json:"sa-address,omitempty"` // Secret agent port (only for TCP connection). Port *int `yaml:"sa-port,omitempty" json:"sa-port,omitempty"` // Secret agent connection and reading timeout. // Default: 1000 millisecond. TimeoutMillisecond *int `yaml:"sa-timeout-millisecond,omitempty" json:"sa-timeout-millisecond,omitempty"` // Path to ca file for encrypted connection. CaFile *string `yaml:"sa-ca-file,omitempty" json:"sa-ca-file,omitempty"` // Flag that shows if secret agent responses are encrypted with base64. IsBase64 *bool `yaml:"sa-is-base64,omitempty" json:"sa-is-base64,omitempty"` }
SecretAgentConfig contains Secret Agent connection information.
type State ¶ added in v0.2.0
type State struct { // Counter to count how many times State instance was initialized. // Is used to create suffix for backup files. Counter int // RecordsStateChan communication channel to save current filter state. RecordsStateChan chan models.PartitionFilterSerialized // RecordStates store states of all filters. RecordStates map[int]models.PartitionFilterSerialized RecordStatesSaved map[int]models.PartitionFilterSerialized // SaveCommandChan command to save current state for worker. SaveCommandChan chan int // File to save state to. FileName string // contains filtered or unexported fields }
State contains current backups status data.
func NewState ¶ added in v0.2.0
func NewState( ctx context.Context, config *BackupConfig, reader StreamingReader, writer Writer, logger *slog.Logger, ) (*State, error)
NewState returns new state instance depending on config. If we continue back up, the state will be loaded from a state file, if it is the first operation, new state instance will be returned.
type StreamingReader ¶
type StreamingReader interface { // StreamFiles creates readers from files and sends them to the channel. // In case of an error, the error is sent to the error channel. // Must be run in a goroutine `go rh.reader.StreamFiles(ctx, readersCh, errorsCh)`. StreamFiles(context.Context, chan<- io.ReadCloser, chan<- error) // StreamFile creates a single file reader and sends io.Readers to the `readersCh` // In case of an error, it is sent to the `errorsCh` channel. // Must be run in a goroutine `go rh.reader.StreamFile()`. StreamFile(ctx context.Context, filename string, readersCh chan<- io.ReadCloser, errorsCh chan<- error) // GetType returns the type of storage. Used in logging. GetType() string }
StreamingReader provides access to data that should be restored.
type Writer ¶
type Writer interface { // NewWriter returns new writer for backup logic to use. Each call creates // a new writer, they might be working in parallel. Backup logic will close // the writer after backup is done. Header func is executed on a writer // after creation (on each one in case of multipart file). NewWriter(ctx context.Context, filename string) (io.WriteCloser, error) // GetType returns the type of storage. Used in logging. GetType() string // RemoveFiles removes a backup file or files from directory. RemoveFiles(ctx context.Context) error }
Writer provides access to backup storage. Exported for integration tests.
Source Files
¶
- client.go
- config_backup.go
- config_partition_filter.go
- config_policy_compression.go
- config_policy_encryption.go
- config_restore.go
- config_restore_namespace.go
- config_secret_agent.go
- estimate.go
- handler_backup.go
- handler_backup_records.go
- handler_restore.go
- io_encoding.go
- io_encryption.go
- secret_agent.go
- shared.go
- state.go
- token_reader.go
- writers.go