Documentation
¶
Index ¶
- Constants
- Variables
- func CollectProposalsForPublish(passed, failed []gov.SimpleProposal) (Proposals, SideProposals)
- func DelistTradingPairForPublish(ctx sdk.Context, dexKeeper *orderPkg.DexKeeper, symbol string)
- func ExpireOrdersForPublish(dexKeeper *orderPkg.DexKeeper, ctx sdk.Context, blockTime time.Time)
- func GetAccountBalances(mapper auth.AccountKeeper, ctx sdk.Context, accSlices ...[]string) (res map[string]Account)
- func GetTradeAndOrdersRelatedAccounts(tradesToPublish []*Trade, orderChanges orderPkg.OrderChanges, ...) []string
- func Publish(publisher MarketDataPublisher, metrics *Metrics, Logger tmlog.Logger, ...)
- func PublishEvent(publisher MarketDataPublisher, Logger tmlog.Logger, ...)
- func Stop(publisher MarketDataPublisher)
- func Timer(logger tmlog.Logger, description string, op func()) (durationMs int64)
- type Account
- type Accounts
- type AggregatedMarketDataPublisher
- type AllocatedAmt
- type AssetBalance
- type AvroOrJsonMsg
- type Block
- type BlockFee
- type BlockInfoToPublish
- type Books
- type BreatheBlockMsg
- type Coin
- type CompletedReDelegation
- type CompletedUnbondingDelegation
- type CrossReceiver
- type CrossTransfer
- type CrossTransfers
- type CryptoBlock
- type DelegateEvent
- type Delegation
- type Distribution
- type DistributionMsg
- type EssMsg
- type ExecutionResults
- type Input
- type KafkaMarketDataPublisher
- type LocalMarketDataPublisher
- type MarketDataPublisher
- type Metrics
- type Mirror
- type Mirrors
- type MockMarketDataPublisher
- type NativeBlockMeta
- type NativeTransaction
- type Order
- type OrderBookDelta
- type OrderSymbolId
- type Orders
- type Output
- type PriceLevel
- type Proposal
- type ProposalStatus
- type Proposals
- type ReDelegation
- type Receiver
- type RedelegateEvent
- type Reward
- type SideProposal
- type SideProposals
- type Slash
- type SlashMsg
- type StakeUpdates
- type StakingMsg
- type Trade
- type Transaction
- type Transfer
- type Transfers
- type UnbondingDelegation
- type UndelegateEvent
- type Validator
Constants ¶
const ( // TODO(#66): revisit the setting / whole thread model here, // do we need better way to make main thread less possibility to block TransferCollectionChannelSize = 4000 ToRemoveOrderIdChannelSize = 1000 MaxOrderBookLevel = 100 )
const (
KafkaBrokerSep = ";"
)
Variables ¶
var ( Logger tmlog.Logger Cfg *config.PublicationConfig ToPublishCh chan BlockInfoToPublish ToRemoveOrderIdCh chan OrderSymbolId // order symbol and ids to remove from keeper.OrderInfoForPublish IsLive bool ToPublishEventCh chan *sub.ToPublishEvent )
var Pool = newPool()
block level pool
Functions ¶
func CollectProposalsForPublish ¶
func CollectProposalsForPublish(passed, failed []gov.SimpleProposal) (Proposals, SideProposals)
func ExpireOrdersForPublish ¶
func GetAccountBalances ¶
func GetTradeAndOrdersRelatedAccounts ¶
func GetTradeAndOrdersRelatedAccounts(tradesToPublish []*Trade, orderChanges orderPkg.OrderChanges, orderInfosForPublish orderPkg.OrderInfoForPublish) []string
func Publish ¶
func Publish( publisher MarketDataPublisher, metrics *Metrics, Logger tmlog.Logger, cfg *config.PublicationConfig, ToPublishCh <-chan BlockInfoToPublish)
func PublishEvent ¶
func PublishEvent( publisher MarketDataPublisher, Logger tmlog.Logger, cfg *config.PublicationConfig, ToPublishEventCh <-chan *sub.ToPublishEvent)
func Stop ¶
func Stop(publisher MarketDataPublisher)
Types ¶
type Account ¶
type Account struct {
Owner string // string representation of AccAddress
Fee string
Sequence int64
Balances []*AssetBalance
}
func (*Account) MarshalJSON ¶
func (*Account) ToNativeMap ¶
type Accounts ¶
func (*Accounts) EmptyCopy ¶
func (msg *Accounts) EmptyCopy() AvroOrJsonMsg
func (*Accounts) EssentialMsg ¶
func (*Accounts) ToNativeMap ¶
type AggregatedMarketDataPublisher ¶
type AggregatedMarketDataPublisher struct {
// contains filtered or unexported fields
}
func NewAggregatedMarketDataPublisher ¶
func NewAggregatedMarketDataPublisher(publishers ...MarketDataPublisher) (publisher *AggregatedMarketDataPublisher)
func (*AggregatedMarketDataPublisher) Stop ¶
func (publisher *AggregatedMarketDataPublisher) Stop()
type AllocatedAmt ¶
func (*AllocatedAmt) String ¶
func (msg *AllocatedAmt) String() string
type AssetBalance ¶
func (*AssetBalance) String ¶
func (msg *AssetBalance) String() string
func (*AssetBalance) ToNativeMap ¶
func (msg *AssetBalance) ToNativeMap() map[string]interface{}
type AvroOrJsonMsg ¶
type Block ¶
type Block struct {
ChainID string
CryptoBlock CryptoBlock
}
func GetBlockPublished ¶
func (Block) ToNativeMap ¶
type BlockFee ¶
type BlockFee struct {
Height int64
Fee string
Validators []string // slice of string wrappers of bytes representation of sdk.AccAddress
}
deliberated not implemented Ess
func (BlockFee) MarshalJSON ¶
func (BlockFee) ToNativeMap ¶
type BlockInfoToPublish ¶
type BlockInfoToPublish struct {
// contains filtered or unexported fields
}
intermediate data structures to deal with concurrent publication between main thread and publisher thread
func NewBlockInfoToPublish ¶
func NewBlockInfoToPublish( height int64, timestamp int64, tradesToPublish []*Trade, proposalsToPublish *Proposals, sideProposalsToPublish *SideProposals, stakeUpdates *StakeUpdates, orderChanges orderPkg.OrderChanges, orderInfos orderPkg.OrderInfoForPublish, accounts map[string]Account, latestPriceLevels orderPkg.ChangedPriceLevelsMap, blockFee BlockFee, feeHolder orderPkg.FeeHolder, transfers *Transfers, block *Block) BlockInfoToPublish
type Books ¶
type Books struct {
Height int64
Timestamp int64
NumOfMsgs int
Books []OrderBookDelta
}
deliberated not implemented Ess
func (*Books) ToNativeMap ¶
type BreatheBlockMsg ¶
func (*BreatheBlockMsg) EmptyCopy ¶
func (msg *BreatheBlockMsg) EmptyCopy() AvroOrJsonMsg
func (*BreatheBlockMsg) EssentialMsg ¶
func (msg *BreatheBlockMsg) EssentialMsg() string
func (*BreatheBlockMsg) String ¶
func (msg *BreatheBlockMsg) String() string
func (*BreatheBlockMsg) ToNativeMap ¶
func (msg *BreatheBlockMsg) ToNativeMap() map[string]interface{}
type Coin ¶
func (Coin) ToNativeMap ¶
type CompletedReDelegation ¶
type CompletedReDelegation struct {
Delegator sdk.AccAddress
ValidatorSrc sdk.ValAddress
ValidatorDst sdk.ValAddress
}
func (*CompletedReDelegation) String ¶
func (msg *CompletedReDelegation) String() string
type CompletedUnbondingDelegation ¶
type CompletedUnbondingDelegation struct {
Validator sdk.ValAddress
Delegator sdk.AccAddress
Amount Coin
}
func (*CompletedUnbondingDelegation) String ¶
func (msg *CompletedUnbondingDelegation) String() string
type CrossReceiver ¶
func (CrossReceiver) String ¶
func (msg CrossReceiver) String() string
func (CrossReceiver) ToNativeMap ¶
func (msg CrossReceiver) ToNativeMap() map[string]interface{}
type CrossTransfer ¶
type CrossTransfer struct {
TxHash string
ChainId string
RelayerFee int64
Type string
From string
Denom string
Contract string
Decimals int
To []CrossReceiver
}
func (CrossTransfer) String ¶
func (msg CrossTransfer) String() string
func (CrossTransfer) ToNativeMap ¶
func (msg CrossTransfer) ToNativeMap() map[string]interface{}
type CrossTransfers ¶
type CrossTransfers struct {
Height int64
Num int
Timestamp int64
Transfers []CrossTransfer
}
deliberated not implemented Ess
func (CrossTransfers) String ¶
func (msg CrossTransfers) String() string
func (CrossTransfers) ToNativeMap ¶
func (msg CrossTransfers) ToNativeMap() map[string]interface{}
type CryptoBlock ¶
type CryptoBlock struct {
BlockHash string
ParentHash string
BlockHeight int64
Timestamp string
TxTotal int64
BlockMeta NativeBlockMeta
Transactions []Transaction
}
func (CryptoBlock) String ¶
func (msg CryptoBlock) String() string
func (CryptoBlock) ToNativeMap ¶
func (msg CryptoBlock) ToNativeMap() map[string]interface{}
type DelegateEvent ¶
type DelegateEvent struct {
Delegator sdk.AccAddress
Validator sdk.ValAddress
Amount Coin
TxHash string
}
func (*DelegateEvent) String ¶
func (msg *DelegateEvent) String() string
type Delegation ¶
type Delegation stake.Delegation
func (*Delegation) String ¶
func (msg *Delegation) String() string
type Distribution ¶
type Distribution struct {
Validator sdk.ValAddress
SelfDelegator sdk.AccAddress
DistributeAddr sdk.AccAddress
ValTokens int64
TotalReward int64
Commission int64
Rewards []*Reward
}
func (*Distribution) String ¶
func (msg *Distribution) String() string
type DistributionMsg ¶
type DistributionMsg struct {
NumOfMsgs int
Height int64
Timestamp int64
Distributions map[string][]*Distribution
}
distribution message
func (*DistributionMsg) EmptyCopy ¶
func (msg *DistributionMsg) EmptyCopy() AvroOrJsonMsg
func (*DistributionMsg) EssentialMsg ¶
func (msg *DistributionMsg) EssentialMsg() string
func (*DistributionMsg) String ¶
func (msg *DistributionMsg) String() string
func (*DistributionMsg) ToNativeMap ¶
func (msg *DistributionMsg) ToNativeMap() map[string]interface{}
type EssMsg ¶
type EssMsg interface {
AvroOrJsonMsg
// a string that carry essential msg used to make up downstream service on kafka issue
// this string would be persisted into file
EssentialMsg() string
// an empty message of original `AvroOrJsonMsg` to make downstream logic not broken
EmptyCopy() AvroOrJsonMsg
}
EssMsg is a type when AvroOrJsonMsg failed to publish Not all AvroOrJsonMsg implemented Ess because:
for transfer:
1. qs doesn't subscribe to its topic (risk control is relying on that) 2. risk control can recover from explorer indexed transfers (pull mode) 3. we don't have a unique representation of transfer like order-id (we didn't save txhash in message)
for trade: the problem is same with above point 3, (trade id is only generated during publication, not persisted anywhere). If we keep qty, price, sid, bid for a trade, it would be too much, in this case we should recover from local publisher
type ExecutionResults ¶
type ExecutionResults struct {
Height int64
Timestamp int64 // milli seconds since Epoch
NumOfMsgs int // number of individual messages we published, consumer can verify messages they received against this field to make sure they does not miss messages
Trades trades
Orders Orders
Proposals Proposals
StakeUpdates StakeUpdates
}
func (*ExecutionResults) EmptyCopy ¶
func (msg *ExecutionResults) EmptyCopy() AvroOrJsonMsg
func (*ExecutionResults) EssentialMsg ¶
func (msg *ExecutionResults) EssentialMsg() string
func (*ExecutionResults) String ¶
func (msg *ExecutionResults) String() string
func (*ExecutionResults) ToNativeMap ¶
func (msg *ExecutionResults) ToNativeMap() map[string]interface{}
type Input ¶
func (Input) ToNativeMap ¶
type KafkaMarketDataPublisher ¶
type KafkaMarketDataPublisher struct {
// contains filtered or unexported fields
}
func NewKafkaMarketDataPublisher ¶
func NewKafkaMarketDataPublisher( logger log.Logger, dbDir string, failFast bool) (publisher *KafkaMarketDataPublisher)
func (*KafkaMarketDataPublisher) Stop ¶
func (publisher *KafkaMarketDataPublisher) Stop()
type LocalMarketDataPublisher ¶
type LocalMarketDataPublisher struct {
// contains filtered or unexported fields
}
Publish market data to local marketdata dir in cechaind home each message will be in json format one line in file file can be compressed and auto-rotated
func NewLocalMarketDataPublisher ¶
func NewLocalMarketDataPublisher( dataPath string, tmLogger tmLogger.Logger, config *config.PublicationConfig) (publisher *LocalMarketDataPublisher)
func (*LocalMarketDataPublisher) Stop ¶
func (publisher *LocalMarketDataPublisher) Stop()
type MarketDataPublisher ¶
type MarketDataPublisher interface {
Stop()
// contains filtered or unexported methods
}
type Metrics ¶
type Metrics struct {
// Height of last published message
PublicationHeight metricsPkg.Gauge
// Size of publication queue
PublicationQueueSize metricsPkg.Gauge
// Time between publish this and the last block.
// Should be (approximate) blocking + abci + publication time
PublicationBlockIntervalMs metricsPkg.Gauge
// Time used to collect block information
CollectBlockTimeMs metricsPkg.Gauge
// Time used to collect orderbook information
CollectOrderBookTimeMs metricsPkg.Gauge
// Time used to publish everything in a block
// Should be (approximate) sum of folllowing Times
PublishTotalTimeMs metricsPkg.Gauge
// Time used to publish order & trade
PublishTradeAndOrderTimeMs metricsPkg.Gauge
// Time used to publish orderbook
PublishOrderbookTimeMs metricsPkg.Gauge
// Time used to publish accounts
PublishAccountTimeMs metricsPkg.Gauge
// Time used to publish blockfee
PublishBlockfeeTimeMs metricsPkg.Gauge
// Time used to publish transfer
PublishTransfersTimeMs metricsPkg.Gauge
// Time used to publish block
PublishBlockTimeMs metricsPkg.Gauge
// Time used to publish sideProposal
PublishSideProposalTimeMs metricsPkg.Gauge
// num of trade
NumTrade metricsPkg.Gauge
// num of order
NumOrder metricsPkg.Gauge
// num of orderbook levels
NumOrderBook metricsPkg.Gauge
// num of account balance changes
NumAccounts metricsPkg.Gauge
// num of transfer
NumTransfers metricsPkg.Gauge
NumOrderInfoForPublish metricsPkg.Gauge
}
Metrics contains metrics exposed by this package.
func PrometheusMetrics ¶
func PrometheusMetrics() *Metrics
PrometheusMetrics returns Metrics build using Prometheus client library.
type Mirror ¶
type Mirror struct {
TxHash string
ChainId string
Type string
RelayerFee int64
Sender string
Contract string
BEP20Name string
BEP20Symbol string
BEP2Symbol string
OldTotalSupply int64
TotalSupply int64
Decimals int
Fee int64
}
func (Mirror) ToNativeMap ¶
type MockMarketDataPublisher ¶
type MockMarketDataPublisher struct {
AccountPublished []*Accounts
BooksPublished []*Books
ExecutionResultsPublished []*ExecutionResults
BlockFeePublished []BlockFee
TransferPublished []Transfers
BlockPublished []*Block
Lock *sync.Mutex // as mock publisher is only used in testing, its no harm to have this granularity Lock
MessagePublished uint32 // atomic integer used to determine the published messages
}
func NewMockMarketDataPublisher ¶
func NewMockMarketDataPublisher() (publisher *MockMarketDataPublisher)
func (*MockMarketDataPublisher) Stop ¶
func (publisher *MockMarketDataPublisher) Stop()
type NativeBlockMeta ¶
type NativeBlockMeta struct {
LastCommitHash string
DataHash string
ValidatorsHash string
NextValidatorsHash string
ConsensusHash string
AppHash string
LastResultsHash string
EvidenceHash string
ProposerAddress string
}
func (NativeBlockMeta) String ¶
func (msg NativeBlockMeta) String() string
func (NativeBlockMeta) ToNativeMap ¶
func (msg NativeBlockMeta) ToNativeMap() map[string]interface{}
type NativeTransaction ¶
type NativeTransaction struct {
Source int64
TxType string
TxAsset string
OrderId string
Code uint32
Data string
ProposalId int64
}
func (NativeTransaction) String ¶
func (msg NativeTransaction) String() string
func (NativeTransaction) ToNativeMap ¶
func (msg NativeTransaction) ToNativeMap() map[string]interface{}
type Order ¶
type Order struct {
Symbol string
Status orderPkg.ChangeType
OrderId string
TradeId string
Owner string
Side int8
OrderType int8
Price int64
Qty int64
LastExecutedPrice int64
LastExecutedQty int64
CumQty int64
Fee string // DEPRECATING(Galileo): total fee for Owner in this block, should use SingleFee in future
OrderCreationTime int64
TransactionTime int64
TimeInForce int8
CurrentExecutionType orderPkg.ExecutionType
TxHash string
SingleFee string // fee for this order update - ADDED Galileo
}
type OrderBookDelta ¶
type OrderBookDelta struct {
Symbol string
Buys []PriceLevel
Sells []PriceLevel
}
func (*OrderBookDelta) String ¶
func (msg *OrderBookDelta) String() string
func (*OrderBookDelta) ToNativeMap ¶
func (msg *OrderBookDelta) ToNativeMap() map[string]interface{}
type OrderSymbolId ¶
type Output ¶
func (Output) ToNativeMap ¶
type PriceLevel ¶
func (*PriceLevel) String ¶
func (msg *PriceLevel) String() string
func (*PriceLevel) ToNativeMap ¶
func (msg *PriceLevel) ToNativeMap() map[string]interface{}
type Proposal ¶
type Proposal struct {
Id int64
Status ProposalStatus
}
type ProposalStatus ¶
type ProposalStatus uint8
const ( Succeed ProposalStatus = iota Failed )
func (ProposalStatus) String ¶
func (this ProposalStatus) String() string
type Proposals ¶
func (*Proposals) ToNativeMap ¶
type ReDelegation ¶
type ReDelegation stake.Redelegation
func (*ReDelegation) String ¶
func (msg *ReDelegation) String() string
type Receiver ¶
func (Receiver) ToNativeMap ¶
type RedelegateEvent ¶
type RedelegateEvent struct {
Delegator sdk.AccAddress
ValidatorSrc sdk.ValAddress
ValidatorDst sdk.ValAddress
Amount Coin
TxHash string
}
func (*RedelegateEvent) String ¶
func (msg *RedelegateEvent) String() string
type Reward ¶
type Reward struct {
Validator sdk.ValAddress
Delegator sdk.AccAddress
Tokens int64
Amount int64
}
type SideProposal ¶
type SideProposal struct {
Id int64
ChainId string
Status ProposalStatus
}
func (*SideProposal) String ¶
func (msg *SideProposal) String() string
type SideProposals ¶
type SideProposals struct {
Height int64
Timestamp int64
NumOfMsgs int
Proposals []*SideProposal
}
func (*SideProposals) String ¶
func (msg *SideProposals) String() string
func (*SideProposals) ToNativeMap ¶
func (msg *SideProposals) ToNativeMap() map[string]interface{}
type Slash ¶
type Slash struct {
Validator sdk.ValAddress
InfractionType byte
InfractionHeight int64
JailUtil int64
SlashAmount int64
ToFeePool int64
Submitter sdk.AccAddress
SubmitterReward int64
ValidatorsCompensation []*AllocatedAmt
}
type SlashMsg ¶
slash message
func (*SlashMsg) EmptyCopy ¶
func (msg *SlashMsg) EmptyCopy() AvroOrJsonMsg
func (*SlashMsg) EssentialMsg ¶
func (*SlashMsg) ToNativeMap ¶
type StakeUpdates ¶
type StakeUpdates struct {
NumOfMsgs int
CompletedUnbondingDelegations []*CompletedUnbondingDelegation
}
func CollectStakeUpdatesForPublish ¶
func CollectStakeUpdatesForPublish(unbondingDelegations []stake.UnbondingDelegation) StakeUpdates
func (*StakeUpdates) String ¶
func (msg *StakeUpdates) String() string
func (*StakeUpdates) ToNativeMap ¶
func (msg *StakeUpdates) ToNativeMap() map[string]interface{}
type StakingMsg ¶
type StakingMsg struct {
NumOfMsgs int
Height int64
Timestamp int64
Validators []*Validator
RemovedValidators map[string][]sdk.ValAddress
Delegations map[string][]*Delegation
UnbondingDelegations map[string][]*UnbondingDelegation
ReDelegations map[string][]*ReDelegation
CompletedUBDs map[string][]*CompletedUnbondingDelegation
CompletedREDs map[string][]*CompletedReDelegation
DelegateEvents map[string][]*DelegateEvent
UndelegateEvents map[string][]*UndelegateEvent
RedelegateEvents map[string][]*RedelegateEvent
ElectedValidators map[string][]*Validator
}
staking message
func (*StakingMsg) EmptyCopy ¶
func (msg *StakingMsg) EmptyCopy() AvroOrJsonMsg
func (*StakingMsg) EssentialMsg ¶
func (msg *StakingMsg) EssentialMsg() string
func (*StakingMsg) String ¶
func (msg *StakingMsg) String() string
func (*StakingMsg) ToNativeMap ¶
func (msg *StakingMsg) ToNativeMap() map[string]interface{}
type Trade ¶
type Trade struct {
Id string
Symbol string
Price int64
Qty int64
Sid string
Bid string
Sfee string // DEPRECATING(Galileo): seller's total fee in this block, in future we should use SSingleFee which is more precise
Bfee string // DEPRECATING(Galileo): buyer's total fee in this block, in future we should use BSingleFee which is more precise
SAddr string // string representation of AccAddress
BAddr string // string representation of AccAddress
SSrc int64 // sell order source - ADDED Galileo
BSrc int64 // buy order source - ADDED Galileo
SSingleFee string // seller's fee for this trade - ADDED Galileo
BSingleFee string // buyer's fee for this trade - ADDED Galileo
TickType int // ADDED Galileo
}
func (*Trade) MarshalJSON ¶
type Transaction ¶
type Transaction struct {
TxHash string
Fee string
Timestamp string
Inputs []Input
Outputs []Output
NativeTransaction NativeTransaction
}
func (Transaction) String ¶
func (msg Transaction) String() string
func (Transaction) ToNativeMap ¶
func (msg Transaction) ToNativeMap() map[string]interface{}
type Transfer ¶
func (Transfer) ToNativeMap ¶
type Transfers ¶
deliberated not implemented Ess
func GetTransferPublished ¶
func (Transfers) ToNativeMap ¶
type UnbondingDelegation ¶
type UnbondingDelegation stake.UnbondingDelegation
func (*UnbondingDelegation) String ¶
func (msg *UnbondingDelegation) String() string
type UndelegateEvent ¶
type UndelegateEvent struct {
Delegator sdk.AccAddress
Validator sdk.ValAddress
Amount Coin
TxHash string
}
func (*UndelegateEvent) String ¶
func (msg *UndelegateEvent) String() string