Documentation
¶
Index ¶
- Constants
- Variables
- func CreateMutationStreamReader(streamId common.StreamId, bucketQueueMap BucketQueueMap, supvCmdch MsgChannel, ...) (MutationStreamReader, Message)
- func GetLocalIP() (net.IP, error)
- func IsIPLocal(ip string) bool
- func NewAdminManager(supvCmdch MsgChannel, supvRespch MsgChannel) (AdminManager, Message)
- func NewAtomicMutationQueue(numVbuckets uint16) *atomicMutationQueue
- func NewCbqBridge(supvCmdch MsgChannel, supvRespch MsgChannel) (CbqBridge, Message)
- func NewClustMgrSender(supvCmdch MsgChannel, supvRespch MsgChannel) (ClustMgrSender, Message)
- func NewFlusher() *flusher
- func NewForestDBSlice(name string, sliceId SliceId, idxDefnId common.IndexDefnId, ...) (*fdbSlice, error)
- func NewIndexer(numVbuckets uint16) (Indexer, Message)
- func NewKVSender(supvCmdch MsgChannel, supvRespch MsgChannel, numVbuckets uint16) (KVSender, Message)
- func NewMutationManager(supvCmdch MsgChannel, supvRespch MsgChannel, numVbuckets uint16) (MutationManager, Message)
- func NewScanCoordinator(supvCmdch MsgChannel, supvRespch MsgChannel) (ScanCoordinator, Message)
- func NewSlabManager(startChunkSize int, slabSize int, maxMemAlloc uint64) (SlabManager, Message)
- func NewSnapshotContainer() *snapshotContainer
- func NewStorageManager(supvCmdch MsgChannel, supvRespch MsgChannel) (StorageManager, Message)
- func NewTimekeeper(supvCmdch MsgChannel, supvRespch MsgChannel) (Timekeeper, Message)
- type AdminManager
- type BucketFlushEnabledMap
- type BucketFlushInProgressMap
- type BucketHWTMap
- type BucketIndexCountMap
- type BucketLastTsFlushedMap
- type BucketNewTsReqdMap
- type BucketQueueMap
- type BucketStopChMap
- type BucketSyncCountMap
- type BucketTsListMap
- type CbqBridge
- type ClustMgrSender
- type Counter
- type DoneChannel
- type Error
- type Exister
- type Flusher
- type ForestDBIterator
- func (f *ForestDBIterator) Close()
- func (f *ForestDBIterator) Current() ([]byte, []byte, bool)
- func (f *ForestDBIterator) Key() []byte
- func (f *ForestDBIterator) Next()
- func (f *ForestDBIterator) Seek(key []byte)
- func (f *ForestDBIterator) SeekFirst()
- func (f *ForestDBIterator) Valid() bool
- func (f *ForestDBIterator) Value() []byte
- type HashedSliceContainer
- func (sc *HashedSliceContainer) AddSlice(id SliceId, s Slice)
- func (sc *HashedSliceContainer) GetAllSlices() []Slice
- func (sc *HashedSliceContainer) GetSliceById(id SliceId) Slice
- func (sc *HashedSliceContainer) GetSliceByIndexKey(key common.IndexKey) Slice
- func (sc *HashedSliceContainer) GetSliceIdByIndexKey(key common.IndexKey) SliceId
- func (sc *HashedSliceContainer) RemoveSlice(id SliceId)
- func (sc *HashedSliceContainer) UpdateSlice(id SliceId, s Slice)
- type Inclusion
- type IndexError
- type IndexInfo
- type IndexMetaResponse
- type IndexPartnMap
- type IndexQueueMap
- type IndexReader
- type IndexRequest
- type IndexRow
- type IndexScanResponse
- type IndexWriter
- type Indexer
- type IndexerId
- type IndexerMutationQueue
- type IndexerState
- type InitialBuildInfo
- type KVSender
- type Key
- type Keybytes
- type Looker
- type Message
- type MsgChannel
- type MsgCreateIndex
- type MsgDropIndex
- type MsgError
- type MsgGeneral
- type MsgMutMgrFlushDone
- type MsgMutMgrFlushMutationQueue
- func (m *MsgMutMgrFlushMutationQueue) GetBucket() string
- func (m *MsgMutMgrFlushMutationQueue) GetMsgType() MsgType
- func (m *MsgMutMgrFlushMutationQueue) GetStreamId() common.StreamId
- func (m *MsgMutMgrFlushMutationQueue) GetTimestamp() Timestamp
- func (m *MsgMutMgrFlushMutationQueue) String() string
- type MsgMutMgrGetTimestamp
- type MsgScanIndex
- func (m *MsgScanIndex) GetCountChannel() chan uint64
- func (m *MsgScanIndex) GetErrorChannel() chan Message
- func (m *MsgScanIndex) GetIndexInstId() common.IndexInstId
- func (m *MsgScanIndex) GetMsgType() MsgType
- func (m *MsgScanIndex) GetParams() ScanParams
- func (m *MsgScanIndex) GetResultChannel() chan Value
- func (m *MsgScanIndex) GetScanId() int64
- func (m *MsgScanIndex) GetStopChannel() StopChannel
- type MsgStream
- type MsgStreamError
- type MsgStreamUpdate
- func (m *MsgStreamUpdate) GetBucket() string
- func (m *MsgStreamUpdate) GetIndexList() []common.IndexInst
- func (m *MsgStreamUpdate) GetMsgType() MsgType
- func (m *MsgStreamUpdate) GetResponseChannel() MsgChannel
- func (m *MsgStreamUpdate) GetStreamId() common.StreamId
- func (m *MsgStreamUpdate) GetTimestamp() Timestamp
- func (m *MsgStreamUpdate) String() string
- type MsgSuccess
- type MsgTKEnableFlush
- type MsgTKInitBuildDone
- type MsgTKMergeStream
- type MsgTKStabilityTS
- type MsgTimestamp
- type MsgType
- type MsgUpdateBucketQueue
- type MsgUpdateInstMap
- type MsgUpdatePartnMap
- type MutationChannel
- type MutationKeys
- type MutationManager
- type MutationMeta
- type MutationQueue
- type MutationStreamReader
- type NodeInfo
- type PartitionInst
- type PartitionInstMap
- type QueryParams
- type RangeCounter
- type Ranger
- type RequestType
- type ResponseStatus
- type ScanCoordinator
- type ScanParams
- type ScanType
- type Seqno
- type SlabManager
- type Slice
- type SliceContainer
- type SliceId
- type SliceStatus
- type Snapshot
- type SnapshotContainer
- type SortOrder
- type StabilityTimestamp
- type StopChannel
- type StorageManager
- type StreamAddressMap
- type StreamStatus
- type StreamStatusMap
- type Timekeeper
- type Timestamp
- type Value
- type Vbucket
- type Vbuuid
Constants ¶
const ( ERROR_PANIC errCode = iota //Slab Manager ERROR_SLAB_INIT ERROR_SLAB_BAD_ALLOC_REQUEST ERROR_SLAB_INTERNAL_ALLOC_ERROR ERROR_SLAB_MEM_LIMIT_EXCEED ERROR_SLAB_INTERNAL_ERROR //Stream Reader ERROR_STREAM_INIT ERROR_STREAM_READER_UNKNOWN_COMMAND ERROR_STREAM_READER_STREAM_SHUTDOWN ERROR_STREAM_READER_RESTART_VBUCKETS ERROR_STREAM_READER_UNKNOWN_ERROR ERROR_STREAM_READER_PANIC //Mutation Manager ERROR_MUT_MGR_INTERNAL_ERROR ERROR_MUT_MGR_STREAM_ALREADY_OPEN ERROR_MUT_MGR_STREAM_ALREADY_CLOSED ERROR_MUT_MGR_UNKNOWN_COMMAND ERROR_MUT_MGR_UNCLEAN_SHUTDOWN ERROR_MUT_MGR_PANIC //Mutation Queue ERROR_MUTATION_QUEUE_INIT //Timekeeper ERROR_TK_UNKNOWN_STREAM //KVSender ERROR_KVSENDER_UNKNOWN_INDEX ERROR_KVSENDER_STREAM_ALREADY_OPEN ERROR_KVSENDER_STREAM_REQUEST_ERROR ERROR_KV_SENDER_UNKNOWN_STREAM ERROR_KVSENDER_STREAM_ALREADY_CLOSED //ScanCoordinator ERROR_SCAN_COORD_UNKNOWN_COMMAND ERROR_SCAN_COORD_INTERNAL_ERROR //INDEXER ERROR_INDEX_ALREADY_EXISTS ERROR_INDEXER_INTERNAL_ERROR ERROR_INDEX_BUILD_IN_PROGRESS ERROR_INDEXER_UNKNOWN_INDEX )
const ( FATAL errSeverity = iota NORMAL )
const ( MESSAGING errCategory = iota STORAGE MUTATION_QUEUE TOPOLOGY STREAM_READER SLAB_MANAGER MUTATION_MANAGER TIMEKEEPER SCAN_COORD INDEXER )
const ( Unsorted SortOrder = "none" Asc = "asc" Desc = "desc" )
const ( //General Messages MSG_SUCCESS = iota MSG_ERROR MSG_TIMESTAMP //STREAM_READER STREAM_READER_STREAM_DROP_DATA STREAM_READER_STREAM_BEGIN STREAM_READER_STREAM_END STREAM_READER_SYNC STREAM_READER_UPDATE_QUEUE_MAP STREAM_READER_ERROR STREAM_READER_SHUTDOWN //MUTATION_MANAGER MUT_MGR_PERSIST_MUTATION_QUEUE MUT_MGR_DRAIN_MUTATION_QUEUE MUT_MGR_GET_MUTATION_QUEUE_HWT MUT_MGR_GET_MUTATION_QUEUE_LWT MUT_MGR_SHUTDOWN MUT_MGR_FLUSH_DONE //TIMEKEEPER TK_SHUTDOWN TK_STABILITY_TIMESTAMP TK_INIT_BUILD_DONE TK_ENABLE_FLUSH TK_MERGE_STREAM //STORAGE_MANAGER STORAGE_MGR_SHUTDOWN //KVSender KV_SENDER_SHUTDOWN KV_SENDER_GET_CURR_KV_TS //ADMIN_MGR ADMIN_MGR_SHUTDOWN //CLUSTER_MGR CLUST_MGR_SENDER_SHUTDOWN //CBQ_BRIDGE_SHUTDOWN CBQ_BRIDGE_SHUTDOWN //INDEXER INDEXER_CREATE_INDEX_DDL INDEXER_DROP_INDEX_DDL //SCAN COORDINATOR SCAN_COORD_SCAN_INDEX SCAN_COORD_SCAN_PARTITION SCAN_COORD_SCAN_SLICE SCAN_COORD_SHUTDOWN //COMMON UPDATE_INDEX_INSTANCE_MAP UPDATE_INDEX_PARTITION_MAP OPEN_STREAM ADD_INDEX_LIST_TO_STREAM REMOVE_INDEX_LIST_FROM_STREAM CLOSE_STREAM CLEANUP_STREAM )
const CBQ_BRIDGE_HTTP_ADDR = ":9101"
Cbq Bridge Http Address on which it listens to messages from Cbq Server
const DEFAULT_GROWTH_FACTOR float64 = 2.0
const DEFAULT_MAX_SLAB_MEMORY = DEFAULT_SLAB_SIZE * 1024
const DEFAULT_NUM_STREAM_READER_WORKERS = 8
Default number of workers started by a stream reader to process incoming mutations. Max can be up to the number of vbuckets and minimum must be equal to the number of vbuckets
const DEFAULT_POOL = "default"
Default Pool Name
const DEFAULT_PROJECTOR_ADMIN_PORT_ENDPOINT = "localhost:9999"
Projector Admin Port Endpoint on which projector is listening for admin requests
const DEFAULT_RELEASE_BUFFER int = 10000
const DEFAULT_SLAB_SIZE = DEFAULT_START_CHUNK_SIZE * 1024
const DEFAULT_START_CHUNK_SIZE = 256
Slab Manager Specific constants
const DEQUEUE_POLL_INTERVAL = 5
Poll Interval for dequeue thread
const INDEXER_INIT_DATA_PORT_ENDPOINT = "localhost:8101"
Data Port Endpoint for Local Indexer on which projector needs to send mutations for initial build stream
const INDEXER_MAINT_DATA_PORT_ENDPOINT = "localhost:8100"
Data Port Endpoint for Local Indexer on which projector needs to send mutations for maintenance stream
const INIT_TOPIC = "INIT_STREAM_TOPIC"
Initial Stream Topic Name
const KVPORT = "9000"
const KV_DCP_PORT = "11210"
const KV_DCP_PORT_CLUSTER_RUN = "12000"
const LOCALHOST = "127.0.0.1"
const MAINT_TOPIC = "MAINT_STREAM_TOPIC"
Maintenance Topic Name
const MAX_NUM_VBUCKETS = 1024
Max number of vbuckets supported in the system
const MAX_SNAPSHOTS_PER_INDEX = 100
Max number of snapshots to be retained per index. Older snapshots are deleted.
const MAX_STREAM_READER_WORKER_BUFFER = 1000
Buffer for each stream reader worker to queue up mutations before processing
const NUM_WRITER_THREADS_PER_SLICE = 2
Default Number of threads for a Slice Writer
const PROJECTOR_PORT = "9999"
const SLICE_COMMAND_BUFFER_SIZE = 10000
Internal Buffer Size for Each Slice to store incoming requests
const SLICE_COMMIT_POLL_INTERVAL = 20
Time in milliseconds for a slice to poll for any outstanding writes before commit
const SYNC_COUNT_TS_TRIGGER = 1024 * 2
Number of Sync messages after which Timekeeper triggers a new Stability Timestamp
const WORKER_MSG_QUEUE_LEN = 100000
Supervisor's channel capacity to buffer requests from workers
Variables ¶
var HTTP_PREFIX string = "http://"
var KEY_SEPARATOR []byte = []byte{0xff, 0xff, 0xff, 0xff}
var NUM_VBUCKETS uint16
TODO move this to config
var PROJECTOR_ADMIN_PORT_ENDPOINT string
Functions ¶
func CreateMutationStreamReader ¶
func CreateMutationStreamReader(streamId common.StreamId, bucketQueueMap BucketQueueMap, supvCmdch MsgChannel, supvRespch MsgChannel, numWorkers int) ( MutationStreamReader, Message)
CreateMutationStreamReader creates a new mutation stream and starts a reader to listen and process the mutations. In case returned MutationStreamReader is nil, Message will have the error msg.
func GetLocalIP ¶
func NewAdminManager ¶
func NewAdminManager(supvCmdch MsgChannel, supvRespch MsgChannel) ( AdminManager, Message)
func NewAtomicMutationQueue ¶
func NewAtomicMutationQueue(numVbuckets uint16) *atomicMutationQueue
NewAtomicMutationQueue allocates a new Atomic Mutation Queue and initializes it
func NewCbqBridge ¶
func NewCbqBridge(supvCmdch MsgChannel, supvRespch MsgChannel) ( CbqBridge, Message)
func NewClustMgrSender ¶
func NewClustMgrSender(supvCmdch MsgChannel, supvRespch MsgChannel) ( ClustMgrSender, Message)
func NewForestDBSlice ¶
func NewForestDBSlice(name string, sliceId SliceId, idxDefnId common.IndexDefnId, idxInstId common.IndexInstId) (*fdbSlice, error)
NewForestDBSlice initializes a new slice with forestdb backend. Both main and back index get initialized with default config. Slice methods are not thread-safe and application needs to handle the synchronization. The only exception being Insert and Delete can be called concurrently. Returns error in case slice cannot be initialized.
func NewIndexer ¶
func NewKVSender ¶
func NewKVSender(supvCmdch MsgChannel, supvRespch MsgChannel, numVbuckets uint16) (KVSender, Message)
func NewMutationManager ¶
func NewMutationManager(supvCmdch MsgChannel, supvRespch MsgChannel, numVbuckets uint16) (MutationManager, Message)
NewMutationManager creates a new Mutation Manager which listens for commands from Indexer. In case returned MutationManager is nil, Message will have the error msg. supvCmdch is a synchronous channel and every request on this channel is followed by a response on the same channel. Supervisor is expected to wait for the response before issuing a new request on this channel. supvRespch will be used by Mutation Manager to send any async error/info messages that may happen due to any downstream error or its own processing. Additionally, for Flush commands, a sync response is sent on supvCmdch to indicate flush has been initiated and once flush completes, another message is sent on supvRespch to indicate its completion or any error that may have happened. If supvRespch or supvCmdch is closed, mutation manager will terminate its loop.
func NewScanCoordinator ¶
func NewScanCoordinator(supvCmdch MsgChannel, supvRespch MsgChannel) ( ScanCoordinator, Message)
NewScanCoordinator returns an instance of scanCoordinator or err message. It listens on supvCmdch for command and every command is followed by a synchronous response on the supvCmdch. Any async response to supervisor is sent to supvRespch. If supvCmdch gets closed, ScanCoordinator will shut itself down.
func NewSlabManager ¶
func NewSlabManager(startChunkSize int, slabSize int, maxMemAlloc uint64) (SlabManager, Message)
NewSlabManager returns a slabManager struct instance with an initialized Arena
func NewSnapshotContainer ¶
func NewSnapshotContainer() *snapshotContainer
NewSnapshotContainer inits a new snapshotContainer and returns
func NewStorageManager ¶
func NewStorageManager(supvCmdch MsgChannel, supvRespch MsgChannel) ( StorageManager, Message)
NewStorageManager returns an instance of storageMgr or err message. It listens on supvCmdch for command and every command is followed by a synchronous response on the supvCmdch. Any async response to supervisor is sent to supvRespch. If supvCmdch gets closed, storageMgr will shut itself down.
func NewTimekeeper ¶
func NewTimekeeper(supvCmdch MsgChannel, supvRespch MsgChannel) ( Timekeeper, Message)
NewTimekeeper returns an instance of timekeeper or err message. It listens on supvCmdch for command and every command is followed by a synchronous response on the supvCmdch. Any async response to supervisor is sent to supvRespch. If supvCmdch gets closed, timekeeper will shut itself down.
Types ¶
type AdminManager ¶
type AdminManager interface {
}
AdminManager listens to the admin port messages and relays them back to Indexer
type BucketFlushEnabledMap ¶
type BucketHWTMap ¶
type BucketIndexCountMap ¶
type BucketLastTsFlushedMap ¶
type BucketNewTsReqdMap ¶
type BucketQueueMap ¶
type BucketQueueMap map[string]IndexerMutationQueue
Map from bucket name to mutation queue
func CopyBucketQueueMap ¶
func CopyBucketQueueMap(inMap BucketQueueMap) BucketQueueMap
type BucketStopChMap ¶
type BucketStopChMap map[string]StopChannel
Map from bucket name to flusher stop channel
type BucketSyncCountMap ¶
type BucketTsListMap ¶
type CbqBridge ¶
type CbqBridge interface {
}
CbqBridge is a temporary solution to allow Cbq Engine to talk to Indexing
type ClustMgrSender ¶
type ClustMgrSender interface {
}
ClustMgrSender provides the mechanism to talk to Index Coordinator
type Counter ¶
type Counter interface {
CountTotal(stopch StopChannel) (uint64, error)
}
Counter is a class of algorithms that return total node count efficiently
type DoneChannel ¶
type DoneChannel chan bool
DoneChannel is a generic channel which can be closed when you want to indicate to the caller that you are done
type Exister ¶
type Exister interface {
Exists(key Key, stopch StopChannel) (bool, error)
}
Exister is a class of algorithms that allow testing if a key exists in the index
type Flusher ¶
type Flusher interface {
//PersistUptoTS will flush the mutation queue upto Timestamp provided.
//Can be stopped anytime by closing StopChannel.
//Sends SUCCESS on the MsgChannel when its done flushing till TS.
//Any error condition is reported back on the MsgChannel.
//Caller can wait on MsgChannel after closing StopChannel
//to get notified about shutdown completion.
PersistUptoTS(q MutationQueue, streamId common.StreamId, indexInstMap common.IndexInstMap,
indexPartnMap IndexPartnMap, ts Timestamp, stopch StopChannel) MsgChannel
//DrainUptoTS will flush the mutation queue upto Timestamp
//provided without actually persisting it.
//Can be stopped anytime by closing the StopChannel.
//Sends SUCCESS on the MsgChannel when its done flushing till timestamp.
//Any error condition is reported back on the MsgChannel.
//Caller can wait on MsgChannel after closing StopChannel
//to get notified about shutdown completion.
DrainUptoTS(q MutationQueue, streamId common.StreamId, ts Timestamp,
stopch StopChannel) MsgChannel
//Persist will keep flushing the mutation queue till caller closes
//the stop channel. Can be stopped anytime by closing the StopChannel.
//Any error condition is reported back on the MsgChannel.
//Caller can wait on MsgChannel after closing StopChannel to get
//notified about shutdown completion.
Persist(q MutationQueue, streamId common.StreamId, indexInstMap common.IndexInstMap,
indexPartnMap IndexPartnMap, stopch StopChannel) MsgChannel
//Drain will keep flushing the mutation queue till caller closes
//the stop channel without actually persisting the mutations.
//Can be stopped anytime by closing the StopChannel.
//Any error condition is reported back on the MsgChannel.
//Caller can wait on MsgChannel after closing StopChannel to get
//notified about shutdown completion.
Drain(q MutationQueue, streamId common.StreamId, stopch StopChannel) MsgChannel
//IsQueueLWTLowerThanTimestamp checks if each Vbucket in the Queue
//has mutation with Seqno lower than the corresponding Seqno present
//in the specified timestamp.
IsQueueLWTLowerThanTimestamp(q MutationQueue, ts Timestamp) bool
//GetQueueLWT returns the lowest seqno for each vbucket in the queue
GetQueueLWT(q MutationQueue) Timestamp
//GetQueueHWT returns the highest seqno for each vbucket in the queue
GetQueueHWT(q MutationQueue) Timestamp
}
Flusher is the only component which does read/dequeue from a MutationQueue. As MutationQueue has a restriction of only single reader and writer per vbucket, flusher should not be invoked concurrently for a single MutationQueue.
type ForestDBIterator ¶
type ForestDBIterator struct {
// contains filtered or unexported fields
}
ForestDBIterator taken from https://github.com/couchbaselabs/bleve/blob/master/index/store/goforestdb/iterator.go
func (*ForestDBIterator) Close ¶
func (f *ForestDBIterator) Close()
func (*ForestDBIterator) Key ¶
func (f *ForestDBIterator) Key() []byte
func (*ForestDBIterator) Next ¶
func (f *ForestDBIterator) Next()
func (*ForestDBIterator) Seek ¶
func (f *ForestDBIterator) Seek(key []byte)
func (*ForestDBIterator) SeekFirst ¶
func (f *ForestDBIterator) SeekFirst()
func (*ForestDBIterator) Valid ¶
func (f *ForestDBIterator) Valid() bool
func (*ForestDBIterator) Value ¶
func (f *ForestDBIterator) Value() []byte
type HashedSliceContainer ¶
HashedSliceContainer provides a hash based implementation for SliceContainer. Each IndexKey is hashed to determine which slice it belongs to.
func NewHashedSliceContainer ¶
func NewHashedSliceContainer() *HashedSliceContainer
NewHashedSliceContainer initializes a new HashedSliceContainer and returns
func (*HashedSliceContainer) AddSlice ¶
func (sc *HashedSliceContainer) AddSlice(id SliceId, s Slice)
AddSlice adds a slice to the container
func (*HashedSliceContainer) GetAllSlices ¶
func (sc *HashedSliceContainer) GetAllSlices() []Slice
GetAllSlices returns all slices from the container
func (*HashedSliceContainer) GetSliceById ¶
func (sc *HashedSliceContainer) GetSliceById(id SliceId) Slice
GetSliceById returns Slice for the given SliceId
func (*HashedSliceContainer) GetSliceByIndexKey ¶
func (sc *HashedSliceContainer) GetSliceByIndexKey(key common.IndexKey) Slice
GetSliceByIndexKey returns Slice for the given IndexKey. This is a convenience method which calls other interface methods to first determine the sliceId from IndexKey and then the slice from sliceId.
func (*HashedSliceContainer) GetSliceIdByIndexKey ¶
func (sc *HashedSliceContainer) GetSliceIdByIndexKey(key common.IndexKey) SliceId
GetSliceIdByIndexKey returns SliceId for the given IndexKey
func (*HashedSliceContainer) RemoveSlice ¶
func (sc *HashedSliceContainer) RemoveSlice(id SliceId)
RemoveSlice removes a slice from the container
func (*HashedSliceContainer) UpdateSlice ¶
func (sc *HashedSliceContainer) UpdateSlice(id SliceId, s Slice)
UpdateSlice updates an existing slice in the container
type Inclusion ¶
type Inclusion int
Inclusion controls how the boundary values of a range are treated
type IndexError ¶
type IndexInfo ¶
type IndexInfo struct {
Name string `json:"name,omitempty"` // Name of the index
Uuid string `json:"uuid,omitempty"` // unique id for every index
Using common.IndexType `json:"using,omitempty"` // indexing algorithm
OnExprList []string `json:"onExprList,omitempty"` // expression list
Bucket string `json:"bucket,omitempty"` // bucket name
IsPrimary bool `json:"isPrimary,omitempty"`
Exprtype common.ExprType `json:"exprType,omitempty"`
}
Every index ever created and maintained by this package will have an associated index-info structure.
type IndexMetaResponse ¶
type IndexMetaResponse struct {
Status ResponseStatus `json:"status,omitempty"`
Indexes []IndexInfo `json:"indexes,omitempty"`
ServerUuid string `json:"serverUuid,omitempty"`
Nodes []NodeInfo `json:"nodes,omitempty"`
Errors []IndexError `json:"errors,omitempty"`
}
type IndexPartnMap ¶
type IndexPartnMap map[common.IndexInstId]PartitionInstMap
IndexPartnMap maps an IndexInstId to PartitionInstMap
func CopyIndexPartnMap ¶
func CopyIndexPartnMap(inMap IndexPartnMap) IndexPartnMap
func (IndexPartnMap) String ¶
func (pm IndexPartnMap) String() string
type IndexQueueMap ¶
type IndexQueueMap map[common.IndexInstId]IndexerMutationQueue
IndexQueueMap is a map between IndexInstId and IndexerMutationQueue
type IndexReader ¶
type IndexReader interface {
Counter
Ranger
RangeCounter
}
type IndexRequest ¶
type IndexRequest struct {
Type RequestType `json:"type,omitempty"`
Index IndexInfo `json:"index,omitempty"`
ServerUuid string `json:"serverUuid,omitempty"`
Params QueryParams `json:"params,omitempty"`
}
All APIs accept an IndexRequest structure and return an IndexResponse structure. If the application is written in Go and compiled with the `indexing` package, then it can choose to access the underlying interfaces directly.
type IndexScanResponse ¶
type IndexScanResponse struct {
Status ResponseStatus `json:"status,omitempty"`
TotalRows uint64 `json:"totalrows,omitempty"`
Rows []IndexRow `json:"rows,omitempty"`
Errors []IndexError `json:"errors,omitempty"`
}
type IndexWriter ¶
type IndexWriter interface {
//Persist a key/value pair
Insert(key Key, value Value) error
//Delete a key/value pair by docId
Delete(docid []byte) error
//Commit the pending operations
Commit() error
//Snapshot
Snapshot() (Snapshot, error)
//Close the index. Should be able to reopen after this operation
Close() error
//Destroy/Wipe the index completely
Destroy() error
}
type IndexerMutationQueue ¶
type IndexerMutationQueue struct {
// contains filtered or unexported fields
}
IndexerMutationQueue comprises a mutation queue and a slab manager
type InitialBuildInfo ¶
type InitialBuildInfo struct {
// contains filtered or unexported fields
}
type KVSender ¶
type KVSender interface {
}
KVSender provides the mechanism to talk to KV(projector, router etc)
type Key ¶
type Key struct {
// contains filtered or unexported fields
}
Key is an array of JSON objects, per encoding/json
func NewKeyFromEncodedBytes ¶
func (*Key) EncodedBytes ¶
type Looker ¶
type Looker interface {
Exister
Lookup(key Key, stopch StopChannel) (chan Value, chan error)
KeySet(stop StopChannel) (chan Key, chan error)
ValueSet(stop StopChannel) (chan Value, chan error)
}
Looker is a class of algorithms that allow looking up a key in an index. Usually, being able to look up a key means we can iterate through all keys too, and so that is introduced here as well.
type MsgChannel ¶
type MsgChannel chan Message
type MsgCreateIndex ¶
type MsgCreateIndex struct {
// contains filtered or unexported fields
}
INDEXER_CREATE_INDEX_DDL
func (*MsgCreateIndex) GetIndexInst ¶
func (m *MsgCreateIndex) GetIndexInst() common.IndexInst
func (*MsgCreateIndex) GetMsgType ¶
func (m *MsgCreateIndex) GetMsgType() MsgType
func (*MsgCreateIndex) GetResponseChannel ¶
func (m *MsgCreateIndex) GetResponseChannel() MsgChannel
func (*MsgCreateIndex) GetString ¶
func (m *MsgCreateIndex) GetString() string
type MsgDropIndex ¶
type MsgDropIndex struct {
// contains filtered or unexported fields
}
INDEXER_DROP_INDEX_DDL
func (*MsgDropIndex) GetIndexInstId ¶
func (m *MsgDropIndex) GetIndexInstId() common.IndexInstId
func (*MsgDropIndex) GetMsgType ¶
func (m *MsgDropIndex) GetMsgType() MsgType
func (*MsgDropIndex) GetResponseChannel ¶
func (m *MsgDropIndex) GetResponseChannel() MsgChannel
func (*MsgDropIndex) GetString ¶
func (m *MsgDropIndex) GetString() string
type MsgError ¶
type MsgError struct {
// contains filtered or unexported fields
}
Error Message
func (*MsgError) GetMsgType ¶
type MsgGeneral ¶
type MsgGeneral struct {
// contains filtered or unexported fields
}
Generic Message
func (*MsgGeneral) GetMsgType ¶
func (m *MsgGeneral) GetMsgType() MsgType
type MsgMutMgrFlushDone ¶
type MsgMutMgrFlushDone struct {
// contains filtered or unexported fields
}
MUT_MGR_FLUSH_DONE
func (*MsgMutMgrFlushDone) GetBucket ¶
func (m *MsgMutMgrFlushDone) GetBucket() string
func (*MsgMutMgrFlushDone) GetMsgType ¶
func (m *MsgMutMgrFlushDone) GetMsgType() MsgType
func (*MsgMutMgrFlushDone) GetStreamId ¶
func (m *MsgMutMgrFlushDone) GetStreamId() common.StreamId
func (*MsgMutMgrFlushDone) GetTS ¶
func (m *MsgMutMgrFlushDone) GetTS() Timestamp
func (*MsgMutMgrFlushDone) String ¶
func (m *MsgMutMgrFlushDone) String() string
type MsgMutMgrFlushMutationQueue ¶
type MsgMutMgrFlushMutationQueue struct {
// contains filtered or unexported fields
}
MUT_MGR_PERSIST_MUTATION_QUEUE MUT_MGR_DRAIN_MUTATION_QUEUE
func (*MsgMutMgrFlushMutationQueue) GetBucket ¶
func (m *MsgMutMgrFlushMutationQueue) GetBucket() string
func (*MsgMutMgrFlushMutationQueue) GetMsgType ¶
func (m *MsgMutMgrFlushMutationQueue) GetMsgType() MsgType
func (*MsgMutMgrFlushMutationQueue) GetStreamId ¶
func (m *MsgMutMgrFlushMutationQueue) GetStreamId() common.StreamId
func (*MsgMutMgrFlushMutationQueue) GetTimestamp ¶
func (m *MsgMutMgrFlushMutationQueue) GetTimestamp() Timestamp
func (*MsgMutMgrFlushMutationQueue) String ¶
func (m *MsgMutMgrFlushMutationQueue) String() string
type MsgMutMgrGetTimestamp ¶
type MsgMutMgrGetTimestamp struct {
// contains filtered or unexported fields
}
MUT_MGR_GET_MUTATION_QUEUE_HWT MUT_MGR_GET_MUTATION_QUEUE_LWT
func (*MsgMutMgrGetTimestamp) GetBucket ¶
func (m *MsgMutMgrGetTimestamp) GetBucket() string
func (*MsgMutMgrGetTimestamp) GetMsgType ¶
func (m *MsgMutMgrGetTimestamp) GetMsgType() MsgType
func (*MsgMutMgrGetTimestamp) GetStreamId ¶
func (m *MsgMutMgrGetTimestamp) GetStreamId() common.StreamId
type MsgScanIndex ¶
type MsgScanIndex struct {
// contains filtered or unexported fields
}
SCAN_COORD_SCAN_INDEX
func (*MsgScanIndex) GetCountChannel ¶
func (m *MsgScanIndex) GetCountChannel() chan uint64
func (*MsgScanIndex) GetErrorChannel ¶
func (m *MsgScanIndex) GetErrorChannel() chan Message
func (*MsgScanIndex) GetIndexInstId ¶
func (m *MsgScanIndex) GetIndexInstId() common.IndexInstId
func (*MsgScanIndex) GetMsgType ¶
func (m *MsgScanIndex) GetMsgType() MsgType
func (*MsgScanIndex) GetParams ¶
func (m *MsgScanIndex) GetParams() ScanParams
func (*MsgScanIndex) GetResultChannel ¶
func (m *MsgScanIndex) GetResultChannel() chan Value
func (*MsgScanIndex) GetScanId ¶
func (m *MsgScanIndex) GetScanId() int64
func (*MsgScanIndex) GetStopChannel ¶
func (m *MsgScanIndex) GetStopChannel() StopChannel
type MsgStream ¶
type MsgStream struct {
// contains filtered or unexported fields
}
Stream Reader Message
func (*MsgStream) GetMsgType ¶
func (*MsgStream) GetMutationMeta ¶
func (m *MsgStream) GetMutationMeta() *MutationMeta
func (*MsgStream) GetStreamId ¶
type MsgStreamError ¶
type MsgStreamError struct {
// contains filtered or unexported fields
}
Stream Error Message
func (*MsgStreamError) GetError ¶
func (m *MsgStreamError) GetError() Error
func (*MsgStreamError) GetMsgType ¶
func (m *MsgStreamError) GetMsgType() MsgType
func (*MsgStreamError) GetStreamId ¶
func (m *MsgStreamError) GetStreamId() common.StreamId
type MsgStreamUpdate ¶
type MsgStreamUpdate struct {
// contains filtered or unexported fields
}
OPEN_STREAM ADD_INDEX_LIST_TO_STREAM REMOVE_INDEX_LIST_FROM_STREAM CLOSE_STREAM CLEANUP_STREAM
func (*MsgStreamUpdate) GetBucket ¶
func (m *MsgStreamUpdate) GetBucket() string
func (*MsgStreamUpdate) GetIndexList ¶
func (m *MsgStreamUpdate) GetIndexList() []common.IndexInst
func (*MsgStreamUpdate) GetMsgType ¶
func (m *MsgStreamUpdate) GetMsgType() MsgType
func (*MsgStreamUpdate) GetResponseChannel ¶
func (m *MsgStreamUpdate) GetResponseChannel() MsgChannel
func (*MsgStreamUpdate) GetStreamId ¶
func (m *MsgStreamUpdate) GetStreamId() common.StreamId
func (*MsgStreamUpdate) GetTimestamp ¶
func (m *MsgStreamUpdate) GetTimestamp() Timestamp
func (*MsgStreamUpdate) String ¶
func (m *MsgStreamUpdate) String() string
type MsgSuccess ¶
type MsgSuccess struct {
}
Success Message
func (*MsgSuccess) GetMsgType ¶
func (m *MsgSuccess) GetMsgType() MsgType
type MsgTKEnableFlush ¶
type MsgTKEnableFlush struct {
// contains filtered or unexported fields
}
TK_ENABLE_FLUSH
func (*MsgTKEnableFlush) GetBucket ¶
func (m *MsgTKEnableFlush) GetBucket() string
func (*MsgTKEnableFlush) GetMsgType ¶
func (m *MsgTKEnableFlush) GetMsgType() MsgType
func (*MsgTKEnableFlush) GetStreamId ¶
func (m *MsgTKEnableFlush) GetStreamId() common.StreamId
type MsgTKInitBuildDone ¶
type MsgTKInitBuildDone struct {
// contains filtered or unexported fields
}
TK_INIT_BUILD_DONE
func (*MsgTKInitBuildDone) GetBucket ¶
func (m *MsgTKInitBuildDone) GetBucket() string
func (*MsgTKInitBuildDone) GetMsgType ¶
func (m *MsgTKInitBuildDone) GetMsgType() MsgType
func (*MsgTKInitBuildDone) GetResponseChannel ¶
func (m *MsgTKInitBuildDone) GetResponseChannel() MsgChannel
func (*MsgTKInitBuildDone) GetStreamId ¶
func (m *MsgTKInitBuildDone) GetStreamId() common.StreamId
func (*MsgTKInitBuildDone) GetTimestamp ¶
func (m *MsgTKInitBuildDone) GetTimestamp() Timestamp
type MsgTKMergeStream ¶
type MsgTKMergeStream struct {
// contains filtered or unexported fields
}
TK_MERGE_STREAM
func (*MsgTKMergeStream) GetBucket ¶
func (m *MsgTKMergeStream) GetBucket() string
func (*MsgTKMergeStream) GetMergeTS ¶
func (m *MsgTKMergeStream) GetMergeTS() Timestamp
func (*MsgTKMergeStream) GetMsgType ¶
func (m *MsgTKMergeStream) GetMsgType() MsgType
func (*MsgTKMergeStream) GetStreamId ¶
func (m *MsgTKMergeStream) GetStreamId() common.StreamId
type MsgTKStabilityTS ¶
type MsgTKStabilityTS struct {
// contains filtered or unexported fields
}
TK_STABILITY_TIMESTAMP
func (*MsgTKStabilityTS) GetBucket ¶
func (m *MsgTKStabilityTS) GetBucket() string
func (*MsgTKStabilityTS) GetMsgType ¶
func (m *MsgTKStabilityTS) GetMsgType() MsgType
func (*MsgTKStabilityTS) GetStreamId ¶
func (m *MsgTKStabilityTS) GetStreamId() common.StreamId
func (*MsgTKStabilityTS) GetTimestamp ¶
func (m *MsgTKStabilityTS) GetTimestamp() Timestamp
func (*MsgTKStabilityTS) String ¶
func (m *MsgTKStabilityTS) String() string
type MsgTimestamp ¶
type MsgTimestamp struct {
// contains filtered or unexported fields
}
Timestamp Message
func (*MsgTimestamp) GetMsgType ¶
func (m *MsgTimestamp) GetMsgType() MsgType
func (*MsgTimestamp) GetTimestamp ¶
func (m *MsgTimestamp) GetTimestamp() Timestamp
type MsgUpdateBucketQueue ¶
type MsgUpdateBucketQueue struct {
// contains filtered or unexported fields
}
STREAM_READER_UPDATE_QUEUE_MAP
func (*MsgUpdateBucketQueue) GetBucketQueueMap ¶
func (m *MsgUpdateBucketQueue) GetBucketQueueMap() BucketQueueMap
func (*MsgUpdateBucketQueue) GetMsgType ¶
func (m *MsgUpdateBucketQueue) GetMsgType() MsgType
func (*MsgUpdateBucketQueue) String ¶
func (m *MsgUpdateBucketQueue) String() string
type MsgUpdateInstMap ¶
type MsgUpdateInstMap struct {
// contains filtered or unexported fields
}
UPDATE_INSTANCE_MAP
func (*MsgUpdateInstMap) GetIndexInstMap ¶
func (m *MsgUpdateInstMap) GetIndexInstMap() common.IndexInstMap
func (*MsgUpdateInstMap) GetMsgType ¶
func (m *MsgUpdateInstMap) GetMsgType() MsgType
func (*MsgUpdateInstMap) String ¶
func (m *MsgUpdateInstMap) String() string
type MsgUpdatePartnMap ¶
type MsgUpdatePartnMap struct {
// contains filtered or unexported fields
}
UPDATE_PARTITION_MAP
func (*MsgUpdatePartnMap) GetIndexPartnMap ¶
func (m *MsgUpdatePartnMap) GetIndexPartnMap() IndexPartnMap
func (*MsgUpdatePartnMap) GetMsgType ¶
func (m *MsgUpdatePartnMap) GetMsgType() MsgType
func (*MsgUpdatePartnMap) String ¶
func (m *MsgUpdatePartnMap) String() string
type MutationChannel ¶
type MutationChannel chan *MutationKeys
type MutationKeys ¶
type MutationKeys struct {
// contains filtered or unexported fields
}
MutationKeys holds the Secondary Keys from a single KV Mutation
type MutationManager ¶
type MutationManager interface {
}
MutationManager handles messages from Indexer to manage Mutation Streams and flush mutations from mutation queues.
type MutationMeta ¶
type MutationMeta struct {
// contains filtered or unexported fields
}
MutationMeta represents meta information for a KV Mutation
type MutationQueue ¶
type MutationQueue interface {
//enqueue a mutation reference based on vbucket
Enqueue(mutation *MutationKeys, vbucket Vbucket) error
//dequeue a vbucket's mutation and keep sending on a channel until stop signal
Dequeue(vbucket Vbucket) (<-chan *MutationKeys, chan<- bool, error)
//dequeue a vbucket's mutation up to seqno (wait if not available)
DequeueUptoSeqno(vbucket Vbucket, seqno Seqno) (<-chan *MutationKeys, error)
//dequeue single element for a vbucket and return
DequeueSingleElement(vbucket Vbucket) *MutationKeys
//return reference to a vbucket's mutation at Tail of queue without dequeue
PeekTail(vbucket Vbucket) *MutationKeys
//return reference to a vbucket's mutation at Head of queue without dequeue
PeekHead(vbucket Vbucket) *MutationKeys
//return size of queue per vbucket
GetSize(vbucket Vbucket) int64
//returns the number of vbuckets for the queue
GetNumVbuckets() uint16
}
MutationQueue interface specifies methods which a mutation queue for indexer needs to implement
type MutationStreamReader ¶
type MutationStreamReader interface {
Shutdown()
}
MutationStreamReader reads a Dataport and stores the incoming mutations in mutation queue. This is the only component writing to a mutation queue.
type NodeInfo ¶
type NodeInfo struct {
IndexerURL string `json:"indexerURL,omitempty"`
}
Indexer Node Info
type PartitionInst ¶
type PartitionInst struct {
Defn common.PartitionDefn
Sc SliceContainer
}
PartitionInst contains the partition definition and a SliceContainer to manage all the slices storing the partition's data
func (PartitionInst) String ¶
func (pi PartitionInst) String() string
type PartitionInstMap ¶
type PartitionInstMap map[common.PartitionId]PartitionInst
PartitionInstMap maps a PartitionId to PartitionInst
type QueryParams ¶
type QueryParams struct {
ScanType ScanType `json:"scanType,omitempty"`
Low [][]byte `json:"low,omitempty"`
High [][]byte `json:"high,omitempty"`
Inclusion Inclusion `json:"inclusion,omitempty"`
Limit int64 `json:"limit,omitempty"`
}
URL encoded query params
type RangeCounter ¶
type RangeCounter interface {
CountRange(low Key, high Key, inclusion Inclusion, stopch StopChannel) (
uint64, error)
}
RangeCounter is a class of algorithms that can count a range efficiently
type Ranger ¶
type Ranger interface {
Looker
KeyRange(low, high Key, inclusion Inclusion, stopch StopChannel) (
chan Key, chan error, SortOrder)
ValueRange(low, high Key, inclusion Inclusion, stopch StopChannel) (
chan Value, chan error, SortOrder)
}
Ranger is a class of algorithms that can extract a range of keys from the index.
type RequestType ¶
type RequestType string
const ( CREATE RequestType = "create" DROP RequestType = "drop" LIST RequestType = "list" NOTIFY RequestType = "notify" NODES RequestType = "nodes" SCAN RequestType = "scan" STATS RequestType = "stats" )
type ResponseStatus ¶
type ResponseStatus string
RESPONSE DATA FORMATS
const ( RESP_SUCCESS ResponseStatus = "success" RESP_ERROR ResponseStatus = "error" RESP_INVALID_CACHE ResponseStatus = "invalid_cache" )
type ScanCoordinator ¶
type ScanCoordinator interface {
}
type ScanParams ¶
type ScanParams struct {
// contains filtered or unexported fields
}
type SlabManager ¶
type SlabManager interface {
//AllocBuf allocates a buffer of given size. If returned buffer is nil,
//Message will have error message
AllocBuf(bufSize int) ([]byte, Message)
//ReleaseBuf releases the buffer back to free pool
ReleaseBuf(buf []byte) bool
//SetMaxMemoryLimit sets the maximum memory that can be allocated
SetMaxMemoryLimit(maxMemAlloc uint64) bool
//GetMaxMemoryLimit returns the maximum memory that can be allocated
GetMaxMemoryLimit() uint64
}
type Slice ¶
type Slice interface {
Id() SliceId
Name() string
Status() SliceStatus
IndexInstId() common.IndexInstId
IndexDefnId() common.IndexDefnId
IsActive() bool
SetActive(bool)
SetStatus(SliceStatus)
GetSnapshotContainer() SnapshotContainer
IndexWriter
}
Slice represents the unit of physical storage for index
type SliceContainer ¶
type SliceContainer interface {
//Add Slice to container
AddSlice(SliceId, Slice)
//Update existing slice
UpdateSlice(SliceId, Slice)
//Remove existing slice
RemoveSlice(SliceId)
//Return Slice for the given IndexKey
GetSliceByIndexKey(common.IndexKey) Slice
//Return SliceId for the given IndexKey
GetSliceIdByIndexKey(common.IndexKey) SliceId
//Return Slice for the given SliceId
GetSliceById(SliceId) Slice
//Return all Slices
GetAllSlices() []Slice
}
SliceContainer contains all slices for an index partition and provides methods to determine how data is distributed in multiple slices for a single partition
type SliceStatus ¶
type SliceStatus int16
const (
//Slice is warming up(open db files etc), not ready for operations
SLICE_STATUS_PREPARING SliceStatus = iota
//Ready for operations
SLICE_STATUS_ACTIVE
//Marked for deletion
SLICE_STATUS_TERMINATE
)
type Snapshot ¶
type Snapshot interface {
IndexReader
Open() error
Close() error
IsOpen() bool
Id() SliceId
IndexInstId() common.IndexInstId
IndexDefnId() common.IndexDefnId
Timestamp() Timestamp
SetTimestamp(Timestamp)
}
Snapshot interface
type SnapshotContainer ¶
type SnapshotContainer interface {
Add(Snapshot)
RemoveOldest() Snapshot
Len() int
GetLatestSnapshot() Snapshot
GetSnapshotEqualToTS(Timestamp) Snapshot
GetSnapshotRecentThanTS(Timestamp) Snapshot
}
SnapshotContainer manages snapshots for a Slice
type SortOrder ¶
type SortOrder string
SortOrder characterizes if the algorithm emits keys in a predictable order
type StopChannel ¶
type StopChannel chan bool
a generic channel which can be closed when you want someone to stop doing something
type StorageManager ¶
type StorageManager interface {
}
type StreamAddressMap ¶
var StreamAddrMap StreamAddressMap
type StreamStatus ¶
type StreamStatusMap ¶
type Timekeeper ¶
type Timekeeper interface {
}
Timekeeper manages the Stability Timestamp Generation and also keeps track of the HWTimestamp for each bucket
type Timestamp ¶
type Timestamp []Seqno
list of seqno per vbucket
func CopyTimestamp ¶
func NewTimestamp ¶
func NewTimestamp() Timestamp
func (Timestamp) GreaterThan ¶
GreaterThan returns true if the timestamp is greater than given timestamp
func (Timestamp) GreaterThanEqual ¶
GreaterThanEqual returns true if the given timestamp is matching or greater
type Value ¶
type Value struct {
// contains filtered or unexported fields
}
Value is the primary key of the relevant document
func (*Value) EncodedBytes ¶
Source Files
¶
- admin_manager.go
- cbq_bridge.go
- cbq_bridge_defs.go
- cluster_manager_sender.go
- common.go
- constant.go
- error.go
- flusher.go
- forestdb_iterator.go
- forestdb_slice_writer.go
- forestdb_snapshot.go
- forestdb_snapshot_reader.go
- index_reader.go
- index_writer.go
- indexer.go
- kv.go
- kv_sender.go
- message.go
- mutation_manager.go
- mutation_queue_atomic.go
- partition_instance.go
- scan_coordinator.go
- slab_manager.go
- slice.go
- slice_container.go
- snapshot.go
- snapshot_container.go
- storage_manager.go
- stream_reader.go
- timekeeper.go
- timestamp.go
- util.go