Documentation
¶
Index ¶
- type DefaultMQAdminExtImpl
- func (impl *DefaultMQAdminExtImpl) CleanExpiredConsumerQueue(clusterName string) (result bool, err error)
- func (impl *DefaultMQAdminExtImpl) CleanExpiredConsumerQueueByAddr(brokerAddr string) (bool, error)
- func (impl *DefaultMQAdminExtImpl) CloneGroupOffset(srcGroup, destGroup, topic string, isOffline bool) error
- func (impl *DefaultMQAdminExtImpl) ConsumeMessageDirectly(consumerGroup, clientId, msgId string) (*body.ConsumeMessageDirectlyResult, error)
- func (impl *DefaultMQAdminExtImpl) Consumed(msg *message.MessageExt, consumerGroupId string) (bool, error)
- func (impl *DefaultMQAdminExtImpl) CreateAndUpdateKvConfig(namespace, key, value string) error
- func (impl *DefaultMQAdminExtImpl) CreateAndUpdateSubscriptionGroupConfig(addr string, config *subscription.SubscriptionGroupConfig) error
- func (impl *DefaultMQAdminExtImpl) CreateAndUpdateTopicConfig(brokerAddr string, topicConfig *stgcommon.TopicConfig) error
- func (impl *DefaultMQAdminExtImpl) CreateCustomTopic(brokerAddr string, topicConfig *stgcommon.TopicConfig) error
- func (impl *DefaultMQAdminExtImpl) CreateOrUpdateOrderConf(key, value string, isCluster bool) error
- func (impl *DefaultMQAdminExtImpl) CreateTopic(key, newTopic string, queueNum int) error
- func (impl *DefaultMQAdminExtImpl) DeleteIpsByProjectGroup(key string) error
- func (impl *DefaultMQAdminExtImpl) DeleteKvConfig(namespace, key string) error
- func (impl *DefaultMQAdminExtImpl) DeleteSubscriptionGroup(brokerAddr, groupName string) error
- func (impl *DefaultMQAdminExtImpl) DeleteTopicInBroker(brokerAddrs set.Set, topic string) error
- func (impl *DefaultMQAdminExtImpl) DeleteTopicInNameServer(namesrvSet set.Set, topic string) error
- func (impl *DefaultMQAdminExtImpl) EarliestMsgStoreTime(mq *message.MessageQueue) (int64, error)
- func (impl *DefaultMQAdminExtImpl) ExamineBrokerClusterInfo() (*body.ClusterPlusInfo, []*body.ClusterBrokerWapper, error)
- func (impl *DefaultMQAdminExtImpl) ExamineConsumeStats(consumerGroup string) (*admin.ConsumeStats, error)
- func (impl *DefaultMQAdminExtImpl) ExamineConsumeStatsByTopic(consumerGroup, topic string) (*admin.ConsumeStats, error)
- func (impl *DefaultMQAdminExtImpl) ExamineConsumerConnectionInfo(consumerGroup, topic string) (*body.ConsumerConnectionPlus, int, error)
- func (impl *DefaultMQAdminExtImpl) ExamineProducerConnectionInfo(producerGroup, topic string) (*body.ProducerConnection, error)
- func (impl *DefaultMQAdminExtImpl) ExamineSubscriptionGroupConfig(addr, group string) (*subscription.SubscriptionGroupConfig, error)
- func (impl *DefaultMQAdminExtImpl) ExamineTopicConfig(addr, topic string) (*stgcommon.TopicConfig, error)
- func (impl *DefaultMQAdminExtImpl) ExamineTopicRouteInfo(topic string) (*route.TopicRouteData, error)
- func (impl *DefaultMQAdminExtImpl) ExamineTopicStats(topic string) (*admin.TopicStatsTable, error)
- func (impl *DefaultMQAdminExtImpl) FetchAllTopicList() (*body.TopicList, error)
- func (impl *DefaultMQAdminExtImpl) FetchBrokerNameByAddr(brokerAddr string) (string, error)
- func (impl *DefaultMQAdminExtImpl) FetchBrokerNameByClusterName(clusterName string) (set.Set, error)
- func (impl *DefaultMQAdminExtImpl) FetchBrokerRuntimeStats(brokerAddr string) (*body.KVTable, error)
- func (impl *DefaultMQAdminExtImpl) FetchMasterAddrByClusterName(clusterName string) (set.Set, error)
- func (impl *DefaultMQAdminExtImpl) GetAdminExtGroup() string
- func (impl *DefaultMQAdminExtImpl) GetAllClusterNames() ([]string, map[string]*route.BrokerData, error)
- func (impl *DefaultMQAdminExtImpl) GetClusterTopicWappers() ([]*body.TopicBrokerClusterWapper, error)
- func (impl *DefaultMQAdminExtImpl) GetConsumeStatus(topic, consumerGroupId, clientAddr string) (map[string]map[*message.MessageQueue]int64, error)
- func (impl *DefaultMQAdminExtImpl) GetConsumerRunningInfo(consumerGroupId, clientId string, jstack bool) (*body.ConsumerRunningInfo, error)
- func (impl *DefaultMQAdminExtImpl) GetCreateTopicKey() string
- func (impl *DefaultMQAdminExtImpl) GetIpsByProjectGroup(projectGroup string) (string, error)
- func (impl *DefaultMQAdminExtImpl) GetKVConfig(namespace, key string) (string, error)
- func (impl *DefaultMQAdminExtImpl) GetKVListByNamespace(namespace string) (*body.KVTable, error)
- func (impl *DefaultMQAdminExtImpl) GetNameServerAddressList() ([]string, error)
- func (impl *DefaultMQAdminExtImpl) GetProjectGroupByIp(ip string) (string, error)
- func (impl *DefaultMQAdminExtImpl) GetTopicsByCluster(clusterName string) ([]*body.TopicBrokerClusterWapper, error)
- func (impl *DefaultMQAdminExtImpl) MaxOffset(mq *message.MessageQueue) (int64, error)
- func (impl *DefaultMQAdminExtImpl) MessageTrackDetail(msg *message.MessageExt) ([]*track.MessageTrack, error)
- func (impl *DefaultMQAdminExtImpl) MinOffset(mq *message.MessageQueue) (int64, error)
- func (impl *DefaultMQAdminExtImpl) PutKVConfig(namespace, key, value string) error
- func (impl *DefaultMQAdminExtImpl) QueryConsumeTimeSpan(topic, consumerGroupId string) (set.Set, error)
- func (impl *DefaultMQAdminExtImpl) QueryMessage(topic, key string, maxNum int, begin, end int64) (*admin.QueryResult, error)
- func (impl *DefaultMQAdminExtImpl) QueryTopicConsumeByWho(topic string) (*body.GroupList, error)
- func (impl *DefaultMQAdminExtImpl) ResetOffsetByTimestamp(topic, group string, timestamp int64, force bool) (map[*message.MessageQueue]int64, error)
- func (impl *DefaultMQAdminExtImpl) ResetOffsetByTimestampOld(consumerGroup, topic string, timestamp int64, force bool) ([]*admin.RollbackStats, error)
- func (impl *DefaultMQAdminExtImpl) ResetOffsetNew(consumerGroup, topic string, timestamp int64) error
- func (impl *DefaultMQAdminExtImpl) SearchOffset(mq message.MessageQueue, timestamp int64) (int64, error)
- func (impl *DefaultMQAdminExtImpl) Shutdown() error
- func (impl *DefaultMQAdminExtImpl) Start() error
- func (impl *DefaultMQAdminExtImpl) UpdateBrokerConfig(brokerAddr string, properties map[string]interface{}) error
- func (impl *DefaultMQAdminExtImpl) ViewBrokerStatsData(brokerAddr, statsName, statsKey string) (*body.BrokerStatsData, error)
- func (impl *DefaultMQAdminExtImpl) ViewMessage(msgId string) (*message.MessageExt, error)
- func (impl *DefaultMQAdminExtImpl) WipeWritePermOfBroker(namesrvAddr, brokerName string) (int, error)
- type MQAdminExtInner
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DefaultMQAdminExtImpl ¶
// DefaultMQAdminExtImpl is the type on which all admin (operations)
// interfaces are implemented. It embeds MQAdminExtInner.
type DefaultMQAdminExtImpl struct {
MQAdminExtInner
// contains filtered or unexported fields
}
DefaultMQAdminExtImpl 所有运维接口都在这里实现 Author: tianyuliang Since: 2017/11/2
func NewCustomMQAdminExtImpl ¶
func NewCustomMQAdminExtImpl(rpcHook remoting.RPCHook, namesrvAddr string) *DefaultMQAdminExtImpl
NewCustomMQAdminExtImpl 初始化admin控制器 Author: tianyuliang Since: 2017/11/1
func NewDefaultMQAdminExtImpl ¶
func NewDefaultMQAdminExtImpl(namesrvAddr string) *DefaultMQAdminExtImpl
NewDefaultMQAdminExtImpl 初始化admin控制器 Author: tianyuliang Since: 2017/11/1
func (*DefaultMQAdminExtImpl) CleanExpiredConsumerQueue ¶
func (impl *DefaultMQAdminExtImpl) CleanExpiredConsumerQueue(clusterName string) (result bool, err error)
触发清理失效的消费队列 cluster 如果参数cluster为空,则表示所有集群 return 清理是否成功
func (*DefaultMQAdminExtImpl) CleanExpiredConsumerQueueByAddr ¶
func (impl *DefaultMQAdminExtImpl) CleanExpiredConsumerQueueByAddr(brokerAddr string) (bool, error)
触发指定的broker清理失效的消费队列 return 清理是否成功
func (*DefaultMQAdminExtImpl) CloneGroupOffset ¶
func (impl *DefaultMQAdminExtImpl) CloneGroupOffset(srcGroup, destGroup, topic string, isOffline bool) error
克隆某一个组的消费进度到新的组
func (*DefaultMQAdminExtImpl) ConsumeMessageDirectly ¶
func (impl *DefaultMQAdminExtImpl) ConsumeMessageDirectly(consumerGroup, clientId, msgId string) (*body.ConsumeMessageDirectlyResult, error)
向指定Consumer发送某条消息
func (*DefaultMQAdminExtImpl) Consumed ¶
func (impl *DefaultMQAdminExtImpl) Consumed(msg *message.MessageExt, consumerGroupId string) (bool, error)
Consumed 校验某条消息是否被某个消费组消费过
return: true表示已被消费; false:表示未被消费
Author: tianyuliang Since: 2017/11/6
func (*DefaultMQAdminExtImpl) CreateAndUpdateKvConfig ¶
func (impl *DefaultMQAdminExtImpl) CreateAndUpdateKvConfig(namespace, key, value string) error
在 namespace 上添加或者更新 KV 配置
func (*DefaultMQAdminExtImpl) CreateAndUpdateSubscriptionGroupConfig ¶
func (impl *DefaultMQAdminExtImpl) CreateAndUpdateSubscriptionGroupConfig(addr string, config *subscription.SubscriptionGroupConfig) error
向指定Broker创建或者更新订阅组配置
func (*DefaultMQAdminExtImpl) CreateAndUpdateTopicConfig ¶
func (impl *DefaultMQAdminExtImpl) CreateAndUpdateTopicConfig(brokerAddr string, topicConfig *stgcommon.TopicConfig) error
创建或更新Topic
func (*DefaultMQAdminExtImpl) CreateCustomTopic ¶
func (impl *DefaultMQAdminExtImpl) CreateCustomTopic(brokerAddr string, topicConfig *stgcommon.TopicConfig) error
创建指定Topic brokerAddr broker地址 topicConfig 需新建topic的配置
func (*DefaultMQAdminExtImpl) CreateOrUpdateOrderConf ¶
func (impl *DefaultMQAdminExtImpl) CreateOrUpdateOrderConf(key, value string, isCluster bool) error
创建或更新顺序消息的分区配置
func (*DefaultMQAdminExtImpl) CreateTopic ¶
func (impl *DefaultMQAdminExtImpl) CreateTopic(key, newTopic string, queueNum int) error
创建Topic key 消息队列已存在的topic newTopic 需新建的topic queueNum 读写队列的数量
func (*DefaultMQAdminExtImpl) DeleteIpsByProjectGroup ¶
func (impl *DefaultMQAdminExtImpl) DeleteIpsByProjectGroup(key string) error
删除 project group 对应的所有 server ip
func (*DefaultMQAdminExtImpl) DeleteKvConfig ¶
func (impl *DefaultMQAdminExtImpl) DeleteKvConfig(namespace, key string) error
删除 namespace 上的 KV 配置
func (*DefaultMQAdminExtImpl) DeleteSubscriptionGroup ¶
func (impl *DefaultMQAdminExtImpl) DeleteSubscriptionGroup(brokerAddr, groupName string) error
删除 broker 上的 subscription group 信息
func (*DefaultMQAdminExtImpl) DeleteTopicInBroker ¶
func (impl *DefaultMQAdminExtImpl) DeleteTopicInBroker(brokerAddrs set.Set, topic string) error
删除 broker 上的 topic 信息
func (*DefaultMQAdminExtImpl) DeleteTopicInNameServer ¶
func (impl *DefaultMQAdminExtImpl) DeleteTopicInNameServer(namesrvSet set.Set, topic string) error
删除 namesrv维护的topic信息
func (*DefaultMQAdminExtImpl) EarliestMsgStoreTime ¶
func (impl *DefaultMQAdminExtImpl) EarliestMsgStoreTime(mq *message.MessageQueue) (int64, error)
查询最早的消息存储时间
func (*DefaultMQAdminExtImpl) ExamineBrokerClusterInfo ¶
func (impl *DefaultMQAdminExtImpl) ExamineBrokerClusterInfo() (*body.ClusterPlusInfo, []*body.ClusterBrokerWapper, error)
查看集群信息
func (*DefaultMQAdminExtImpl) ExamineConsumeStats ¶
func (impl *DefaultMQAdminExtImpl) ExamineConsumeStats(consumerGroup string) (*admin.ConsumeStats, error)
查询消费进度
func (*DefaultMQAdminExtImpl) ExamineConsumeStatsByTopic ¶
func (impl *DefaultMQAdminExtImpl) ExamineConsumeStatsByTopic(consumerGroup, topic string) (*admin.ConsumeStats, error)
基于Topic查询消费进度
func (*DefaultMQAdminExtImpl) ExamineConsumerConnectionInfo ¶
func (impl *DefaultMQAdminExtImpl) ExamineConsumerConnectionInfo(consumerGroup, topic string) (*body.ConsumerConnectionPlus, int, error)
查看Consumer网络连接、订阅关系
func (*DefaultMQAdminExtImpl) ExamineProducerConnectionInfo ¶
func (impl *DefaultMQAdminExtImpl) ExamineProducerConnectionInfo(producerGroup, topic string) (*body.ProducerConnection, error)
查看Producer网络连接
func (*DefaultMQAdminExtImpl) ExamineSubscriptionGroupConfig ¶
func (impl *DefaultMQAdminExtImpl) ExamineSubscriptionGroupConfig(addr, group string) (*subscription.SubscriptionGroupConfig, error)
查询指定Broker的订阅组配置
func (*DefaultMQAdminExtImpl) ExamineTopicConfig ¶
func (impl *DefaultMQAdminExtImpl) ExamineTopicConfig(addr, topic string) (*stgcommon.TopicConfig, error)
查询指定Broker的Topic配置
func (*DefaultMQAdminExtImpl) ExamineTopicRouteInfo ¶
func (impl *DefaultMQAdminExtImpl) ExamineTopicRouteInfo(topic string) (*route.TopicRouteData, error)
查看Topic路由信息
func (*DefaultMQAdminExtImpl) ExamineTopicStats ¶
func (impl *DefaultMQAdminExtImpl) ExamineTopicStats(topic string) (*admin.TopicStatsTable, error)
查询Topic Offset信息
func (*DefaultMQAdminExtImpl) FetchAllTopicList ¶
func (impl *DefaultMQAdminExtImpl) FetchAllTopicList() (*body.TopicList, error)
从Name Server获取所有Topic列表
func (*DefaultMQAdminExtImpl) FetchBrokerNameByAddr ¶
func (impl *DefaultMQAdminExtImpl) FetchBrokerNameByAddr(brokerAddr string) (string, error)
FetchBrokerNameByAddr 根据broker地址查询对应的broker名称
返回值: brokerName
Author: tianyuliang Since: 2017/11/7
func (*DefaultMQAdminExtImpl) FetchBrokerNameByClusterName ¶
func (impl *DefaultMQAdminExtImpl) FetchBrokerNameByClusterName(clusterName string) (set.Set, error)
FetchBrokerNameByClusterName 根据Cluster集群名称,拉取所有broker名称
返回值: set<brokerName>
Author: tianyuliang Since: 2017/11/7
func (*DefaultMQAdminExtImpl) FetchBrokerRuntimeStats ¶
func (impl *DefaultMQAdminExtImpl) FetchBrokerRuntimeStats(brokerAddr string) (*body.KVTable, error)
获取Broker运行时数据
func (*DefaultMQAdminExtImpl) FetchMasterAddrByClusterName ¶
func (impl *DefaultMQAdminExtImpl) FetchMasterAddrByClusterName(clusterName string) (set.Set, error)
FetchMasterAddrByClusterName 拉取所有角色是“master”的broker地址列表
返回值: set.Set保存所有角色是master的 brokerAddr地址,即set<brokerAddr>
Author: tianyuliang Since: 2017/11/7
func (*DefaultMQAdminExtImpl) GetAdminExtGroup ¶
func (impl *DefaultMQAdminExtImpl) GetAdminExtGroup() string
GetAdminExtGroup 查询admin管理的组名称 Author: tianyuliang Since: 2017/11/6
func (*DefaultMQAdminExtImpl) GetAllClusterNames ¶
func (impl *DefaultMQAdminExtImpl) GetAllClusterNames() ([]string, map[string]*route.BrokerData, error)
GetAllClusterNames 获取集群名称 Author: tianyuliang Since: 2017/11/7
func (*DefaultMQAdminExtImpl) GetClusterTopicWappers ¶
func (impl *DefaultMQAdminExtImpl) GetClusterTopicWappers() ([]*body.TopicBrokerClusterWapper, error)
GetClusterTopicWappers 获取Topic与Broker、集群的对应信息列表 Author: tianyuliang Since: 2017/11/7
func (*DefaultMQAdminExtImpl) GetConsumeStatus ¶
func (impl *DefaultMQAdminExtImpl) GetConsumeStatus(topic, consumerGroupId, clientAddr string) (map[string]map[*message.MessageQueue]int64, error)
通过客户端查看消费者的消费情况
func (*DefaultMQAdminExtImpl) GetConsumerRunningInfo ¶
func (impl *DefaultMQAdminExtImpl) GetConsumerRunningInfo(consumerGroupId, clientId string, jstack bool) (*body.ConsumerRunningInfo, error)
查询Consumer内存数据结构
func (*DefaultMQAdminExtImpl) GetCreateTopicKey ¶
func (impl *DefaultMQAdminExtImpl) GetCreateTopicKey() string
GetCreateTopicKey 查询创建Topic的key值 Author: tianyuliang Since: 2017/11/6
func (*DefaultMQAdminExtImpl) GetIpsByProjectGroup ¶
func (impl *DefaultMQAdminExtImpl) GetIpsByProjectGroup(projectGroup string) (string, error)
通过 project 获取所有的 server ip 信息
func (*DefaultMQAdminExtImpl) GetKVConfig ¶
func (impl *DefaultMQAdminExtImpl) GetKVConfig(namespace, key string) (string, error)
从Name Server获取一个配置项
func (*DefaultMQAdminExtImpl) GetKVListByNamespace ¶
func (impl *DefaultMQAdminExtImpl) GetKVListByNamespace(namespace string) (*body.KVTable, error)
获取指定Namespace下的所有kv
func (*DefaultMQAdminExtImpl) GetNameServerAddressList ¶
func (impl *DefaultMQAdminExtImpl) GetNameServerAddressList() ([]string, error)
获取Name Server地址列表
func (*DefaultMQAdminExtImpl) GetProjectGroupByIp ¶
func (impl *DefaultMQAdminExtImpl) GetProjectGroupByIp(ip string) (string, error)
通过 server ip 获取 project 信息
func (*DefaultMQAdminExtImpl) GetTopicsByCluster ¶
func (impl *DefaultMQAdminExtImpl) GetTopicsByCluster(clusterName string) ([]*body.TopicBrokerClusterWapper, error)
GetTopicsByCluster 根据ClusterName,查询该集群管理的所有Topic Author: tianyuliang Since: 2017/11/8
func (*DefaultMQAdminExtImpl) MaxOffset ¶
func (impl *DefaultMQAdminExtImpl) MaxOffset(mq *message.MessageQueue) (int64, error)
查询MessageQueue最大偏移量
func (*DefaultMQAdminExtImpl) MessageTrackDetail ¶
func (impl *DefaultMQAdminExtImpl) MessageTrackDetail(msg *message.MessageExt) ([]*track.MessageTrack, error)
查询消息被谁消费了
func (*DefaultMQAdminExtImpl) MinOffset ¶
func (impl *DefaultMQAdminExtImpl) MinOffset(mq *message.MessageQueue) (int64, error)
查询MessageQueue最小偏移量
func (*DefaultMQAdminExtImpl) PutKVConfig ¶
func (impl *DefaultMQAdminExtImpl) PutKVConfig(namespace, key, value string) error
向Name Server增加一个配置项
func (*DefaultMQAdminExtImpl) QueryConsumeTimeSpan ¶
func (impl *DefaultMQAdminExtImpl) QueryConsumeTimeSpan(topic, consumerGroupId string) (set.Set, error)
根据 topic 和 group 获取消息的时间跨度 return set<QueueTimeSpan>
func (*DefaultMQAdminExtImpl) QueryMessage ¶
func (impl *DefaultMQAdminExtImpl) QueryMessage(topic, key string, maxNum int, begin, end int64) (*admin.QueryResult, error)
搜索消息 topic topic名称 key 消息key关键字[业务系统基于此字段唯一标识消息] maxNum 最大搜索条数 begin 开始查询消息的时间戳 end 结束查询消息的时间戳
func (*DefaultMQAdminExtImpl) QueryTopicConsumeByWho ¶
func (impl *DefaultMQAdminExtImpl) QueryTopicConsumeByWho(topic string) (*body.GroupList, error)
根据Topic查询被哪些订阅组消费
func (*DefaultMQAdminExtImpl) ResetOffsetByTimestamp ¶
func (impl *DefaultMQAdminExtImpl) ResetOffsetByTimestamp(topic, group string, timestamp int64, force bool) (map[*message.MessageQueue]int64, error)
按照时间回溯消费进度(客户端不需要重启)
func (*DefaultMQAdminExtImpl) ResetOffsetByTimestampOld ¶
func (impl *DefaultMQAdminExtImpl) ResetOffsetByTimestampOld(consumerGroup, topic string, timestamp int64, force bool) ([]*admin.RollbackStats, error)
按照时间回溯消费进度(客户端需要重启)
func (*DefaultMQAdminExtImpl) ResetOffsetNew ¶
func (impl *DefaultMQAdminExtImpl) ResetOffsetNew(consumerGroup, topic string, timestamp int64) error
重置消费进度,无论Consumer是否在线,都可以执行。不保证最终结果是否成功,需要调用方通过消费进度查询来再次确认
func (*DefaultMQAdminExtImpl) SearchOffset ¶
func (impl *DefaultMQAdminExtImpl) SearchOffset(mq message.MessageQueue, timestamp int64) (int64, error)
根据时间戳搜索MessageQueue偏移量(注意:可能会出现大量IO开销)
func (*DefaultMQAdminExtImpl) Shutdown ¶
func (impl *DefaultMQAdminExtImpl) Shutdown() error
Shutdown 关闭Admin Author: tianyuliang Since: 2017/11/6
func (*DefaultMQAdminExtImpl) Start ¶
func (impl *DefaultMQAdminExtImpl) Start() error
Start 启动Admin Author: tianyuliang Since: 2017/11/6
func (*DefaultMQAdminExtImpl) UpdateBrokerConfig ¶
func (impl *DefaultMQAdminExtImpl) UpdateBrokerConfig(brokerAddr string, properties map[string]interface{}) error
更新Broker配置
func (*DefaultMQAdminExtImpl) ViewBrokerStatsData ¶
func (impl *DefaultMQAdminExtImpl) ViewBrokerStatsData(brokerAddr, statsName, statsKey string) (*body.BrokerStatsData, error)
服务器统计数据输出
func (*DefaultMQAdminExtImpl) ViewMessage ¶
func (impl *DefaultMQAdminExtImpl) ViewMessage(msgId string) (*message.MessageExt, error)
根据msgId查询消息消费结果
func (*DefaultMQAdminExtImpl) WipeWritePermOfBroker ¶
func (impl *DefaultMQAdminExtImpl) WipeWritePermOfBroker(namesrvAddr, brokerName string) (int, error)
清除某个Broker的写权限,针对所有Name Server return 返回清除了多少个topic
type MQAdminExtInner ¶
// MQAdminExtInner is the admin (operations) interface.
//
// It groups the externally facing MQ management operations, including topic
// creation, subscription group creation and configuration changes.
type MQAdminExtInner interface {
// CreateTopic creates a topic.
CreateTopic(key, newTopic string, queueNum int) error
// Start starts the admin.
Start() error
// Shutdown shuts down the admin.
Shutdown() error
// UpdateBrokerConfig updates the broker configuration.
UpdateBrokerConfig(brokerAddr string, properties map[string]interface{}) error
// CreateAndUpdateTopicConfig creates or updates a topic configuration on the specified broker.
CreateAndUpdateTopicConfig(addr string, config *stgcommon.TopicConfig) error
// CreateAndUpdateSubscriptionGroupConfig creates or updates a subscription group configuration on the specified broker.
CreateAndUpdateSubscriptionGroupConfig(addr string, config *subscription.SubscriptionGroupConfig) error
// ExamineSubscriptionGroupConfig queries the subscription group configuration of the specified broker.
ExamineSubscriptionGroupConfig(addr, group string) (*subscription.SubscriptionGroupConfig, error)
// ExamineTopicConfig queries the topic configuration of the specified broker.
ExamineTopicConfig(addr, topic string) (*stgcommon.TopicConfig, error)
// ExamineTopicStats queries the offset information of a topic.
ExamineTopicStats(topic string) (*admin.TopicStatsTable, error)
// FetchAllTopicList fetches the list of all topics from the Name Server.
FetchAllTopicList() (*body.TopicList, error)
// FetchBrokerRuntimeStats fetches the runtime data of a broker.
FetchBrokerRuntimeStats(brokerAddr string) (*body.KVTable, error)
// ExamineConsumeStats queries the consume progress.
ExamineConsumeStats(consumerGroup string) (*admin.ConsumeStats, error)
// ExamineConsumeStatsByTopic queries the consume progress for a given topic.
ExamineConsumeStatsByTopic(consumerGroup, topic string) (*admin.ConsumeStats, error)
// ExamineBrokerClusterInfo views the cluster information.
ExamineBrokerClusterInfo() (*body.ClusterPlusInfo, []*body.ClusterBrokerWapper, error)
// ExamineTopicRouteInfo views the route information of a topic.
ExamineTopicRouteInfo(topic string) (*route.TopicRouteData, error)
// ExamineConsumerConnectionInfo views the consumer network connections and subscriptions.
// Note: the second return value indicates whether the connection process of the consumer group is online; code=206 means the consumer process is offline.
ExamineConsumerConnectionInfo(consumerGroup, topic string) (*body.ConsumerConnectionPlus, int, error)
// ExamineProducerConnectionInfo views the producer network connections.
ExamineProducerConnectionInfo(producerGroup, topic string) (*body.ProducerConnection, error)
// GetNameServerAddressList returns the list of Name Server addresses.
GetNameServerAddressList() ([]string, error)
// WipeWritePermOfBroker wipes the write permission of a broker, across all Name Servers.
// It returns how many topics were wiped.
WipeWritePermOfBroker(namesrvAddr, brokerName string) (int, error)
// PutKVConfig adds a configuration item to the Name Server.
PutKVConfig(namespace, key, value string) error
// GetKVConfig gets a configuration item from the Name Server.
GetKVConfig(namespace, key string) (string, error)
// CreateAndUpdateKvConfig adds or updates a KV configuration in the namespace.
CreateAndUpdateKvConfig(namespace, key, value string) error
// DeleteKvConfig deletes a KV configuration from the namespace.
DeleteKvConfig(namespace, key string) error
// GetKVListByNamespace gets all KV entries under the specified namespace.
GetKVListByNamespace(namespace string) (*body.KVTable, error)
// DeleteTopicInBroker deletes the topic information on the brokers.
DeleteTopicInBroker(brokerAddrs set.Set, topic string) error
// DeleteTopicInNameServer deletes the topic information maintained by the name servers.
DeleteTopicInNameServer(namesrvs set.Set, topic string) error
// DeleteSubscriptionGroup deletes the subscription group information on the broker.
DeleteSubscriptionGroup(brokerAddr, groupName string) error
// GetProjectGroupByIp gets the project information by server IP.
GetProjectGroupByIp(ip string) (string, error)
// GetIpsByProjectGroup gets all server IP information by project.
GetIpsByProjectGroup(projectGroup string) (string, error)
// DeleteIpsByProjectGroup deletes all server IPs of the project group.
DeleteIpsByProjectGroup(key string) error
// ResetOffsetByTimestampOld rolls back the consume progress by timestamp (the client needs to be restarted).
ResetOffsetByTimestampOld(consumerGroup, topic string, timestamp int64, force bool) ([]*admin.RollbackStats, error)
// ResetOffsetByTimestamp rolls back the consume progress by timestamp (the client does not need to be restarted).
ResetOffsetByTimestamp(topic, group string, timestamp int64, force bool) (map[*message.MessageQueue]int64, error)
// ResetOffsetNew resets the consume progress; it can be executed whether or not the consumer is online. The final result is not guaranteed to succeed, so the caller needs to confirm it again by querying the consume progress.
ResetOffsetNew(consumerGroup, topic string, timestamp int64) error
// GetConsumeStatus views the consume status of consumers through the client.
GetConsumeStatus(topic, consumerGroupId, clientAddr string) (map[string]map[*message.MessageQueue]int64, error)
// CreateOrUpdateOrderConf creates or updates the partition configuration of ordered messages.
CreateOrUpdateOrderConf(key, value string, isCluster bool) error
// QueryTopicConsumeByWho queries which subscription groups consume the topic.
QueryTopicConsumeByWho(topic string) (*body.GroupList, error)
// QueryConsumeTimeSpan gets the time span of messages by topic and group.
// It returns set<QueueTimeSpan>.
QueryConsumeTimeSpan(topic, group string) (set.Set, error)
// CleanExpiredConsumerQueue triggers cleaning of expired consume queues.
// cluster: if empty, all clusters are meant.
// It returns whether the cleaning succeeded.
CleanExpiredConsumerQueue(cluster string) (bool, error)
// CleanExpiredConsumerQueueByAddr triggers the specified broker to clean expired consume queues.
// It returns whether the cleaning succeeded.
CleanExpiredConsumerQueueByAddr(addr string) (bool, error)
// GetConsumerRunningInfo queries the in-memory data structures of a consumer.
GetConsumerRunningInfo(consumerGroupId, clientId string, jstack bool) (*body.ConsumerRunningInfo, error)
// ConsumeMessageDirectly sends a given message to the specified consumer.
ConsumeMessageDirectly(consumerGroup, clientId, msgId string) (*body.ConsumeMessageDirectlyResult, error)
// MessageTrackDetail queries who consumed the message.
MessageTrackDetail(msg *message.MessageExt) ([]*track.MessageTrack, error)
// CloneGroupOffset clones the consume progress of one group to a new group.
CloneGroupOffset(srcGroup, destGroup, topic string, isOffline bool) error
// ViewBrokerStatsData outputs the server statistics data.
ViewBrokerStatsData(brokerAddr, statsName, statsKey string) (*body.BrokerStatsData, error)
// CreateCustomTopic creates the specified topic.
CreateCustomTopic(brokerAddr string, topicConfig *stgcommon.TopicConfig) error
// ViewMessage queries the message consume result by msgId.
ViewMessage(msgId string) (*message.MessageExt, error)
// QueryMessage searches messages.
// topic: the topic name
// key: the message key keyword [business systems use this field to uniquely identify a message]
// maxNum: the maximum number of messages to search
// begin: the timestamp at which the message query starts
// end: the timestamp at which the message query ends
QueryMessage(topic, key string, maxNum int, begin, end int64) (*admin.QueryResult, error)
// EarliestMsgStoreTime queries the earlier stored message.
EarliestMsgStoreTime(mq *message.MessageQueue) (int64, error)
// SearchOffset searches the MessageQueue offset by timestamp (note: this may incur heavy IO overhead).
SearchOffset(mq message.MessageQueue, timestamp int64) (int64, error)
// MaxOffset queries the maximum offset of the MessageQueue.
MaxOffset(mq *message.MessageQueue) (int64, error)
// MinOffset queries the minimum offset of the MessageQueue.
MinOffset(mq *message.MessageQueue) (int64, error)
}
MQAdminExtInner 运维接口
(1)MQ管理类接口,涉及所有与MQ管理相关的对外接口 (2)包括Topic创建、订阅组创建、配置修改等
Author: tianyuliang Since: 2017/11/1
Source Files
¶
- default_mq_admin_api.go
- default_mq_admin_impl.go
- mq_admin_ext_inner.go