indexer

package
v0.0.0-...-71e7410 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 30, 2014 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ERROR_PANIC errCode = iota

	//Slab Manager
	ERROR_SLAB_INIT
	ERROR_SLAB_BAD_ALLOC_REQUEST
	ERROR_SLAB_INTERNAL_ALLOC_ERROR
	ERROR_SLAB_MEM_LIMIT_EXCEED
	ERROR_SLAB_INTERNAL_ERROR

	//Stream Reader
	ERROR_STREAM_INIT
	ERROR_STREAM_READER_UNKNOWN_COMMAND
	ERROR_STREAM_READER_STREAM_SHUTDOWN
	ERROR_STREAM_READER_RESTART_VBUCKETS
	ERROR_STREAM_READER_UNKNOWN_ERROR
	ERROR_STREAM_READER_PANIC

	//Mutation Manager
	ERROR_MUT_MGR_INTERNAL_ERROR
	ERROR_MUT_MGR_STREAM_ALREADY_OPEN
	ERROR_MUT_MGR_STREAM_ALREADY_CLOSED
	ERROR_MUT_MGR_UNKNOWN_COMMAND
	ERROR_MUT_MGR_UNCLEAN_SHUTDOWN
	ERROR_MUT_MGR_PANIC

	//Mutation Queue
	ERROR_MUTATION_QUEUE_INIT

	//Timekeeper
	ERROR_TK_UNKNOWN_STREAM

	//KVSender
	ERROR_KVSENDER_UNKNOWN_INDEX
	ERROR_KVSENDER_STREAM_ALREADY_OPEN
	ERROR_KVSENDER_STREAM_REQUEST_ERROR
	ERROR_KV_SENDER_UNKNOWN_STREAM
	ERROR_KVSENDER_STREAM_ALREADY_CLOSED

	//ScanCoordinator
	ERROR_SCAN_COORD_UNKNOWN_COMMAND
	ERROR_SCAN_COORD_INTERNAL_ERROR

	//INDEXER
	ERROR_INDEX_ALREADY_EXISTS
	ERROR_INDEXER_INTERNAL_ERROR
	ERROR_INDEX_BUILD_IN_PROGRESS
	ERROR_INDEXER_UNKNOWN_INDEX
)
View Source
const (
	FATAL errSeverity = iota
	NORMAL
)
View Source
const (
	MESSAGING errCategory = iota
	STORAGE
	MUTATION_QUEUE
	TOPOLOGY
	STREAM_READER
	SLAB_MANAGER
	MUTATION_MANAGER
	TIMEKEEPER
	SCAN_COORD
	INDEXER
)
View Source
const (
	Unsorted SortOrder = "none"
	Asc                = "asc"
	Desc               = "desc"
)
View Source
const (

	//General Messages
	MSG_SUCCESS = iota
	MSG_ERROR
	MSG_TIMESTAMP

	//STREAM_READER
	STREAM_READER_STREAM_DROP_DATA
	STREAM_READER_STREAM_BEGIN
	STREAM_READER_STREAM_END
	STREAM_READER_SYNC
	STREAM_READER_UPDATE_QUEUE_MAP
	STREAM_READER_ERROR
	STREAM_READER_SHUTDOWN

	//MUTATION_MANAGER
	MUT_MGR_PERSIST_MUTATION_QUEUE
	MUT_MGR_DRAIN_MUTATION_QUEUE
	MUT_MGR_GET_MUTATION_QUEUE_HWT
	MUT_MGR_GET_MUTATION_QUEUE_LWT
	MUT_MGR_SHUTDOWN
	MUT_MGR_FLUSH_DONE

	//TIMEKEEPER
	TK_SHUTDOWN
	TK_STABILITY_TIMESTAMP
	TK_INIT_BUILD_DONE
	TK_ENABLE_FLUSH
	TK_MERGE_STREAM

	//STORAGE_MANAGER
	STORAGE_MGR_SHUTDOWN

	//KVSender
	KV_SENDER_SHUTDOWN
	KV_SENDER_GET_CURR_KV_TS

	//ADMIN_MGR
	ADMIN_MGR_SHUTDOWN

	//CLUSTER_MGR
	CLUST_MGR_SENDER_SHUTDOWN

	//CBQ_BRIDGE_SHUTDOWN
	CBQ_BRIDGE_SHUTDOWN

	//INDEXER
	INDEXER_CREATE_INDEX_DDL
	INDEXER_DROP_INDEX_DDL

	//SCAN COORDINATOR
	SCAN_COORD_SCAN_INDEX
	SCAN_COORD_SCAN_PARTITION
	SCAN_COORD_SCAN_SLICE
	SCAN_COORD_SHUTDOWN

	//COMMON
	UPDATE_INDEX_INSTANCE_MAP
	UPDATE_INDEX_PARTITION_MAP

	OPEN_STREAM
	ADD_INDEX_LIST_TO_STREAM
	REMOVE_INDEX_LIST_FROM_STREAM
	CLOSE_STREAM
	CLEANUP_STREAM
)
View Source
const CBQ_BRIDGE_HTTP_ADDR = ":9101"

Cbq Bridge Http Address on which it listens to messages from Cbq Server

View Source
const DEFAULT_GROWTH_FACTOR float64 = 2.0
View Source
const DEFAULT_MAX_SLAB_MEMORY = DEFAULT_SLAB_SIZE * 1024
View Source
const DEFAULT_NUM_STREAM_READER_WORKERS = 8

Default Number of Workers started by a stream reader to processed incoming mutation. Max can be upto the number of vbuckets and minimum must be equal to the number of vbuckets

View Source
const DEFAULT_POOL = "default"

Default Pool Name

View Source
const DEFAULT_PROJECTOR_ADMIN_PORT_ENDPOINT = "localhost:9999"

Projector Admin Port Endpoint on which projector is listening for admin requests

View Source
const DEFAULT_RELEASE_BUFFER int = 10000
View Source
const DEFAULT_SLAB_SIZE = DEFAULT_START_CHUNK_SIZE * 1024
View Source
const DEFAULT_START_CHUNK_SIZE = 256

Slab Manager Specific constants

View Source
const DEQUEUE_POLL_INTERVAL = 5

Poll Interval for dequeue thread

View Source
const INDEXER_INIT_DATA_PORT_ENDPOINT = "localhost:8101"

Data Port Endpoint for Local Indexer on which projector needs to send mutations for initial build stream

View Source
const INDEXER_MAINT_DATA_PORT_ENDPOINT = "localhost:8100"

Data Port Endpoint for Local Indexer on which projector needs to send mutations for maintenance stream

View Source
const INIT_TOPIC = "INIT_STREAM_TOPIC"

Initial Stream Topic Name

View Source
const KVPORT = "9000"
View Source
const KV_DCP_PORT = "11210"
View Source
const KV_DCP_PORT_CLUSTER_RUN = "12000"
View Source
const LOCALHOST = "127.0.0.1"
View Source
const MAINT_TOPIC = "MAINT_STREAM_TOPIC"

Maintenance Topic Name

View Source
const MAX_NUM_VBUCKETS = 1024

Max number of vbuckets supported in the system

View Source
const MAX_SNAPSHOTS_PER_INDEX = 100

Max number of snapshot to be retained per index. Older snapshots are deleted.

View Source
const MAX_STREAM_READER_WORKER_BUFFER = 1000

Buffer for each of stream reader worker to queue up mutations before processing

View Source
const NUM_WRITER_THREADS_PER_SLICE = 2

Default Number of threads for a Slice Writer

View Source
const PROJECTOR_PORT = "9999"
View Source
const SLICE_COMMAND_BUFFER_SIZE = 10000

Internal Buffer Size for Each Slice to store incoming requests

View Source
const SLICE_COMMIT_POLL_INTERVAL = 20

Time in milliseconds for a slice to poll for any outstanding writes before commit

View Source
const SYNC_COUNT_TS_TRIGGER = 1024 * 2

Number of Sync messages after which Timekeeper triggers a new Stability Timestamp

View Source
const WORKER_MSG_QUEUE_LEN = 100000

Supervisor's channel capacity to buffer requests from workers

Variables

View Source
var HTTP_PREFIX string = "http://"
View Source
var KEY_SEPARATOR []byte = []byte{0xff, 0xff, 0xff, 0xff}
View Source
var NUM_VBUCKETS uint16

TODO move this to config

View Source
var PROJECTOR_ADMIN_PORT_ENDPOINT string

Functions

func CreateMutationStreamReader

func CreateMutationStreamReader(streamId common.StreamId, bucketQueueMap BucketQueueMap,
	supvCmdch MsgChannel, supvRespch MsgChannel, numWorkers int) (
	MutationStreamReader, Message)

CreateMutationStreamReader creates a new mutation stream and starts a reader to listen and process the mutations. In case returned MutationStreamReader is nil, Message will have the error msg.

func GetLocalIP

func GetLocalIP() (net.IP, error)

func IsIPLocal

func IsIPLocal(ip string) bool

func NewAdminManager

func NewAdminManager(supvCmdch MsgChannel, supvRespch MsgChannel) (
	AdminManager, Message)

func NewAtomicMutationQueue

func NewAtomicMutationQueue(numVbuckets uint16) *atomicMutationQueue

NewAtomicMutationQueue allocates a new Atomic Mutation Queue and initializes it

func NewCbqBridge

func NewCbqBridge(supvCmdch MsgChannel, supvRespch MsgChannel) (
	CbqBridge, Message)

func NewClustMgrSender

func NewClustMgrSender(supvCmdch MsgChannel, supvRespch MsgChannel) (
	ClustMgrSender, Message)

func NewFlusher

func NewFlusher() *flusher

NewFlusher returns new instance of flusher

func NewForestDBSlice

func NewForestDBSlice(name string, sliceId SliceId, idxDefnId common.IndexDefnId,
	idxInstId common.IndexInstId) (*fdbSlice, error)

NewForestDBSlice initiailizes a new slice with forestdb backend. Both main and back index gets initialized with default config. Slice methods are not thread-safe and application needs to handle the synchronization. The only exception being Insert and Delete can be called concurrently. Returns error in case slice cannot be initialized.

func NewIndexer

func NewIndexer(numVbuckets uint16) (Indexer, Message)

func NewKVSender

func NewKVSender(supvCmdch MsgChannel, supvRespch MsgChannel,
	numVbuckets uint16) (KVSender, Message)

func NewMutationManager

func NewMutationManager(supvCmdch MsgChannel, supvRespch MsgChannel,
	numVbuckets uint16) (MutationManager, Message)

NewMutationManager creates a new Mutation Manager which listens for commands from Indexer. In case returned MutationManager is nil, Message will have the error msg. supvCmdch is a synchronous channel and every request on this channel is followed by a response on the same channel. Supervisor is expected to wait for the response before issuing a new request on this channel. supvRespch will be used by Mutation Manager to send any async error/info messages that may happen due to any downstream error or its own processing. Additionally, for Flush commands, a sync response is sent on supvCmdch to indicate flush has been initiated and once flush completes, another message is sent on supvRespch to indicate its completion or any error that may have happened. If supvRespch or supvCmdch is closed, mutation manager will termiate its loop.

func NewScanCoordinator

func NewScanCoordinator(supvCmdch MsgChannel, supvRespch MsgChannel) (
	ScanCoordinator, Message)

NewStorageManager returns an instance of scanCoordinator or err message It listens on supvCmdch for command and every command is followed by a synchronous response on the supvCmdch. Any async response to supervisor is sent to supvRespch. If supvCmdch get closed, ScanCoordinator will shut itself down.

func NewSlabManager

func NewSlabManager(startChunkSize int, slabSize int, maxMemAlloc uint64) (SlabManager, Message)

NewSlabManager returns a slabManager struct instance with an initialized Arena

func NewSnapshotContainer

func NewSnapshotContainer() *snapshotContainer

NewSnapshotContainer inits a new snapshotContainer and returns

func NewStorageManager

func NewStorageManager(supvCmdch MsgChannel, supvRespch MsgChannel) (
	StorageManager, Message)

NewStorageManager returns an instance of storageMgr or err message It listens on supvCmdch for command and every command is followed by a synchronous response of the supvCmdch. Any async response to supervisor is sent to supvRespch. If supvCmdch get closed, storageMgr will shut itself down.

func NewTimekeeper

func NewTimekeeper(supvCmdch MsgChannel, supvRespch MsgChannel) (
	Timekeeper, Message)

NewTimekeeper returns an instance of timekeeper or err message. It listens on supvCmdch for command and every command is followed by a synchronous response of the supvCmdch. Any async response to supervisor is sent to supvRespch. If supvCmdch get closed, storageMgr will shut itself down.

Types

type AdminManager

type AdminManager interface {
}

AdminManager listens to the admin port messages and relays it back to Indexer

type BucketFlushEnabledMap

type BucketFlushEnabledMap map[string]bool

type BucketFlushInProgressMap

type BucketFlushInProgressMap map[string]bool

type BucketHWTMap

type BucketHWTMap map[string]Timestamp

type BucketIndexCountMap

type BucketIndexCountMap map[string]int

type BucketLastTsFlushedMap

type BucketLastTsFlushedMap map[string]Timestamp

type BucketNewTsReqdMap

type BucketNewTsReqdMap map[string]bool

type BucketQueueMap

type BucketQueueMap map[string]IndexerMutationQueue

Map from bucket name to mutation queue

func CopyBucketQueueMap

func CopyBucketQueueMap(inMap BucketQueueMap) BucketQueueMap

type BucketStopChMap

type BucketStopChMap map[string]StopChannel

Map from bucket name to flusher stop channel

type BucketSyncCountMap

type BucketSyncCountMap map[string]uint64

type BucketTsListMap

type BucketTsListMap map[string]*list.List

type CbqBridge

type CbqBridge interface {
}

CbqBridge is a temporary solution to allow Cbq Engine to talk to Indexing

type ClustMgrSender

type ClustMgrSender interface {
}

ClustMgrSender provides the mechanism to talk to Index Coordinator

type Counter

type Counter interface {
	CountTotal(stopch StopChannel) (uint64, error)
}

Counter is a class of algorithms that return total node count efficiently

type DoneChannel

type DoneChannel chan bool

a generic channel which can be closed when you want to indicate the caller that you are done

type Error

type Error struct {
	// contains filtered or unexported fields
}

type Exister

type Exister interface {
	Exists(key Key, stopch StopChannel) (bool, error)
}

Exister is a class of algorithms that allow testing if a key exists in the index

type Flusher

type Flusher interface {

	//PersistUptoTS will flush the mutation queue upto Timestamp provided.
	//Can be stopped anytime by closing StopChannel.
	//Sends SUCCESS on the MsgChannel when its done flushing till TS.
	//Any error condition is reported back on the MsgChannel.
	//Caller can wait on MsgChannel after closing StopChannel
	//to get notified about shutdown completion.
	PersistUptoTS(q MutationQueue, streamId common.StreamId, indexInstMap common.IndexInstMap,
		indexPartnMap IndexPartnMap, ts Timestamp, stopch StopChannel) MsgChannel

	//DrainUptoTS will flush the mutation queue upto Timestamp
	//provided without actually persisting it.
	//Can be stopped anytime by closing the StopChannel.
	//Sends SUCCESS on the MsgChannel when its done flushing till timestamp.
	//Any error condition is reported back on the MsgChannel.
	//Caller can wait on MsgChannel after closing StopChannel
	//to get notified about shutdown completion.
	DrainUptoTS(q MutationQueue, streamId common.StreamId, ts Timestamp,
		stopch StopChannel) MsgChannel

	//Persist will keep flushing the mutation queue till caller closes
	//the stop channel.Can be stopped anytime by closing the StopChannel.
	//Any error condition is reported back on the MsgChannel.
	//Caller can wait on MsgChannel after closing StopChannel to get
	//notified about shutdown completion.
	Persist(q MutationQueue, streamId common.StreamId, indexInstMap common.IndexInstMap,
		indexPartnMap IndexPartnMap, stopch StopChannel) MsgChannel

	//Drain will keep flushing the mutation queue till caller closes
	//the stop channel without actually persisting the mutations.
	//Can be stopped anytime by closing the StopChannel.
	//Any error condition is reported back on the MsgChannel.
	//Caller can wait on MsgChannel after closing StopChannel to get
	//notified about shutdown completion.
	Drain(q MutationQueue, streamId common.StreamId, stopch StopChannel) MsgChannel

	//IsTimestampGreaterThanQueueLWT checks if each Vbucket in the Queue
	//has mutation with Seqno lower than the corresponding Seqno present
	//in the specified timestamp.
	IsQueueLWTLowerThanTimestamp(q MutationQueue, ts Timestamp) bool

	//GetQueueLWT returns the lowest seqno for each vbucket in the queue
	GetQueueLWT(q MutationQueue) Timestamp

	//GetQueueHWT returns the highest seqno for each vbucket in the queue
	GetQueueHWT(q MutationQueue) Timestamp
}

Flusher is the only component which does read/dequeue from a MutationQueue. As MutationQueue has a restriction of only single reader and writer per vbucket, flusher should not be invoked concurrently for a single MutationQueue.

type ForestDBIterator

type ForestDBIterator struct {
	// contains filtered or unexported fields
}

ForestDBIterator taken from https://github.com/couchbaselabs/bleve/blob/master/index/store/goforestdb/iterator.go

func (*ForestDBIterator) Close

func (f *ForestDBIterator) Close()

func (*ForestDBIterator) Current

func (f *ForestDBIterator) Current() ([]byte, []byte, bool)

func (*ForestDBIterator) Key

func (f *ForestDBIterator) Key() []byte

func (*ForestDBIterator) Next

func (f *ForestDBIterator) Next()

func (*ForestDBIterator) Seek

func (f *ForestDBIterator) Seek(key []byte)

func (*ForestDBIterator) SeekFirst

func (f *ForestDBIterator) SeekFirst()

func (*ForestDBIterator) Valid

func (f *ForestDBIterator) Valid() bool

func (*ForestDBIterator) Value

func (f *ForestDBIterator) Value() []byte

type HashedSliceContainer

type HashedSliceContainer struct {
	SliceMap  map[SliceId]Slice
	NumSlices int
}

hashedSliceContainer provides a hash based implementation for SliceContainer. Each IndexKey is hashed to determine which slice it belongs to.

func NewHashedSliceContainer

func NewHashedSliceContainer() *HashedSliceContainer

NewHashedSliceContainer initializes a new HashedSliceContainer and returns

func (*HashedSliceContainer) AddSlice

func (sc *HashedSliceContainer) AddSlice(id SliceId, s Slice)

AddSlice adds a slice to the container

func (*HashedSliceContainer) GetAllSlices

func (sc *HashedSliceContainer) GetAllSlices() []Slice

GetAllSlices returns all slices from the container

func (*HashedSliceContainer) GetSliceById

func (sc *HashedSliceContainer) GetSliceById(id SliceId) Slice

GetSliceById returns Slice for the given SliceId

func (*HashedSliceContainer) GetSliceByIndexKey

func (sc *HashedSliceContainer) GetSliceByIndexKey(key common.IndexKey) Slice

GetSliceByIndexKey returns Slice for the given IndexKey This is a convenience method which calls other interface methods to first determine the sliceId from IndexKey and then the slice from sliceId

func (*HashedSliceContainer) GetSliceIdByIndexKey

func (sc *HashedSliceContainer) GetSliceIdByIndexKey(key common.IndexKey) SliceId

GetSliceIdByIndexKey returns SliceId for the given IndexKey

func (*HashedSliceContainer) RemoveSlice

func (sc *HashedSliceContainer) RemoveSlice(id SliceId)

RemoveSlice removes a slice from the container

func (*HashedSliceContainer) UpdateSlice

func (sc *HashedSliceContainer) UpdateSlice(id SliceId, s Slice)

UpdateSlice updates an existing slice to the container

type Inclusion

type Inclusion int

Inclusion controls how the boundaries values of a range are treated

const (
	Neither Inclusion = iota
	Low
	High
	Both
)

type IndexError

type IndexError struct {
	Code string `json:"code,omitempty"`
	Msg  string `json:"msg,omitempty"`
}

type IndexInfo

type IndexInfo struct {
	Name       string           `json:"name,omitempty"`       // Name of the index
	Uuid       string           `json:"uuid,omitempty"`       // unique id for every index
	Using      common.IndexType `json:"using,omitempty"`      // indexing algorithm
	OnExprList []string         `json:"onExprList,omitempty"` // expression list
	Bucket     string           `json:"bucket,omitempty"`     // bucket name
	IsPrimary  bool             `json:"isPrimary,omitempty"`
	Exprtype   common.ExprType  `json:"exprType,omitempty"`
}

Every index ever created and maintained by this package will have an associated index-info structure.

func (IndexInfo) String

func (idx IndexInfo) String() string

type IndexMetaResponse

type IndexMetaResponse struct {
	Status     ResponseStatus `json:"status,omitempty"`
	Indexes    []IndexInfo    `json:"indexes,omitempty"`
	ServerUuid string         `json:"serverUuid,omitempty"`
	Nodes      []NodeInfo     `json:"nodes,omitempty"`
	Errors     []IndexError   `json:"errors,omitempty"`
}

type IndexPartnMap

type IndexPartnMap map[common.IndexInstId]PartitionInstMap

IndexPartnMap maps a IndexInstId to PartitionInstMap

func CopyIndexPartnMap

func CopyIndexPartnMap(inMap IndexPartnMap) IndexPartnMap

func (IndexPartnMap) String

func (pm IndexPartnMap) String() string

type IndexQueueMap

type IndexQueueMap map[common.IndexInstId]IndexerMutationQueue

IndexQueueMap is a map between IndexId and IndexerMutationQueue

type IndexReader

type IndexReader interface {
	Counter
	Ranger
	RangeCounter
}

type IndexRequest

type IndexRequest struct {
	Type       RequestType `json:"type,omitempty"`
	Index      IndexInfo   `json:"index,omitempty"`
	ServerUuid string      `json:"serverUuid,omitempty"`
	Params     QueryParams `json:"params,omitempty"`
}

All API accept IndexRequest structure and returns IndexResponse structure. If application is written in Go, and compiled with `indexing` package then they can choose the access the underlying interfaces directly.

type IndexRow

type IndexRow struct {
	Key   [][]byte `json:"key,omitempty"`
	Value string   `json:"value,omitempty"`
}

type IndexScanResponse

type IndexScanResponse struct {
	Status    ResponseStatus `json:"status,omitempty"`
	TotalRows uint64         `json:"totalrows,omitempty"`
	Rows      []IndexRow     `json:"rows,omitempty"`
	Errors    []IndexError   `json:"errors,omitempty"`
}

type IndexWriter

type IndexWriter interface {

	//Persist a key/value pair
	Insert(key Key, value Value) error

	//Delete a key/value pair by docId
	Delete(docid []byte) error

	//Commit the pending operations
	Commit() error

	//Snapshot
	Snapshot() (Snapshot, error)

	//Close the index. Should be able to reopen after this operation
	Close() error

	//Destroy/Wipe the index completely
	Destroy() error
}

type Indexer

type Indexer interface {
	Shutdown() Message
}

type IndexerId

type IndexerId uint64

type IndexerMutationQueue

type IndexerMutationQueue struct {
	// contains filtered or unexported fields
}

IndexMutationQueue comprising of a mutation queue and a slab manager

type IndexerState

type IndexerState int16
const (
	INIT IndexerState = iota
	ACTIVE
	RECOVERY
)

type InitialBuildInfo

type InitialBuildInfo struct {
	// contains filtered or unexported fields
}

type KVSender

type KVSender interface {
}

KVSender provides the mechanism to talk to KV(projector, router etc)

type Key

type Key struct {
	// contains filtered or unexported fields
}

Key is an array of JSON objects, per encoding/json

func NewKey

func NewKey(data [][]byte, docid []byte) (Key, error)

func NewKeyFromEncodedBytes

func NewKeyFromEncodedBytes(b []byte) (Key, error)

func (*Key) Compare

func (k *Key) Compare(than Key) int

func (*Key) EncodedBytes

func (k *Key) EncodedBytes() []byte

func (*Key) String

func (k *Key) String() string

type Keybytes

type Keybytes [][]byte

type Looker

type Looker interface {
	Exister
	Lookup(key Key, stopch StopChannel) (chan Value, chan error)
	KeySet(stop StopChannel) (chan Key, chan error)
	ValueSet(stop StopChannel) (chan Value, chan error)
}

Looker is a class of algorithms that allow looking up a key in an index. Usually, being able to look up a key means we can iterate through all keys too, and so that is introduced here as well.

type Message

type Message interface {
	GetMsgType() MsgType
}

type MsgChannel

type MsgChannel chan Message

type MsgCreateIndex

type MsgCreateIndex struct {
	// contains filtered or unexported fields
}

INDEXER_CREATE_INDEX_DDL

func (*MsgCreateIndex) GetIndexInst

func (m *MsgCreateIndex) GetIndexInst() common.IndexInst

func (*MsgCreateIndex) GetMsgType

func (m *MsgCreateIndex) GetMsgType() MsgType

func (*MsgCreateIndex) GetResponseChannel

func (m *MsgCreateIndex) GetResponseChannel() MsgChannel

func (*MsgCreateIndex) GetString

func (m *MsgCreateIndex) GetString() string

type MsgDropIndex

type MsgDropIndex struct {
	// contains filtered or unexported fields
}

INDEXER_DROP_INDEX_DDL

func (*MsgDropIndex) GetIndexInstId

func (m *MsgDropIndex) GetIndexInstId() common.IndexInstId

func (*MsgDropIndex) GetMsgType

func (m *MsgDropIndex) GetMsgType() MsgType

func (*MsgDropIndex) GetResponseChannel

func (m *MsgDropIndex) GetResponseChannel() MsgChannel

func (*MsgDropIndex) GetString

func (m *MsgDropIndex) GetString() string

type MsgError

type MsgError struct {
	// contains filtered or unexported fields
}

Error Message

func (*MsgError) GetError

func (m *MsgError) GetError() Error

func (*MsgError) GetMsgType

func (m *MsgError) GetMsgType() MsgType

type MsgGeneral

type MsgGeneral struct {
	// contains filtered or unexported fields
}

Generic Message

func (*MsgGeneral) GetMsgType

func (m *MsgGeneral) GetMsgType() MsgType

type MsgMutMgrFlushDone

type MsgMutMgrFlushDone struct {
	// contains filtered or unexported fields
}

MUT_MGR_FLUSH_DONE

func (*MsgMutMgrFlushDone) GetBucket

func (m *MsgMutMgrFlushDone) GetBucket() string

func (*MsgMutMgrFlushDone) GetMsgType

func (m *MsgMutMgrFlushDone) GetMsgType() MsgType

func (*MsgMutMgrFlushDone) GetStreamId

func (m *MsgMutMgrFlushDone) GetStreamId() common.StreamId

func (*MsgMutMgrFlushDone) GetTS

func (m *MsgMutMgrFlushDone) GetTS() Timestamp

func (*MsgMutMgrFlushDone) String

func (m *MsgMutMgrFlushDone) String() string

type MsgMutMgrFlushMutationQueue

type MsgMutMgrFlushMutationQueue struct {
	// contains filtered or unexported fields
}

MUT_MGR_PERSIST_MUTATION_QUEUE MUT_MGR_DISCARD_MUTATION_QUEUE

func (*MsgMutMgrFlushMutationQueue) GetBucket

func (m *MsgMutMgrFlushMutationQueue) GetBucket() string

func (*MsgMutMgrFlushMutationQueue) GetMsgType

func (m *MsgMutMgrFlushMutationQueue) GetMsgType() MsgType

func (*MsgMutMgrFlushMutationQueue) GetStreamId

func (m *MsgMutMgrFlushMutationQueue) GetStreamId() common.StreamId

func (*MsgMutMgrFlushMutationQueue) GetTimestamp

func (m *MsgMutMgrFlushMutationQueue) GetTimestamp() Timestamp

func (*MsgMutMgrFlushMutationQueue) String

func (m *MsgMutMgrFlushMutationQueue) String() string

type MsgMutMgrGetTimestamp

type MsgMutMgrGetTimestamp struct {
	// contains filtered or unexported fields
}

MUT_MGR_GET_MUTATION_QUEUE_HWT MUT_MGR_GET_MUTATION_QUEUE_LWT

func (*MsgMutMgrGetTimestamp) GetBucket

func (m *MsgMutMgrGetTimestamp) GetBucket() string

func (*MsgMutMgrGetTimestamp) GetMsgType

func (m *MsgMutMgrGetTimestamp) GetMsgType() MsgType

func (*MsgMutMgrGetTimestamp) GetStreamId

func (m *MsgMutMgrGetTimestamp) GetStreamId() common.StreamId

type MsgScanIndex

type MsgScanIndex struct {
	// contains filtered or unexported fields
}

SCAN_COORD_SCAN_INDEX

func (*MsgScanIndex) GetCountChannel

func (m *MsgScanIndex) GetCountChannel() chan uint64

func (*MsgScanIndex) GetErrorChannel

func (m *MsgScanIndex) GetErrorChannel() chan Message

func (*MsgScanIndex) GetIndexInstId

func (m *MsgScanIndex) GetIndexInstId() common.IndexInstId

func (*MsgScanIndex) GetMsgType

func (m *MsgScanIndex) GetMsgType() MsgType

func (*MsgScanIndex) GetParams

func (m *MsgScanIndex) GetParams() ScanParams

func (*MsgScanIndex) GetResultChannel

func (m *MsgScanIndex) GetResultChannel() chan Value

func (*MsgScanIndex) GetScanId

func (m *MsgScanIndex) GetScanId() int64

func (*MsgScanIndex) GetStopChannel

func (m *MsgScanIndex) GetStopChannel() StopChannel

type MsgStream

type MsgStream struct {
	// contains filtered or unexported fields
}

Stream Reader Message

func (*MsgStream) GetMsgType

func (m *MsgStream) GetMsgType() MsgType

func (*MsgStream) GetMutationMeta

func (m *MsgStream) GetMutationMeta() *MutationMeta

func (*MsgStream) GetStreamId

func (m *MsgStream) GetStreamId() common.StreamId

type MsgStreamError

type MsgStreamError struct {
	// contains filtered or unexported fields
}

Stream Error Message

func (*MsgStreamError) GetError

func (m *MsgStreamError) GetError() Error

func (*MsgStreamError) GetMsgType

func (m *MsgStreamError) GetMsgType() MsgType

func (*MsgStreamError) GetStreamId

func (m *MsgStreamError) GetStreamId() common.StreamId

type MsgStreamUpdate

type MsgStreamUpdate struct {
	// contains filtered or unexported fields
}

OPEN_STREAM ADD_INDEX_LIST_TO_STREAM REMOVE_INDEX_LIST_FROM_STREAM CLOSE_STREAM CLEANUP_STREAM

func (*MsgStreamUpdate) GetBucket

func (m *MsgStreamUpdate) GetBucket() string

func (*MsgStreamUpdate) GetIndexList

func (m *MsgStreamUpdate) GetIndexList() []common.IndexInst

func (*MsgStreamUpdate) GetMsgType

func (m *MsgStreamUpdate) GetMsgType() MsgType

func (*MsgStreamUpdate) GetResponseChannel

func (m *MsgStreamUpdate) GetResponseChannel() MsgChannel

func (*MsgStreamUpdate) GetStreamId

func (m *MsgStreamUpdate) GetStreamId() common.StreamId

func (*MsgStreamUpdate) GetTimestamp

func (m *MsgStreamUpdate) GetTimestamp() Timestamp

func (*MsgStreamUpdate) String

func (m *MsgStreamUpdate) String() string

type MsgSuccess

type MsgSuccess struct {
}

Success Message

func (*MsgSuccess) GetMsgType

func (m *MsgSuccess) GetMsgType() MsgType

type MsgTKEnableFlush

type MsgTKEnableFlush struct {
	// contains filtered or unexported fields
}

TK_ENABLE_FLUSH

func (*MsgTKEnableFlush) GetBucket

func (m *MsgTKEnableFlush) GetBucket() string

func (*MsgTKEnableFlush) GetMsgType

func (m *MsgTKEnableFlush) GetMsgType() MsgType

func (*MsgTKEnableFlush) GetStreamId

func (m *MsgTKEnableFlush) GetStreamId() common.StreamId

type MsgTKInitBuildDone

type MsgTKInitBuildDone struct {
	// contains filtered or unexported fields
}

TK_INIT_BUILD_DONE

func (*MsgTKInitBuildDone) GetBucket

func (m *MsgTKInitBuildDone) GetBucket() string

func (*MsgTKInitBuildDone) GetMsgType

func (m *MsgTKInitBuildDone) GetMsgType() MsgType

func (*MsgTKInitBuildDone) GetResponseChannel

func (m *MsgTKInitBuildDone) GetResponseChannel() MsgChannel

func (*MsgTKInitBuildDone) GetStreamId

func (m *MsgTKInitBuildDone) GetStreamId() common.StreamId

func (*MsgTKInitBuildDone) GetTimestamp

func (m *MsgTKInitBuildDone) GetTimestamp() Timestamp

type MsgTKMergeStream

type MsgTKMergeStream struct {
	// contains filtered or unexported fields
}

TK_MERGE_STREAM

func (*MsgTKMergeStream) GetBucket

func (m *MsgTKMergeStream) GetBucket() string

func (*MsgTKMergeStream) GetMergeTS

func (m *MsgTKMergeStream) GetMergeTS() Timestamp

func (*MsgTKMergeStream) GetMsgType

func (m *MsgTKMergeStream) GetMsgType() MsgType

func (*MsgTKMergeStream) GetStreamId

func (m *MsgTKMergeStream) GetStreamId() common.StreamId

type MsgTKStabilityTS

type MsgTKStabilityTS struct {
	// contains filtered or unexported fields
}

TK_STABILITY_TIMESTAMP

func (*MsgTKStabilityTS) GetBucket

func (m *MsgTKStabilityTS) GetBucket() string

func (*MsgTKStabilityTS) GetMsgType

func (m *MsgTKStabilityTS) GetMsgType() MsgType

func (*MsgTKStabilityTS) GetStreamId

func (m *MsgTKStabilityTS) GetStreamId() common.StreamId

func (*MsgTKStabilityTS) GetTimestamp

func (m *MsgTKStabilityTS) GetTimestamp() Timestamp

func (*MsgTKStabilityTS) String

func (m *MsgTKStabilityTS) String() string

type MsgTimestamp

type MsgTimestamp struct {
	// contains filtered or unexported fields
}

Timestamp Message

func (*MsgTimestamp) GetMsgType

func (m *MsgTimestamp) GetMsgType() MsgType

func (*MsgTimestamp) GetTimestamp

func (m *MsgTimestamp) GetTimestamp() Timestamp

type MsgType

type MsgType int16

type MsgUpdateBucketQueue

type MsgUpdateBucketQueue struct {
	// contains filtered or unexported fields
}

STREAM_READER_UPDATE_QUEUE_MAP

func (*MsgUpdateBucketQueue) GetBucketQueueMap

func (m *MsgUpdateBucketQueue) GetBucketQueueMap() BucketQueueMap

func (*MsgUpdateBucketQueue) GetMsgType

func (m *MsgUpdateBucketQueue) GetMsgType() MsgType

func (*MsgUpdateBucketQueue) String

func (m *MsgUpdateBucketQueue) String() string

type MsgUpdateInstMap

type MsgUpdateInstMap struct {
	// contains filtered or unexported fields
}

UPDATE_INSTANCE_MAP

func (*MsgUpdateInstMap) GetIndexInstMap

func (m *MsgUpdateInstMap) GetIndexInstMap() common.IndexInstMap

func (*MsgUpdateInstMap) GetMsgType

func (m *MsgUpdateInstMap) GetMsgType() MsgType

func (*MsgUpdateInstMap) String

func (m *MsgUpdateInstMap) String() string

type MsgUpdatePartnMap

type MsgUpdatePartnMap struct {
	// contains filtered or unexported fields
}

UPDATE_PARTITION_MAP

func (*MsgUpdatePartnMap) GetIndexPartnMap

func (m *MsgUpdatePartnMap) GetIndexPartnMap() IndexPartnMap

func (*MsgUpdatePartnMap) GetMsgType

func (m *MsgUpdatePartnMap) GetMsgType() MsgType

func (*MsgUpdatePartnMap) String

func (m *MsgUpdatePartnMap) String() string

type MutationChannel

type MutationChannel chan *MutationKeys

type MutationKeys

type MutationKeys struct {
	// contains filtered or unexported fields
}

MutationKeys holds the Secondary Keys from a single KV Mutation

type MutationManager

type MutationManager interface {
}

MutationManager handles messages from Indexer to manage Mutation Streams and flush mutations from mutation queues.

type MutationMeta

type MutationMeta struct {
	// contains filtered or unexported fields
}

MutationMeta represents meta information for a KV Mutation

type MutationQueue

type MutationQueue interface {

	//enqueue a mutation reference based on vbucket
	Enqueue(mutation *MutationKeys, vbucket Vbucket) error

	//dequeue a vbucket's mutation and keep sending on a channel until stop signal
	Dequeue(vbucket Vbucket) (<-chan *MutationKeys, chan<- bool, error)
	//dequeue a vbucket's mutation upto seqno(wait if not available)
	DequeueUptoSeqno(vbucket Vbucket, seqno Seqno) (<-chan *MutationKeys, error)
	//dequeue single element for a vbucket and return
	DequeueSingleElement(vbucket Vbucket) *MutationKeys

	//return reference to a vbucket's mutation at Tail of queue without dequeue
	PeekTail(vbucket Vbucket) *MutationKeys
	//return reference to a vbucket's mutation at Head of queue without dequeue
	PeekHead(vbucket Vbucket) *MutationKeys

	//return size of queue per vbucket
	GetSize(vbucket Vbucket) int64

	//returns the numbers of vbuckets for the queue
	GetNumVbuckets() uint16
}

MutationQueue interface specifies methods which a mutation queue for indexer needs to implement

type MutationStreamReader

type MutationStreamReader interface {
	Shutdown()
}

MutationStreamReader reads a Dataport and stores the incoming mutations in mutation queue. This is the only component writing to a mutation queue.

type NodeInfo

type NodeInfo struct {
	IndexerURL string `json:"indexerURL,omitempty"`
}

Indexer Node Info

type PartitionInst

type PartitionInst struct {
	Defn common.PartitionDefn
	Sc   SliceContainer
}

PartitionInst contains the partition definition and a SliceContainer to manage all the slices storing the partition's data

func (PartitionInst) String

func (pi PartitionInst) String() string

type PartitionInstMap

type PartitionInstMap map[common.PartitionId]PartitionInst

PartitionInstMap maps a PartitionId to PartitionInst

type QueryParams

type QueryParams struct {
	ScanType  ScanType  `json:"scanType,omitempty"`
	Low       [][]byte  `json:"low,omitempty"`
	High      [][]byte  `json:"high,omitempty"`
	Inclusion Inclusion `json:"inclusion,omitempty"`
	Limit     int64     `json:"limit,omitempty"`
}

URL encoded query params

type RangeCounter

type RangeCounter interface {
	CountRange(low Key, high Key, inclusion Inclusion, stopch StopChannel) (
		uint64, error)
}

RangeCounter is a class of algorithms that can count a range efficiently

type Ranger

type Ranger interface {
	Looker
	KeyRange(low, high Key, inclusion Inclusion, stopch StopChannel) (
		chan Key, chan error, SortOrder)
	ValueRange(low, high Key, inclusion Inclusion, stopch StopChannel) (
		chan Value, chan error, SortOrder)
}

Ranger is a class of algorithms that can extract a range of keys from the index.

type RequestType

type RequestType string
const (
	CREATE RequestType = "create"
	DROP   RequestType = "drop"
	LIST   RequestType = "list"
	NOTIFY RequestType = "notify"
	NODES  RequestType = "nodes"
	SCAN   RequestType = "scan"
	STATS  RequestType = "stats"
)

type ResponseStatus

type ResponseStatus string

RESPONSE DATA FORMATS

const (
	RESP_SUCCESS       ResponseStatus = "success"
	RESP_ERROR         ResponseStatus = "error"
	RESP_INVALID_CACHE ResponseStatus = "invalid_cache"
)

type ScanCoordinator

type ScanCoordinator interface {
}

type ScanParams

type ScanParams struct {
	// contains filtered or unexported fields
}

type ScanType

type ScanType string
const (
	COUNT      ScanType = "count"
	EXISTS     ScanType = "exists"
	LOOKUP     ScanType = "lookup"
	RANGESCAN  ScanType = "rangeScan"
	FULLSCAN   ScanType = "fullScan"
	RANGECOUNT ScanType = "rangeCount"
)

type Seqno

type Seqno uint64

type SlabManager

type SlabManager interface {
	//AllocBuf allocates a buffer of given size. If returned buffer is nil,
	//Message will have error message
	AllocBuf(bufSize int) ([]byte, Message)

	//ReleaseBuf releases the buffer back to free pool
	ReleaseBuf(buf []byte) bool

	//SetMaxMemoryLimit sets the maximum memory that can be allocated
	SetMaxMemoryLimit(maxMemAlloc uint64) bool

	//GetMaxMemoryLimit returns the maximum memory that can be allocated
	GetMaxMemoryLimit() uint64
}

type Slice

type Slice interface {
	Id() SliceId
	Name() string
	Status() SliceStatus
	IndexInstId() common.IndexInstId
	IndexDefnId() common.IndexDefnId
	IsActive() bool

	SetActive(bool)
	SetStatus(SliceStatus)

	GetSnapshotContainer() SnapshotContainer

	IndexWriter
}

Slice represents the unit of physical storage for index

type SliceContainer

type SliceContainer interface {
	//Add Slice to container
	AddSlice(SliceId, Slice)

	//Update existing slice
	UpdateSlice(SliceId, Slice)

	//Remove existing slice
	RemoveSlice(SliceId)

	//Return Slice for the given IndexKey
	GetSliceByIndexKey(common.IndexKey) Slice

	//Return SliceId for the given IndexKey
	GetSliceIdByIndexKey(common.IndexKey) SliceId

	//Return Slice for the given SliceId
	GetSliceById(SliceId) Slice

	//Return all Slices
	GetAllSlices() []Slice
}

SliceContainer contains all slices for an index partition and provides methods to determine how data is distributed in multiple slices for a single partition

type SliceId

type SliceId uint64

type SliceStatus

type SliceStatus int16
const (
	//Slice is warming up(open db files etc), not ready for operations
	SLICE_STATUS_PREPARING SliceStatus = iota
	//Ready for operations
	SLICE_STATUS_ACTIVE
	//Marked for deletion
	SLICE_STATUS_TERMINATE
)

type Snapshot

type Snapshot interface {
	IndexReader

	Open() error
	Close() error
	IsOpen() bool

	Id() SliceId
	IndexInstId() common.IndexInstId
	IndexDefnId() common.IndexDefnId

	Timestamp() Timestamp
	SetTimestamp(Timestamp)
}

Snapshot interface

type SnapshotContainer

type SnapshotContainer interface {
	Add(Snapshot)
	RemoveOldest() Snapshot
	Len() int

	GetLatestSnapshot() Snapshot
	GetSnapshotEqualToTS(Timestamp) Snapshot
	GetSnapshotRecentThanTS(Timestamp) Snapshot
}

SnapshotContainer manages snapshots for a Slice

type SortOrder

type SortOrder string

SortOrder characterizes if the algorithm emits keys in a predictable order

type StabilityTimestamp

type StabilityTimestamp Timestamp

Stability Timestamp

type StopChannel

type StopChannel chan bool

a generic channel which can be closed when you want someone to stop doing something

type StorageManager

type StorageManager interface {
}

type StreamAddressMap

type StreamAddressMap map[common.StreamId]common.Endpoint
var StreamAddrMap StreamAddressMap

type StreamStatus

type StreamStatus map[common.StreamId]bool

type StreamStatusMap

type StreamStatusMap map[common.StreamId]bool

type Timekeeper

type Timekeeper interface {
}

Timekeeper manages the Stability Timestamp Generation and also keeps track of the HWTimestamp for each bucket

type Timestamp

type Timestamp []Seqno

list of seqno per vbucket

func CopyTimestamp

func CopyTimestamp(ts Timestamp) Timestamp

func NewTimestamp

func NewTimestamp() Timestamp

func (Timestamp) Equals

func (ts Timestamp) Equals(ts1 Timestamp) bool

Equals returns true if both timestamps match, false otherwise

func (Timestamp) GreaterThan

func (ts Timestamp) GreaterThan(ts1 Timestamp) bool

GreaterThan returns true if the timestamp is greater than given timestamp

func (Timestamp) GreaterThanEqual

func (ts Timestamp) GreaterThanEqual(ts1 Timestamp) bool

GreaterThanEqual returns true if the given timestamp is matching or greater

func (Timestamp) IsZeroTs

func (ts Timestamp) IsZeroTs() bool

IsZeroTs return true if all seqno in TS are zero

type Value

type Value struct {
	// contains filtered or unexported fields
}

Value is the primary key of the relavent document

func NewValue

func NewValue(data [][]byte, docid []byte, vbucket Vbucket, seqno Seqno) (Value, error)

func NewValueFromEncodedBytes

func NewValueFromEncodedBytes(b []byte) (Value, error)

func (*Value) Docid

func (v *Value) Docid() []byte

func (*Value) EncodedBytes

func (v *Value) EncodedBytes() []byte

func (*Value) KeyBytes

func (v *Value) KeyBytes() Keybytes

func (*Value) String

func (v *Value) String() string

type Vbucket

type Vbucket uint32

type Vbuuid

type Vbuuid uint64

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL