Documentation
¶
Index ¶
Constants ¶
const ( // DefaultEtcdTimeout is the default timeout config for etcd. DefaultEtcdTimeout = 5 * time.Second // DefaultRetryTime is the default retry time for each pump. DefaultRetryTime = 10 // DefaultBinlogWriteTimeout is the default max time binlog can use to write to pump. DefaultBinlogWriteTimeout = 15 * time.Second // CheckInterval is the default interval for check unavailable pumps. CheckInterval = 30 * time.Second )
const ( // Range means range strategy. Range = "range" // Hash means hash strategy. Hash = "hash" // Score means choose pump by it's score. Score = "score" // LocalUnix means will only use the local pump by unix socket. LocalUnix = "local unix" )
Variables ¶
var ( // ErrNoAvaliablePump means no available pump to write binlog. ErrNoAvaliablePump = errors.New("no available pump to write binlog") // CommitBinlogTimeout is the max retry duration time for write commit/rollback binlog. CommitBinlogTimeout = 10 * time.Minute // RetryInterval is the interval of retrying to write binlog. RetryInterval = 100 * time.Millisecond )
Functions ¶
This section is empty.
Types ¶
type HashSelector ¶
type HashSelector struct {
// the pumps to be selected.
Pumps []*PumpStatus
}
HashSelector select a pump by hash.
func (*HashSelector) Feedback ¶
func (*HashSelector) Feedback(startTS int64, binlogType pb.BinlogType, pump *PumpStatus)
Feedback implement PumpSelector.Feedback
func (*HashSelector) Select ¶
func (h *HashSelector) Select(binlog *pb.Binlog, retryTime int) *PumpStatus
Select implement PumpSelector.Select.
func (*HashSelector) SetPumps ¶
func (h *HashSelector) SetPumps(pumps []*PumpStatus)
SetPumps implement PumpSelector.SetPumps.
type LocalUnixSelector ¶
type LocalUnixSelector struct {
// the pump to be selected.
Pump *PumpStatus
}
LocalUnixSelector will always select the local pump, used for compatible with kafka version tidb-binlog.
func (*LocalUnixSelector) Feedback ¶
func (*LocalUnixSelector) Feedback(_ int64, _ pb.BinlogType, _ *PumpStatus)
Feedback implement PumpSelector.Feedback
func (*LocalUnixSelector) Select ¶
func (u *LocalUnixSelector) Select(_ *pb.Binlog, _ int) *PumpStatus
Select implement PumpSelector.Select.
func (*LocalUnixSelector) SetPumps ¶
func (u *LocalUnixSelector) SetPumps(pumps []*PumpStatus)
SetPumps implement PumpSelector.SetPumps.
type PumpInfos ¶
type PumpInfos struct {
// Pumps saves the map of pump's nodeID and pump status.
Pumps map[string]*PumpStatus
// AvliablePumps saves the whole available pumps' status.
AvaliablePumps map[string]*PumpStatus
// UnAvaliablePumps saves the unavailable pumps.
// And only pump with Online state in this map need check is it available.
UnAvaliablePumps map[string]*PumpStatus
}
PumpInfos saves pumps' information in pumps client.
type PumpSelector ¶
type PumpSelector interface {
// SetPumps set pumps to be selected.
SetPumps([]*PumpStatus)
// Select returns a situable pump. Tips: should call this function only one time for commit/rollback binlog.
Select(binlog *pb.Binlog, retryTime int) *PumpStatus
// Feedback set the corresponding relations between startTS and pump.
Feedback(startTS int64, binlogType pb.BinlogType, pump *PumpStatus)
}
PumpSelector selects pump for sending binlog.
func NewHashSelector ¶
func NewHashSelector() PumpSelector
NewHashSelector returns a new HashSelector.
func NewLocalUnixSelector ¶
func NewLocalUnixSelector() PumpSelector
NewLocalUnixSelector returns a new LocalUnixSelector.
func NewRangeSelector ¶
func NewRangeSelector() PumpSelector
NewRangeSelector returns a new ScoreSelector.
func NewScoreSelector ¶
func NewScoreSelector() PumpSelector
NewScoreSelector returns a new ScoreSelector.
func NewSelector ¶
func NewSelector(strategy string) PumpSelector
NewSelector returns a PumpSelector according to the strategy.
type PumpStatus ¶
type PumpStatus struct {
/*
Pump has these state:
Online:
only when pump's state is online that pumps client can write binlog to.
Pausing:
this pump is pausing, and can't provide write binlog service. And this state will turn into Paused when pump is quit.
Paused:
this pump is paused, and can't provide write binlog service.
Closing:
this pump is closing, and can't provide write binlog service. And this state will turn into Offline when pump is quit.
Offline:
this pump is offline, and can't provide write binlog service forever.
*/
sync.RWMutex
node.Status
// the pump is available or not, obsolete now
IsAvaliable bool
// the client of this pump
Client pb.PumpClient
ErrNum int64
// contains filtered or unexported fields
}
PumpStatus saves pump's status.
func NewPumpStatus ¶
func NewPumpStatus(status *node.Status, security *tls.Config) *PumpStatus
NewPumpStatus returns a new PumpStatus according to node's status.
func (*PumpStatus) IsUsable ¶
func (p *PumpStatus) IsUsable() bool
IsUsable returns true if pump is usable.
func (*PumpStatus) Reset ¶
func (p *PumpStatus) Reset()
Reset resets the pump's grpc conn and err num.
func (*PumpStatus) ResetGrpcClient ¶
func (p *PumpStatus) ResetGrpcClient()
ResetGrpcClient closes the pump's grpc connection.
func (*PumpStatus) ShouldBeUsable ¶
func (p *PumpStatus) ShouldBeUsable() bool
ShouldBeUsable returns true if pump should be usable
func (*PumpStatus) WriteBinlog ¶
func (p *PumpStatus) WriteBinlog(req *pb.WriteBinlogReq, timeout time.Duration) (*pb.WriteBinlogResp, error)
WriteBinlog write binlog by grpc client.
type PumpsClient ¶
type PumpsClient struct {
sync.RWMutex
// ClusterID is the cluster ID of this tidb cluster.
ClusterID uint64
// the registry of etcd.
EtcdRegistry *node.EtcdRegistry
// Pumps saves the pumps' information.
Pumps *PumpInfos
// Selector will select a suitable pump.
Selector PumpSelector
// the max retry time if write binlog failed, obsolete now.
RetryTime int
// BinlogWriteTimeout is the max time binlog can use to write to pump.
BinlogWriteTimeout time.Duration
// Security is the security config
Security *tls.Config
// contains filtered or unexported fields
}
PumpsClient is the client of pumps.
func NewLocalPumpsClient ¶
func NewLocalPumpsClient(etcdURLs, binlogSocket string, timeout time.Duration, securityOpt pd.SecurityOption) (*PumpsClient, error)
NewLocalPumpsClient returns a PumpsClient, this PumpsClient will write binlog by socket file. For compatible with kafka version pump.
func NewPumpsClient ¶
func NewPumpsClient(etcdURLs, strategy string, timeout time.Duration, securityOpt pd.SecurityOption) (*PumpsClient, error)
NewPumpsClient returns a PumpsClient. TODO: get strategy from etcd, and can update strategy in real-time. Use Range as default now.
func (*PumpsClient) SetSelectStrategy ¶
func (c *PumpsClient) SetSelectStrategy(strategy string) error
SetSelectStrategy sets the selector's strategy, strategy should be 'range' or 'hash' now.
func (*PumpsClient) WriteBinlog ¶
func (c *PumpsClient) WriteBinlog(binlog *pb.Binlog) error
WriteBinlog writes binlog to a suitable pump. Tips: will never return error for commit/rollback binlog.
type RangeSelector ¶
type RangeSelector struct {
// Offset saves the offset in Pumps.
Offset int
// the pumps to be selected.
Pumps []*PumpStatus
}
RangeSelector select a pump by range.
func (*RangeSelector) Feedback ¶
func (*RangeSelector) Feedback(startTS int64, binlogType pb.BinlogType, pump *PumpStatus)
Feedback implement PumpSelector.Select
func (*RangeSelector) Select ¶
func (r *RangeSelector) Select(binlog *pb.Binlog, _ int) *PumpStatus
Select implement PumpSelector.Select.
func (*RangeSelector) SetPumps ¶
func (r *RangeSelector) SetPumps(pumps []*PumpStatus)
SetPumps implement PumpSelector.SetPumps.
type ScoreSelector ¶
type ScoreSelector struct{}
ScoreSelector select a pump by pump's score.
func (*ScoreSelector) Feedback ¶
func (*ScoreSelector) Feedback(_ int64, _ pb.BinlogType, _ *PumpStatus)
Feedback implement PumpSelector.Feedback
func (*ScoreSelector) Select ¶
func (*ScoreSelector) Select(_ *pb.Binlog, _ int) *PumpStatus
Select implement PumpSelector.Select.
func (*ScoreSelector) SetPumps ¶
func (*ScoreSelector) SetPumps(_ []*PumpStatus)
SetPumps implement PumpSelector.SetPumps.