Documentation
¶
Index ¶
- Constants
- Variables
- func GetCommittedOffset(groupID string, topic string, partition uint32) int64
- func LoadGroupMetadataState()
- func UpdateGroupMetadataState(recordBatchBytes []byte)
- type APIKey
- type APIKeyHandler
- type APIVersionsResponse
- type AbortedTransaction
- type Broker
- func (b *Broker) APIDispatcher(requestAPIKey uint16) APIKeyHandler
- func (b *Broker) AppendRaftEntry(entryType raft.CommandType, entry any) (any, error)
- func (b *Broker) CreateTopicPartitions(name string, numPartitions uint32, configs map[string]string) error
- func (b *Broker) GetClusterNodes() ([]*types.Node, error)
- func (b *Broker) HandleConnection(conn net.Conn)
- func (b *Broker) IsController() bool
- func (b *Broker) SetupRaft() error
- func (b *Broker) SetupSerf() error
- func (b *Broker) Shutdown()
- func (b *Broker) Startup()
- type CreateTopicsRequest
- type CreateTopicsRequestAssignment
- type CreateTopicsRequestConfig
- type CreateTopicsRequestTopic
- type CreateTopicsResponse
- type CreateTopicsResponseConfig
- type CreateTopicsResponseTopic
- type DescribeConfigsRequest
- type DescribeConfigsResource
- type DescribeConfigsResponse
- type DescribeConfigsResponseConfig
- type DescribeConfigsResponseResult
- type DescribeConfigsResponseSynonym
- type Error
- type FetchPartitionResponse
- type FetchRequest
- type FetchRequestForgottenTopic
- type FetchRequestPartitionData
- type FetchRequestTopic
- type FetchResponse
- type FetchTopicResponse
- type FindCoordinatorRequestV1To3
- type FindCoordinatorRequestV4Plus
- type FindCoordinatorResponse
- type FindCoordinatorResponseCoordinator
- type HeartbeatResponse
- type InitProducerIDRequest
- type InitProducerIDResponse
- type JoinGroupRequest
- type JoinGroupRequestProtocol
- type JoinGroupResponse
- type JoinGroupResponseMember
- type ListOffsetsRequest
- type ListOffsetsRequestPartition
- type ListOffsetsRequestTopic
- type ListOffsetsResponse
- type ListOffsetsResponsePartition
- type ListOffsetsResponseTopic
- type MetadataRequest
- type MetadataRequestTopic
- type MetadataResponse
- type MetadataResponseBroker
- type MetadataResponsePartition
- type MetadataResponseTopic
- type OffsetCommitRequest
- type OffsetCommitRequestPartition
- type OffsetCommitRequestTopic
- type OffsetCommitResponse
- type OffsetCommitResponsePartition
- type OffsetCommitResponseTopic
- type OffsetFetchGroup
- type OffsetFetchPartition
- type OffsetFetchRequest
- type OffsetFetchRequestGroup
- type OffsetFetchRequestTopic
- type OffsetFetchResponse
- type OffsetFetchTopic
- type ProducePartitionResponse
- type ProduceRequest
- type ProduceRequestPartitionData
- type ProduceRequestTopicData
- type ProduceResponse
- type ProduceTopicResponse
- type RecordError
- type ResourceType
- type SyncGroupRequest
- type SyncGroupRequestMember
- type SyncGroupResponse
Constants ¶
const DefaultNumPartition = 1
DefaultNumPartition represents the num of partitions during creation if unspecified TODO: make configurable
Variables ¶
var ( ErrUnknownServerError = Error{Code: -1, Message: "The server experienced an unexpected error when processing the request.", IsRetriable: false} ErrNone = Error{Code: 0, Message: "", IsRetriable: false} ErrOffsetOutOfRange = Error{Code: 1, Message: "The requested offset is not within the range of offsets maintained by the server.", IsRetriable: false} ErrCorruptMessage = Error{Code: 2, Message: "This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt.", IsRetriable: true} ErrUnknownTopicOrPartition = Error{Code: 3, Message: "This server does not host this topic-partition.", IsRetriable: true} ErrInvalidFetchSize = Error{Code: 4, Message: "The requested fetch size is invalid.", IsRetriable: false} ErrLeaderNotAvailable = Error{Code: 5, Message: "There is no leader for this topic-partition as we are in the middle of a leadership election.", IsRetriable: true} ErrNotLeaderOrFollower = Error{Code: 6, Message: "For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.", IsRetriable: true} ErrRequestTimedOut = Error{Code: 7, Message: "The request timed out.", IsRetriable: true} ErrBrokerNotAvailable = Error{Code: 8, Message: "The broker is not available.", IsRetriable: false} ErrReplicaNotAvailable = Error{Code: 9, Message: "The replica is not available for the requested topic-partition. Produce/Fetch requests and other requests intended only for the leader or follower return NOT_LEADER_OR_FOLLOWER if the broker is not a replica of the topic-partition.", IsRetriable: true} ErrMessageTooLarge = Error{Code: 10, Message: "The request included a message larger than the max message size the server will accept.", IsRetriable: false} ErrStaleControllerEpoch = Error{Code: 11, Message: "The controller moved to another broker.", IsRetriable: false} ErrOffsetMetadataTooLarge = Error{Code: 12, Message: "The metadata field of the offset request was too large.", IsRetriable: false} ErrNetworkException = Error{Code: 13, Message: "The server disconnected before a response was received.", IsRetriable: true} ErrCoordinatorLoadInProgress = Error{Code: 14, Message: "The coordinator is loading and hence can't process requests.", IsRetriable: true} ErrCoordinatorNotAvailable = Error{Code: 15, Message: "The coordinator is not available.", IsRetriable: true} ErrNotCoordinator = Error{Code: 16, Message: "This is not the correct coordinator.", IsRetriable: true} ErrInvalidTopicException = Error{Code: 17, Message: "The request attempted to perform an operation on an invalid topic.", IsRetriable: false} ErrRecordListTooLarge = Error{Code: 18, Message: "The request included message batch larger than the configured segment size on the server.", IsRetriable: false} ErrNotEnoughReplicas = Error{Code: 19, Message: "Messages are rejected since there are fewer in-sync replicas than required.", IsRetriable: true} ErrNotEnoughReplicasAfterAppend = Error{Code: 20, Message: "Messages are written to the log, but to fewer in-sync replicas than required.", IsRetriable: true} ErrInvalidRequiredAcks = Error{Code: 21, Message: "Produce request specified an invalid value for required acks.", IsRetriable: false} ErrIllegalGeneration = Error{Code: 22, Message: "Specified group generation id is not valid.", IsRetriable: false} ErrInconsistentGroupProtocol = Error{Code: 23, Message: "The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.", IsRetriable: false} ErrInvalidGroupID = Error{Code: 24, Message: "The configured groupId is invalid.", IsRetriable: false} ErrUnknownMemberID = Error{Code: 25, Message: "The coordinator is not aware of this member.", IsRetriable: false} ErrInvalidSessionTimeout = Error{Code: 26, Message: "The session timeout is not within the range allowed by the broker (as configured by group.min.session.timeout.ms and group.max.session.timeout.ms).", IsRetriable: false} ErrRebalanceInProgress = Error{Code: 27, Message: "The group is rebalancing, so a rejoin is needed.", IsRetriable: false} ErrInvalidCommitOffsetSize = Error{Code: 28, Message: "The committing offset data size is not valid.", IsRetriable: false} ErrTopicAuthorizationFailed = Error{Code: 29, Message: "Topic authorization failed.", IsRetriable: false} ErrGroupAuthorizationFailed = Error{Code: 30, Message: "Group authorization failed.", IsRetriable: false} ErrClusterAuthorizationFailed = Error{Code: 31, Message: "Cluster authorization failed.", IsRetriable: false} ErrInvalidTimestamp = Error{Code: 32, Message: "The timestamp of the message is out of acceptable range.", IsRetriable: false} ErrUnsupportedSaslMechanism = Error{Code: 33, Message: "The broker does not support the requested SASL mechanism.", IsRetriable: false} ErrIllegalSaslState = Error{Code: 34, Message: "Request is not valid given the current SASL state.", IsRetriable: false} ErrUnsupportedVersion = Error{Code: 35, Message: "The version of API is not supported.", IsRetriable: false} ErrTopicAlreadyExists = Error{Code: 36, Message: "Topic with this name already exists.", IsRetriable: false} ErrInvalidPartitions = Error{Code: 37, Message: "Number of partitions is below 1.", IsRetriable: false} ErrInvalidReplicationFactor = Error{Code: 38, Message: "Replication factor is below 1 or larger than the number of available brokers.", IsRetriable: false} ErrInvalidReplicaAssignment = Error{Code: 39, Message: "Replica assignment is invalid.", IsRetriable: false} ErrInvalidConfig = Error{Code: 40, Message: "Configuration is invalid.", IsRetriable: false} ErrNotController = Error{Code: 41, Message: "This is not the correct controller for this cluster.", IsRetriable: true} )
Define each error as a variable of type Error
var ( ListOffsetsEarliestTimestamp = -2 ListOffsetsLatestTimestamp = -1 ListOffsetsMaxTimestamp = -3 )
Constants for list offsets timestamps.
var ClusterID = "MONKAFKA-CLUSTER"
ClusterID should be configurable?
var ConsumerOffsetsTopic = "__consumer-offsets"
ConsumerOffsetsTopic is the topic where consumers committed offsets are saved
var ErrorMap = map[int16]Error{ -1: ErrUnknownServerError, 0: ErrNone, 1: ErrOffsetOutOfRange, 2: ErrCorruptMessage, 3: ErrUnknownTopicOrPartition, 4: ErrInvalidFetchSize, 5: ErrLeaderNotAvailable, 6: ErrNotLeaderOrFollower, 7: ErrRequestTimedOut, 8: ErrBrokerNotAvailable, 9: ErrReplicaNotAvailable, 10: ErrMessageTooLarge, 11: ErrStaleControllerEpoch, 12: ErrOffsetMetadataTooLarge, 13: ErrNetworkException, 14: ErrCoordinatorLoadInProgress, 15: ErrCoordinatorNotAvailable, 16: ErrNotCoordinator, 17: ErrInvalidTopicException, 18: ErrRecordListTooLarge, 19: ErrNotEnoughReplicas, 20: ErrNotEnoughReplicasAfterAppend, 21: ErrInvalidRequiredAcks, 22: ErrIllegalGeneration, 23: ErrInconsistentGroupProtocol, 24: ErrInvalidGroupID, 25: ErrUnknownMemberID, 26: ErrInvalidSessionTimeout, 27: ErrRebalanceInProgress, 28: ErrInvalidCommitOffsetSize, 29: ErrTopicAuthorizationFailed, 30: ErrGroupAuthorizationFailed, 31: ErrClusterAuthorizationFailed, 32: ErrInvalidTimestamp, 33: ErrUnsupportedSaslMechanism, 34: ErrIllegalSaslState, 35: ErrUnsupportedVersion, 36: ErrTopicAlreadyExists, 37: ErrInvalidPartitions, 38: ErrInvalidReplicationFactor, 39: ErrInvalidReplicaAssignment, 40: ErrInvalidConfig, }
ErrorMap associates error codes with corresponding Error structs
var MinusOne int = -1
MinusOne used through variable because setting it as uint directly is rejected
Functions ¶
func GetCommittedOffset ¶
GetCommittedOffset returns the committed offset if it exists, otherwise -1
func LoadGroupMetadataState ¶
func LoadGroupMetadataState()
LoadGroupMetadataState from __consumer-offsets metadata
func UpdateGroupMetadataState ¶
func UpdateGroupMetadataState(recordBatchBytes []byte)
UpdateGroupMetadataState given a __consumer-offsets record, updates the state accordingly
Types ¶
type APIKeyHandler ¶
APIKeyHandler represents a kafka api key with its handler
type APIVersionsResponse ¶
APIVersionsResponse represents the response for API versions request.
type AbortedTransaction ¶
AbortedTransaction represents an aborted transaction in the fetch response.
type Broker ¶
type Broker struct { Config *types.Configuration ShutDownSignal chan bool Serf *serf.Serf // Serf cluster maintained inside the DC Raft *hraft.Raft // Raft cluster maintained inside the DC FSM *raft.FSM RaftNotifyCh <-chan bool // raftNotifyCh ensures that we get reliable leader transition notifications from the Raft layer. SerfEventCh chan serf.Event // eventCh is used to receive events from the serf cluster ReconcileCh chan serf.Member // used to pass events from the serf handler into the leader manager to update the strong state }
Broker represents a Kafka broker instance
func NewBroker ¶
func NewBroker(config *types.Configuration) *Broker
NewBroker creates a new Broker instance with the provided configuration
func (*Broker) APIDispatcher ¶
func (b *Broker) APIDispatcher(requestAPIKey uint16) APIKeyHandler
APIDispatcher maps the Request key to its handler
func (*Broker) AppendRaftEntry ¶
AppendRaftEntry add a new entry to the raft log
func (*Broker) CreateTopicPartitions ¶
func (b *Broker) CreateTopicPartitions(name string, numPartitions uint32, configs map[string]string) error
CreateTopicPartitions creates a new topic with its partition by appending them to the raft log.
func (*Broker) GetClusterNodes ¶
GetClusterNodes returns the raft cluster nodes each representing a broker
func (*Broker) HandleConnection ¶
HandleConnection processes incoming requests from a client connection
func (*Broker) IsController ¶
IsController return if the broker is the cluster's controller which is also the raft leader
type CreateTopicsRequest ¶
type CreateTopicsRequest struct { Topics []CreateTopicsRequestTopic TimeoutMs uint32 ValidateOnly bool }
CreateTopicsRequest represents the Kafka request to create topics.
type CreateTopicsRequestAssignment ¶
CreateTopicsRequestAssignment represents the partition assignments for a topic.
type CreateTopicsRequestConfig ¶
type CreateTopicsRequestConfig struct { Name string `kafka:"CompactString"` Value string `kafka:"CompactNullableString"` }
CreateTopicsRequestConfig represents the configuration for a topic.
type CreateTopicsRequestTopic ¶
type CreateTopicsRequestTopic struct { Name string `kafka:"CompactString"` NumPartitions uint32 ReplicationFactor uint16 Assignments []CreateTopicsRequestAssignment Configs []CreateTopicsRequestConfig }
CreateTopicsRequestTopic represents the details of a topic to be created.
type CreateTopicsResponse ¶
type CreateTopicsResponse struct { ThrottleTimeMs uint32 Topics []CreateTopicsResponseTopic }
CreateTopicsResponse represents the response to a topic creation request.
type CreateTopicsResponseConfig ¶
type CreateTopicsResponseConfig struct { Name string Value string ReadOnly bool ConfigSource uint8 IsSensitive bool }
CreateTopicsResponseConfig represents a configuration for a topic.
type CreateTopicsResponseTopic ¶
type CreateTopicsResponseTopic struct { Name string `kafka:"CompactString"` TopicID [16]byte ErrorCode uint16 ErrorMessage string `kafka:"CompactString"` NumPartitions uint32 ReplicationFactor uint16 Configs []CreateTopicsResponseConfig }
CreateTopicsResponseTopic represents a topic's creation result.
type DescribeConfigsRequest ¶
type DescribeConfigsRequest struct { Resources []DescribeConfigsResource IncludeSynonyms bool IncludeDocumentation bool }
DescribeConfigsRequest struct represents the request for DescribeConfigs with version 4.
type DescribeConfigsResource ¶
type DescribeConfigsResource struct { ResourceType uint8 ResourceName string `kafka:"CompactString"` ConfigurationKeys []string `kafka:"CompactString"` }
DescribeConfigsResource struct represents the resource inside DescribeConfigsRequest.
type DescribeConfigsResponse ¶
type DescribeConfigsResponse struct { ThrottleTimeMs uint32 Results []DescribeConfigsResponseResult }
DescribeConfigsResponse struct represents the response for DescribeConfigs with version 4.
type DescribeConfigsResponseConfig ¶
type DescribeConfigsResponseConfig struct { Name string Value string `kafka:"CompactNullableString"` ReadOnly bool ConfigSource uint8 IsSensitive bool Synonyms []DescribeConfigsResponseSynonym ConfigType uint Documentation string `kafka:"CompactNullableString"` }
DescribeConfigsResponseConfig struct represents the configuration details within a result.
type DescribeConfigsResponseResult ¶
type DescribeConfigsResponseResult struct { ErrorCode uint16 ErrorMessage string `kafka:"CompactString"` ResourceType uint8 ResourceName string `kafka:"CompactString"` Configs []DescribeConfigsResponseConfig }
DescribeConfigsResponseResult struct represents the result inside DescribeConfigsResponse.
type DescribeConfigsResponseSynonym ¶
type DescribeConfigsResponseSynonym struct { Name string `kafka:"CompactString"` Value string `kafka:"CompactNullableString"` Source uint }
DescribeConfigsResponseSynonym struct represents synonym details within a config.
type FetchPartitionResponse ¶
type FetchPartitionResponse struct { PartitionIndex uint32 ErrorCode uint16 HighWatermark uint64 LastStableOffset uint64 LogStartOffset uint64 AbortedTransactions []AbortedTransaction PreferredReadReplica uint32 Records []byte }
FetchPartitionResponse represents the response for a partition in a fetch request.
type FetchRequest ¶
type FetchRequest struct { ReplicaID uint32 MaxWaitMs uint32 MinBytes uint32 MaxBytes uint32 IsolationLevel uint8 SessionID uint32 SessionEpoch uint32 Topics []FetchRequestTopic ForgottenTopicsData []FetchRequestForgottenTopic RackID string `kafka:"CompactString"` }
FetchRequest represents the details of a FetchRequest (Version: 12).
type FetchRequestForgottenTopic ¶
FetchRequestForgottenTopic represents the forgotten topic data in a FetchRequest.
type FetchRequestPartitionData ¶
type FetchRequestPartitionData struct { PartitionIndex uint32 CurrentLeaderEpoch uint32 FetchOffset uint64 LastFetchedEpoch uint32 LogStartOffset uint64 PartitionMaxBytes uint32 }
FetchRequestPartitionData represents the partition-level data in a FetchRequest.
type FetchRequestTopic ¶
type FetchRequestTopic struct { Name string `kafka:"CompactString"` Partitions []FetchRequestPartitionData }
FetchRequestTopic represents the topic-level data in a FetchRequest.
type FetchResponse ¶
type FetchResponse struct { ThrottleTimeMs uint32 ErrorCode uint16 SessionID uint32 Responses []FetchTopicResponse }
FetchResponse represents the response to a fetch request.
type FetchTopicResponse ¶
type FetchTopicResponse struct { TopicName string `kafka:"CompactString"` Partitions []FetchPartitionResponse }
FetchTopicResponse represents the response for a topic in a fetch request.
type FindCoordinatorRequestV1To3 ¶
type FindCoordinatorRequestV1To3 struct { Key types.NonCompactString KeyType uint8 }
FindCoordinatorRequestV1To3 represents the request for coordinator finding
type FindCoordinatorRequestV4Plus ¶
type FindCoordinatorRequestV4Plus struct { // Key string `kafka:"CompactString"` KeyType uint8 CoordinatorKeys []string }
FindCoordinatorRequestV4Plus represents the request for coordinator finding
type FindCoordinatorResponse ¶
type FindCoordinatorResponse struct { ThrottleTimeMs uint32 Coordinators []FindCoordinatorResponseCoordinator }
FindCoordinatorResponse represents the response for a coordinator finding request.
type FindCoordinatorResponseCoordinator ¶
type FindCoordinatorResponseCoordinator struct { Key string `kafka:"CompactString"` NodeID uint32 Host string `kafka:"CompactString"` Port uint32 ErrorCode uint16 ErrorMessage string `kafka:"CompactString"` }
FindCoordinatorResponseCoordinator represents a coordinator.
type HeartbeatResponse ¶
HeartbeatResponse represents a HearBeat
type InitProducerIDRequest ¶
type InitProducerIDRequest struct { TransactionalID string `kafka:"CompactString"` TransactionTimeoutMs uint32 ProducerID uint64 ProducerEpoch uint16 }
InitProducerIDRequest represents the request for producer ID initialization.
type InitProducerIDResponse ¶
type InitProducerIDResponse struct { ThrottleTimeMs uint32 ErrorCode uint16 ProducerID uint64 ProducerEpoch uint16 }
InitProducerIDResponse represents the response to a producer ID initialization request.
type JoinGroupRequest ¶
type JoinGroupRequest struct { GroupID string `kafka:"CompactString"` SessionTimeoutMs uint32 RebalanceTimeoutMs uint32 MemberID string `kafka:"CompactString"` GroupInstanceID string `kafka:"CompactNullableString"` // Nullable fields are represented as empty strings if not set ProtocolType string `kafka:"CompactString"` Protocols []JoinGroupRequestProtocol Reason string `kafka:"CompactNullableString"` // Nullable fields are represented as empty strings if not set }
JoinGroupRequest represents a JoinGroup request
type JoinGroupRequestProtocol ¶
JoinGroupRequestProtocol represents a protocol in JoinGroupRequest
type JoinGroupResponse ¶
type JoinGroupResponse struct { ThrottleTimeMS uint32 ErrorCode uint16 GenerationID uint32 ProtocolType string `kafka:"CompactString"` ProtocolName string `kafka:"CompactString"` Leader string `kafka:"CompactString"` SkipAssignment bool MemberID string `kafka:"CompactString"` Members []JoinGroupResponseMember }
JoinGroupResponse represents the response to a join group request.
type JoinGroupResponseMember ¶
type JoinGroupResponseMember struct { MemberID string `kafka:"CompactString"` GroupInstanceID string `kafka:"CompactString"` Metadata []byte }
JoinGroupResponseMember represents a member in a join group response.
type ListOffsetsRequest ¶
type ListOffsetsRequest struct { ReplicaID uint32 IsolationLevel uint8 Topics []ListOffsetsRequestTopic }
ListOffsetsRequest represents a request to list offsets for specific partitions.
type ListOffsetsRequestPartition ¶
type ListOffsetsRequestPartition struct { PartitionIndex uint32 CurrentLeaderEpoch uint32 Timestamp uint64 }
ListOffsetsRequestPartition represents a partition in a list offsets request.
type ListOffsetsRequestTopic ¶
type ListOffsetsRequestTopic struct { Name string Partitions []ListOffsetsRequestPartition }
ListOffsetsRequestTopic represents a topic in a list offsets request.
type ListOffsetsResponse ¶
type ListOffsetsResponse struct { ThrottleTimeMs uint32 Topics []ListOffsetsResponseTopic }
ListOffsetsResponse represents the response to a list offsets request.
type ListOffsetsResponsePartition ¶
type ListOffsetsResponsePartition struct { PartitionIndex uint32 ErrorCode uint16 Timestamp uint64 Offset uint64 LeaderEpoch uint32 }
ListOffsetsResponsePartition represents a partition in a list offsets response.
type ListOffsetsResponseTopic ¶
type ListOffsetsResponseTopic struct { Name string `kafka:"CompactString"` Partitions []ListOffsetsResponsePartition }
ListOffsetsResponseTopic represents a topic in a list offsets response.
type MetadataRequest ¶
type MetadataRequest struct { Topics []MetadataRequestTopic AllowAutoTopicCreation bool IncludeTopicAuthorizedOperations bool }
MetadataRequest represents a metadata request.
type MetadataRequestTopic ¶
MetadataRequestTopic represents a topic in the metadata request.
type MetadataResponse ¶
type MetadataResponse struct { ThrottleTimeMs uint32 Brokers []MetadataResponseBroker ClusterID string `kafka:"CompactString"` // nullable ControllerID uint32 Topics []MetadataResponseTopic }
MetadataResponse represents a metadata response with brokers, topics, and more.
type MetadataResponseBroker ¶
type MetadataResponseBroker struct { NodeID uint32 Host string `kafka:"CompactString"` Port uint32 Rack string `kafka:"CompactString"` }
MetadataResponseBroker represents a broker in a metadata response.
type MetadataResponsePartition ¶
type MetadataResponsePartition struct { ErrorCode uint16 PartitionIndex uint32 LeaderID uint32 LeaderEpoch uint32 ReplicaNodes []uint32 IsrNodes []uint32 OfflineReplicas []uint32 }
MetadataResponsePartition represents partition information in a metadata response.
type MetadataResponseTopic ¶
type MetadataResponseTopic struct { ErrorCode uint16 Name string `kafka:"CompactString"` TopicID [16]byte IsInternal bool Partitions []MetadataResponsePartition TopicAuthorizedOperations uint32 }
MetadataResponseTopic represents a topic in the metadata response.
type OffsetCommitRequest ¶
type OffsetCommitRequest struct { GroupID string GenerationIDOrMemberEpoch uint32 MemberID string GroupInstanceID string Topics []OffsetCommitRequestTopic }
OffsetCommitRequest represents a request to commit offsets.
type OffsetCommitRequestPartition ¶
type OffsetCommitRequestPartition struct { PartitionIndex uint32 CommittedOffset uint64 CommittedLeaderEpoch uint32 CommittedMetadata string `kafka:"CompactString"` }
OffsetCommitRequestPartition represents a partition in an offset commit request.
type OffsetCommitRequestTopic ¶
type OffsetCommitRequestTopic struct { Name string Partitions []OffsetCommitRequestPartition }
OffsetCommitRequestTopic represents a topic in an offset commit request.
type OffsetCommitResponse ¶
type OffsetCommitResponse struct { ThrottleTimeMs uint32 Topics []OffsetCommitResponseTopic }
OffsetCommitResponse represents the response to an offset commit request.
type OffsetCommitResponsePartition ¶
OffsetCommitResponsePartition represents a partition in an offset commit response.
type OffsetCommitResponseTopic ¶
type OffsetCommitResponseTopic struct { Name string `kafka:"CompactString"` Partitions []OffsetCommitResponsePartition }
OffsetCommitResponseTopic represents a topic in an offset commit response.
type OffsetFetchGroup ¶
type OffsetFetchGroup struct { GroupID string `kafka:"CompactString"` Topics []OffsetFetchTopic ErrorCode uint16 }
OffsetFetchGroup represents a group in an offset fetch response.
type OffsetFetchPartition ¶
type OffsetFetchPartition struct { PartitionIndex uint32 CommittedOffset uint64 CommittedLeaderEpoch uint32 Metadata string ErrorCode uint16 }
OffsetFetchPartition represents a partition in an offset fetch response.
type OffsetFetchRequest ¶
type OffsetFetchRequest struct { Groups []OffsetFetchRequestGroup RequiresStable bool }
OffsetFetchRequest represents the offset fetch request.
type OffsetFetchRequestGroup ¶
type OffsetFetchRequestGroup struct { GroupID string `kafka:"CompactString"` MemberID string `kafka:"CompactNullableString"` MemberEpoch uint32 Topics []OffsetFetchRequestTopic }
OffsetFetchRequestGroup represents a group in OffsetFetchRequest
type OffsetFetchRequestTopic ¶
type OffsetFetchRequestTopic struct { Name string `kafka:"CompactString"` PartitionIndexes []uint32 }
OffsetFetchRequestTopic represents a topic in OffsetFetchRequestGroup
type OffsetFetchResponse ¶
type OffsetFetchResponse struct { ThrottleTimeMs uint32 Groups []OffsetFetchGroup }
OffsetFetchResponse represents the response to an offset fetch request.
type OffsetFetchTopic ¶
type OffsetFetchTopic struct { Name string Partitions []OffsetFetchPartition ErrorCode uint16 }
OffsetFetchTopic represents a topic in an offset fetch response.
type ProducePartitionResponse ¶
type ProducePartitionResponse struct { Index uint32 ErrorCode uint16 BaseOffset uint64 LogAppendTimeMs uint64 LogStartOffset uint64 RecordErrors []RecordError ErrorMessage string `kafka:"CompactString"` }
ProducePartitionResponse represents the response for a partition in a produce request.
type ProduceRequest ¶
type ProduceRequest struct { TransactionalID string `kafka:"CompactNullableString"` Acks uint16 TimeoutMs uint32 TopicData []ProduceRequestTopicData }
ProduceRequest represents the details of a ProduceRequest.
type ProduceRequestPartitionData ¶
ProduceRequestPartitionData represents the partition data in a ProduceRequest.
type ProduceRequestTopicData ¶
type ProduceRequestTopicData struct { Name string `kafka:"CompactString"` PartitionData []ProduceRequestPartitionData }
ProduceRequestTopicData represents the topic data in a ProduceRequest.
type ProduceResponse ¶
type ProduceResponse struct { ProduceTopicResponses []ProduceTopicResponse ThrottleTimeMs uint32 }
ProduceResponse represents the response to a produce request.
type ProduceTopicResponse ¶
type ProduceTopicResponse struct { Name string `kafka:"CompactString"` ProducePartitionResponses []ProducePartitionResponse }
ProduceTopicResponse represents the response for a topic in a produce request.
type RecordError ¶
RecordError represents an error in a specific batch of records.
type ResourceType ¶
type ResourceType int8
ResourceType represents the kafka resource type (topic, group, etc.)
const ( ResourceTypeUnknown ResourceType = 0 ResourceTypeAny ResourceType = 1 ResourceTypeTopic ResourceType = 2 ResourceTypeGroup ResourceType = 3 ResourceTypeBroker ResourceType = 4 ResourceTypeCluster ResourceType = 4 ResourceTypeTransactionalID ResourceType = 5 ResourceTypeDelegationToken ResourceType = 6 )
type SyncGroupRequest ¶
type SyncGroupRequest struct { GroupID string `kafka:"CompactString"` GenerationID uint32 MemberID string `kafka:"CompactString"` GroupInstanceID string `kafka:"CompactNullableString"` ProtocolType string `kafka:"CompactNullableString"` ProtocolName string `kafka:"CompactNullableString"` Assignments []SyncGroupRequestMember }
SyncGroupRequest represents the details of a SyncGroupRequest.
type SyncGroupRequestMember ¶
SyncGroupRequestMember represents a member and its assignment in the SyncGroupRequest.