protocol

package
v0.0.0-...-089e9e6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2025 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
// DefaultNumPartition is the number of partitions used when a topic is
// created without an explicit partition count.
// TODO: make configurable.
const DefaultNumPartition = 1

DefaultNumPartition is the number of partitions used when a topic is created without an explicit partition count. TODO: make configurable.

Variables

View Source
// Predefined Error values, one per error code from -1 through 41.
// Each entry carries its numeric code, a human-readable message, and its
// retriability status.
// NOTE(review): the codes and messages look like the Kafka protocol error
// table — confirm against the upstream list before adding or changing entries.
var (
	ErrUnknownServerError           = Error{Code: -1, Message: "The server experienced an unexpected error when processing the request.", IsRetriable: false}
	ErrNone                         = Error{Code: 0, Message: "", IsRetriable: false}
	ErrOffsetOutOfRange             = Error{Code: 1, Message: "The requested offset is not within the range of offsets maintained by the server.", IsRetriable: false}
	ErrCorruptMessage               = Error{Code: 2, Message: "This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt.", IsRetriable: true}
	ErrUnknownTopicOrPartition      = Error{Code: 3, Message: "This server does not host this topic-partition.", IsRetriable: true}
	ErrInvalidFetchSize             = Error{Code: 4, Message: "The requested fetch size is invalid.", IsRetriable: false}
	ErrLeaderNotAvailable           = Error{Code: 5, Message: "There is no leader for this topic-partition as we are in the middle of a leadership election.", IsRetriable: true}
	ErrNotLeaderOrFollower          = Error{Code: 6, Message: "For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.", IsRetriable: true}
	ErrRequestTimedOut              = Error{Code: 7, Message: "The request timed out.", IsRetriable: true}
	ErrBrokerNotAvailable           = Error{Code: 8, Message: "The broker is not available.", IsRetriable: false}
	ErrReplicaNotAvailable          = Error{Code: 9, Message: "The replica is not available for the requested topic-partition. Produce/Fetch requests and other requests intended only for the leader or follower return NOT_LEADER_OR_FOLLOWER if the broker is not a replica of the topic-partition.", IsRetriable: true}
	ErrMessageTooLarge              = Error{Code: 10, Message: "The request included a message larger than the max message size the server will accept.", IsRetriable: false}
	ErrStaleControllerEpoch         = Error{Code: 11, Message: "The controller moved to another broker.", IsRetriable: false}
	ErrOffsetMetadataTooLarge       = Error{Code: 12, Message: "The metadata field of the offset request was too large.", IsRetriable: false}
	ErrNetworkException             = Error{Code: 13, Message: "The server disconnected before a response was received.", IsRetriable: true}
	ErrCoordinatorLoadInProgress    = Error{Code: 14, Message: "The coordinator is loading and hence can't process requests.", IsRetriable: true}
	ErrCoordinatorNotAvailable      = Error{Code: 15, Message: "The coordinator is not available.", IsRetriable: true}
	ErrNotCoordinator               = Error{Code: 16, Message: "This is not the correct coordinator.", IsRetriable: true}
	ErrInvalidTopicException        = Error{Code: 17, Message: "The request attempted to perform an operation on an invalid topic.", IsRetriable: false}
	ErrRecordListTooLarge           = Error{Code: 18, Message: "The request included message batch larger than the configured segment size on the server.", IsRetriable: false}
	ErrNotEnoughReplicas            = Error{Code: 19, Message: "Messages are rejected since there are fewer in-sync replicas than required.", IsRetriable: true}
	ErrNotEnoughReplicasAfterAppend = Error{Code: 20, Message: "Messages are written to the log, but to fewer in-sync replicas than required.", IsRetriable: true}
	ErrInvalidRequiredAcks          = Error{Code: 21, Message: "Produce request specified an invalid value for required acks.", IsRetriable: false}
	ErrIllegalGeneration            = Error{Code: 22, Message: "Specified group generation id is not valid.", IsRetriable: false}
	ErrInconsistentGroupProtocol    = Error{Code: 23, Message: "The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.", IsRetriable: false}
	ErrInvalidGroupID               = Error{Code: 24, Message: "The configured groupId is invalid.", IsRetriable: false}
	ErrUnknownMemberID              = Error{Code: 25, Message: "The coordinator is not aware of this member.", IsRetriable: false}
	ErrInvalidSessionTimeout        = Error{Code: 26, Message: "The session timeout is not within the range allowed by the broker (as configured by group.min.session.timeout.ms and group.max.session.timeout.ms).", IsRetriable: false}
	ErrRebalanceInProgress          = Error{Code: 27, Message: "The group is rebalancing, so a rejoin is needed.", IsRetriable: false}
	ErrInvalidCommitOffsetSize      = Error{Code: 28, Message: "The committing offset data size is not valid.", IsRetriable: false}
	ErrTopicAuthorizationFailed     = Error{Code: 29, Message: "Topic authorization failed.", IsRetriable: false}
	ErrGroupAuthorizationFailed     = Error{Code: 30, Message: "Group authorization failed.", IsRetriable: false}
	ErrClusterAuthorizationFailed   = Error{Code: 31, Message: "Cluster authorization failed.", IsRetriable: false}
	ErrInvalidTimestamp             = Error{Code: 32, Message: "The timestamp of the message is out of acceptable range.", IsRetriable: false}
	ErrUnsupportedSaslMechanism     = Error{Code: 33, Message: "The broker does not support the requested SASL mechanism.", IsRetriable: false}
	ErrIllegalSaslState             = Error{Code: 34, Message: "Request is not valid given the current SASL state.", IsRetriable: false}
	ErrUnsupportedVersion           = Error{Code: 35, Message: "The version of API is not supported.", IsRetriable: false}
	ErrTopicAlreadyExists           = Error{Code: 36, Message: "Topic with this name already exists.", IsRetriable: false}
	ErrInvalidPartitions            = Error{Code: 37, Message: "Number of partitions is below 1.", IsRetriable: false}
	ErrInvalidReplicationFactor     = Error{Code: 38, Message: "Replication factor is below 1 or larger than the number of available brokers.", IsRetriable: false}
	ErrInvalidReplicaAssignment     = Error{Code: 39, Message: "Replica assignment is invalid.", IsRetriable: false}
	ErrInvalidConfig                = Error{Code: 40, Message: "Configuration is invalid.", IsRetriable: false}
	ErrNotController                = Error{Code: 41, Message: "This is not the correct controller for this cluster.", IsRetriable: true}
)

Define each error as a variable of type Error

View Source
// Special (negative) timestamp values for list-offsets requests.
// NOTE(review): these are declared with var rather than const, so they are
// mutable package-level state. Presumably this is deliberate, for the same
// unsigned-conversion reason documented on MinusOne — confirm before
// converting them to constants.
var (
	ListOffsetsEarliestTimestamp = -2
	ListOffsetsLatestTimestamp   = -1
	ListOffsetsMaxTimestamp      = -3
)

Special timestamp values for list-offsets requests (declared as variables, not constants).

View Source
// ClusterID is the cluster identifier; it is currently a hard-coded value.
// TODO: consider making it configurable.
var ClusterID = "MONKAFKA-CLUSTER"

ClusterID is the cluster identifier. TODO: should this be configurable?

View Source
// ConsumerOffsetsTopic is the name of the topic where consumers' committed
// offsets are saved.
var ConsumerOffsetsTopic = "__consumer-offsets"

ConsumerOffsetsTopic is the topic where consumers' committed offsets are saved.

ErrorMap associates error codes with corresponding Error structs

View Source
// MinusOne holds -1 in a variable (not a constant) because converting the
// constant -1 directly to an unsigned integer type is rejected at compile
// time, whereas converting a variable is allowed.
var MinusOne int = -1

MinusOne holds -1 in a variable because converting the constant -1 directly to an unsigned integer type is rejected by the compiler.

Functions

func GetCommittedOffset

func GetCommittedOffset(groupID string, topic string, partition uint32) int64

GetCommittedOffset returns the committed offset if it exists, otherwise -1

func LoadGroupMetadataState

func LoadGroupMetadataState()

LoadGroupMetadataState loads the group metadata state from the __consumer-offsets metadata.

func UpdateGroupMetadataState

func UpdateGroupMetadataState(recordBatchBytes []byte)

UpdateGroupMetadataState updates the group metadata state according to the given __consumer-offsets record.

Types

type APIKey

// APIKey represents an API key and its supported version range
// (MinVersion through MaxVersion).
type APIKey struct {
	APIKey     uint16
	MinVersion uint16
	MaxVersion uint16
}

APIKey represents an API key and its supported version range.

type APIKeyHandler

// APIKeyHandler pairs the name of a Kafka API key with the function that
// handles requests for it.
type APIKeyHandler struct {
	Name string
	// Handler receives the request and returns a byte slice —
	// presumably the serialized response; confirm against callers.
	Handler func(req types.Request) []byte
}

APIKeyHandler represents a kafka api key with its handler

type APIVersionsResponse

// APIVersionsResponse represents the response to an API versions request:
// an error code, the list of API keys with their version ranges, and a
// throttle time.
type APIVersionsResponse struct {
	ErrorCode    uint16
	APIKeys      []APIKey
	ThrottleTime uint32
}

APIVersionsResponse represents the response for API versions request.

type AbortedTransaction

// AbortedTransaction represents an aborted transaction in the fetch
// response, identified by producer ID and first offset.
type AbortedTransaction struct {
	ProducerID  uint64
	FirstOffset uint64
}

AbortedTransaction represents an aborted transaction in the fetch response.

type Broker

// Broker represents a Kafka broker instance: its configuration, a shutdown
// signal channel, and the Serf and Raft handles and channels it uses for
// cluster membership and leadership.
type Broker struct {
	Config         *types.Configuration
	ShutDownSignal chan bool
	Serf           *serf.Serf  // Serf cluster maintained inside the DC
	Raft           *hraft.Raft // Raft cluster maintained inside the DC
	FSM            *raft.FSM

	RaftNotifyCh <-chan bool // RaftNotifyCh ensures that we get reliable leader transition notifications from the Raft layer.

	SerfEventCh chan serf.Event  // SerfEventCh is used to receive events from the Serf cluster.
	ReconcileCh chan serf.Member // ReconcileCh is used to pass events from the Serf handler into the leader manager to update the strong state.
}

Broker represents a Kafka broker instance

func NewBroker

func NewBroker(config *types.Configuration) *Broker

NewBroker creates a new Broker instance with the provided configuration

func (*Broker) APIDispatcher

func (b *Broker) APIDispatcher(requestAPIKey uint16) APIKeyHandler

APIDispatcher maps the request's API key to its handler

func (*Broker) AppendRaftEntry

func (b *Broker) AppendRaftEntry(entryType raft.CommandType, entry any) (any, error)

AppendRaftEntry adds a new entry to the Raft log

func (*Broker) CreateTopicPartitions

func (b *Broker) CreateTopicPartitions(name string, numPartitions uint32, configs map[string]string) error

CreateTopicPartitions creates a new topic with its partitions by appending them to the Raft log.

func (*Broker) GetClusterNodes

func (b *Broker) GetClusterNodes() ([]*types.Node, error)

GetClusterNodes returns the Raft cluster nodes, each representing a broker

func (*Broker) HandleConnection

func (b *Broker) HandleConnection(conn net.Conn)

HandleConnection processes incoming requests from a client connection

func (*Broker) IsController

func (b *Broker) IsController() bool

IsController reports whether the broker is the cluster's controller, which is also the Raft leader

func (*Broker) SetupRaft

func (b *Broker) SetupRaft() error

SetupRaft inits Raft for the broker

func (*Broker) SetupSerf

func (b *Broker) SetupSerf() error

SetupSerf sets up the Serf agent and, if applicable, joins a Serf cluster

func (*Broker) Shutdown

func (b *Broker) Shutdown()

Shutdown gracefully shuts down the broker and its components

func (*Broker) Startup

func (b *Broker) Startup()

Startup initializes the broker, starts the storage, loads group metadata, and listens for incoming connections

type CreateTopicsRequest

// CreateTopicsRequest represents the Kafka request to create topics.
type CreateTopicsRequest struct {
	Topics       []CreateTopicsRequestTopic
	TimeoutMs    uint32
	ValidateOnly bool
}

CreateTopicsRequest represents the Kafka request to create topics.

type CreateTopicsRequestAssignment

// CreateTopicsRequestAssignment represents the partition assignment for a
// topic: a partition index and the broker IDs assigned to it.
type CreateTopicsRequestAssignment struct {
	PartitionIndex uint32
	BrokerIds      []uint32
}

CreateTopicsRequestAssignment represents the partition assignments for a topic.

type CreateTopicsRequestConfig

// CreateTopicsRequestConfig represents one configuration entry for a topic.
// Name is tagged CompactString and Value CompactNullableString.
type CreateTopicsRequestConfig struct {
	Name  string `kafka:"CompactString"`
	Value string `kafka:"CompactNullableString"`
}

CreateTopicsRequestConfig represents the configuration for a topic.

type CreateTopicsRequestTopic

// CreateTopicsRequestTopic represents the details of a topic to be created:
// its name, partition count, replication factor, explicit assignments, and
// configuration entries.
type CreateTopicsRequestTopic struct {
	Name              string `kafka:"CompactString"`
	NumPartitions     uint32
	ReplicationFactor uint16
	Assignments       []CreateTopicsRequestAssignment
	Configs           []CreateTopicsRequestConfig
}

CreateTopicsRequestTopic represents the details of a topic to be created.

type CreateTopicsResponse

// CreateTopicsResponse represents the response to a topic creation request,
// with one CreateTopicsResponseTopic per topic.
type CreateTopicsResponse struct {
	ThrottleTimeMs uint32
	Topics         []CreateTopicsResponseTopic
}

CreateTopicsResponse represents the response to a topic creation request.

type CreateTopicsResponseConfig

// CreateTopicsResponseConfig represents a configuration entry for a topic in
// the creation response.
// NOTE(review): Name and Value carry no kafka struct tag, unlike the
// corresponding fields of CreateTopicsRequestConfig — confirm the
// serializer's default string encoding is the intended one here.
type CreateTopicsResponseConfig struct {
	Name         string
	Value        string
	ReadOnly     bool
	ConfigSource uint8
	IsSensitive  bool
}

CreateTopicsResponseConfig represents a configuration for a topic.

type CreateTopicsResponseTopic

// CreateTopicsResponseTopic represents a topic's creation result: the topic
// name and 16-byte ID, an error code and message, and the resulting
// partition count, replication factor, and configuration entries.
type CreateTopicsResponseTopic struct {
	Name              string `kafka:"CompactString"`
	TopicID           [16]byte
	ErrorCode         uint16
	ErrorMessage      string `kafka:"CompactString"`
	NumPartitions     uint32
	ReplicationFactor uint16
	Configs           []CreateTopicsResponseConfig
}

CreateTopicsResponseTopic represents a topic's creation result.

type DescribeConfigsRequest

// DescribeConfigsRequest represents the request for DescribeConfigs with
// version 4.
type DescribeConfigsRequest struct {
	Resources            []DescribeConfigsResource
	IncludeSynonyms      bool
	IncludeDocumentation bool
}

DescribeConfigsRequest struct represents the request for DescribeConfigs with version 4.

type DescribeConfigsResource

// DescribeConfigsResource represents a resource inside
// DescribeConfigsRequest: its type, name, and the configuration keys asked
// for.
type DescribeConfigsResource struct {
	ResourceType uint8
	ResourceName string `kafka:"CompactString"`
	// The CompactString tag sits on a []string; presumably it applies to
	// each element — confirm against the decoder.
	ConfigurationKeys []string `kafka:"CompactString"`
}

DescribeConfigsResource struct represents the resource inside DescribeConfigsRequest.

type DescribeConfigsResponse

// DescribeConfigsResponse represents the response for DescribeConfigs with
// version 4.
type DescribeConfigsResponse struct {
	ThrottleTimeMs uint32
	Results        []DescribeConfigsResponseResult
}

DescribeConfigsResponse struct represents the response for DescribeConfigs with version 4.

type DescribeConfigsResponseConfig

// DescribeConfigsResponseConfig represents the configuration details within
// a DescribeConfigs result.
type DescribeConfigsResponseConfig struct {
	// NOTE(review): Name has no kafka tag while the other strings in this
	// struct do — confirm this is intended.
	Name          string
	Value         string `kafka:"CompactNullableString"`
	ReadOnly      bool
	ConfigSource  uint8
	IsSensitive   bool
	Synonyms      []DescribeConfigsResponseSynonym
	// NOTE(review): ConfigType is a platform-sized uint, whereas the other
	// numeric fields here use fixed-width types — confirm how the
	// serializer encodes it.
	ConfigType    uint
	Documentation string `kafka:"CompactNullableString"`
}

DescribeConfigsResponseConfig struct represents the configuration details within a result.

type DescribeConfigsResponseResult

// DescribeConfigsResponseResult represents one result inside
// DescribeConfigsResponse: an error code and message, the resource's type
// and name, and its configuration entries.
type DescribeConfigsResponseResult struct {
	ErrorCode    uint16
	ErrorMessage string `kafka:"CompactString"`
	ResourceType uint8
	ResourceName string `kafka:"CompactString"`
	Configs      []DescribeConfigsResponseConfig
}

DescribeConfigsResponseResult struct represents the result inside DescribeConfigsResponse.

type DescribeConfigsResponseSynonym

// DescribeConfigsResponseSynonym represents synonym details within a config.
type DescribeConfigsResponseSynonym struct {
	Name  string `kafka:"CompactString"`
	Value string `kafka:"CompactNullableString"`
	// NOTE(review): Source is a platform-sized uint, while the comparable
	// ConfigSource fields elsewhere in this package are uint8 — confirm
	// how the serializer encodes it.
	Source uint
}

DescribeConfigsResponseSynonym struct represents synonym details within a config.

type Error

// Error holds an error's code, message, and retriability status.
type Error struct {
	// Code is signed (the predefined errors include -1), whereas the
	// ErrorCode fields of the response structs in this package are uint16.
	// NOTE(review): confirm how Code is converted at those call sites.
	Code        int16
	Message     string
	IsRetriable bool
}

Error is a struct to hold the code, message, and retriability status

type FetchPartitionResponse

// FetchPartitionResponse represents the response for a partition in a fetch
// request, including its offsets, aborted transactions, and the raw record
// bytes.
type FetchPartitionResponse struct {
	PartitionIndex       uint32
	ErrorCode            uint16
	HighWatermark        uint64
	LastStableOffset     uint64
	LogStartOffset       uint64
	AbortedTransactions  []AbortedTransaction
	PreferredReadReplica uint32
	Records              []byte
}

FetchPartitionResponse represents the response for a partition in a fetch request.

type FetchRequest

// FetchRequest represents the details of a FetchRequest (Version: 12).
type FetchRequest struct {
	ReplicaID           uint32
	MaxWaitMs           uint32
	MinBytes            uint32
	MaxBytes            uint32
	IsolationLevel      uint8
	SessionID           uint32
	SessionEpoch        uint32
	Topics              []FetchRequestTopic
	ForgottenTopicsData []FetchRequestForgottenTopic
	RackID              string `kafka:"CompactString"`
}

FetchRequest represents the details of a FetchRequest (Version: 12).

type FetchRequestForgottenTopic

// FetchRequestForgottenTopic represents the forgotten topic data in a
// FetchRequest: a topic and a list of its partitions.
type FetchRequestForgottenTopic struct {
	Topic      string `kafka:"CompactString"`
	Partitions []uint32
}

FetchRequestForgottenTopic represents the forgotten topic data in a FetchRequest.

type FetchRequestPartitionData

// FetchRequestPartitionData represents the partition-level data in a
// FetchRequest.
type FetchRequestPartitionData struct {
	PartitionIndex     uint32
	CurrentLeaderEpoch uint32
	FetchOffset        uint64
	LastFetchedEpoch   uint32
	LogStartOffset     uint64
	PartitionMaxBytes  uint32
}

FetchRequestPartitionData represents the partition-level data in a FetchRequest.

type FetchRequestTopic

// FetchRequestTopic represents the topic-level data in a FetchRequest: the
// topic name and the partitions to fetch.
type FetchRequestTopic struct {
	Name       string `kafka:"CompactString"`
	Partitions []FetchRequestPartitionData
}

FetchRequestTopic represents the topic-level data in a FetchRequest.

type FetchResponse

// FetchResponse represents the response to a fetch request, with one
// FetchTopicResponse per topic.
type FetchResponse struct {
	ThrottleTimeMs uint32
	ErrorCode      uint16
	SessionID      uint32
	Responses      []FetchTopicResponse
}

FetchResponse represents the response to a fetch request.

type FetchTopicResponse

// FetchTopicResponse represents the response for a topic in a fetch request,
// with one FetchPartitionResponse per partition.
type FetchTopicResponse struct {
	TopicName  string `kafka:"CompactString"`
	Partitions []FetchPartitionResponse
}

FetchTopicResponse represents the response for a topic in a fetch request.

type FindCoordinatorRequestV1To3

// FindCoordinatorRequestV1To3 represents the coordinator-finding request;
// per its name, the layout used for request versions 1 through 3.
// Key uses types.NonCompactString rather than a kafka-tagged string.
type FindCoordinatorRequestV1To3 struct {
	Key     types.NonCompactString
	KeyType uint8
}

FindCoordinatorRequestV1To3 represents the coordinator-finding request for versions 1 through 3

type FindCoordinatorRequestV4Plus

// FindCoordinatorRequestV4Plus represents the coordinator-finding request;
// per its name, the layout used for request versions 4 and above.
type FindCoordinatorRequestV4Plus struct {
	// The single Key field is left commented out; this layout carries
	// CoordinatorKeys instead.
	// Key     string `kafka:"CompactString"`
	KeyType uint8
	// NOTE(review): CoordinatorKeys has no kafka tag — confirm the
	// decoder's default encoding for []string is the intended one.
	CoordinatorKeys []string
}

FindCoordinatorRequestV4Plus represents the coordinator-finding request for versions 4 and above

type FindCoordinatorResponse

// FindCoordinatorResponse represents the response to a coordinator-finding
// request, with one entry per coordinator.
type FindCoordinatorResponse struct {
	ThrottleTimeMs uint32
	Coordinators   []FindCoordinatorResponseCoordinator
}

FindCoordinatorResponse represents the response for a coordinator finding request.

type FindCoordinatorResponseCoordinator

// FindCoordinatorResponseCoordinator represents a coordinator: the key it
// answers for, its node ID, host and port, and an error code and message.
type FindCoordinatorResponseCoordinator struct {
	Key          string `kafka:"CompactString"`
	NodeID       uint32
	Host         string `kafka:"CompactString"`
	Port         uint32
	ErrorCode    uint16
	ErrorMessage string `kafka:"CompactString"`
}

FindCoordinatorResponseCoordinator represents a coordinator.

type HeartbeatResponse

// HeartbeatResponse represents a heartbeat response: a throttle time and an
// error code.
type HeartbeatResponse struct {
	ThrottleTimeMs uint32
	ErrorCode      uint16
}

HeartbeatResponse represents the response to a Heartbeat request

type InitProducerIDRequest

// InitProducerIDRequest represents the request for producer ID
// initialization.
type InitProducerIDRequest struct {
	TransactionalID      string `kafka:"CompactString"`
	TransactionTimeoutMs uint32
	ProducerID           uint64
	ProducerEpoch        uint16
}

InitProducerIDRequest represents the request for producer ID initialization.

type InitProducerIDResponse

// InitProducerIDResponse represents the response to a producer ID
// initialization request.
type InitProducerIDResponse struct {
	ThrottleTimeMs uint32
	ErrorCode      uint16
	ProducerID     uint64
	ProducerEpoch  uint16
}

InitProducerIDResponse represents the response to a producer ID initialization request.

type JoinGroupRequest

// JoinGroupRequest represents a JoinGroup request.
// GroupInstanceID and Reason are nullable on the wire and are represented
// as empty strings when not set.
type JoinGroupRequest struct {
	GroupID            string `kafka:"CompactString"`
	SessionTimeoutMs   uint32
	RebalanceTimeoutMs uint32
	MemberID           string `kafka:"CompactString"`
	GroupInstanceID    string `kafka:"CompactNullableString"` // Nullable fields are represented as empty strings if not set
	ProtocolType       string `kafka:"CompactString"`
	Protocols          []JoinGroupRequestProtocol
	Reason             string `kafka:"CompactNullableString"` // Nullable fields are represented as empty strings if not set
}

JoinGroupRequest represents a JoinGroup request

type JoinGroupRequestProtocol

// JoinGroupRequestProtocol represents a protocol in JoinGroupRequest: its
// name and raw metadata bytes.
type JoinGroupRequestProtocol struct {
	Name     string `kafka:"CompactString"`
	Metadata []byte
}

JoinGroupRequestProtocol represents a protocol in JoinGroupRequest

type JoinGroupResponse

// JoinGroupResponse represents the response to a join group request.
type JoinGroupResponse struct {
	// NOTE(review): spelled ThrottleTimeMS here but ThrottleTimeMs in the
	// other response structs of this package; renaming would break
	// callers, so it is only flagged.
	ThrottleTimeMS uint32
	ErrorCode      uint16
	GenerationID   uint32
	ProtocolType   string `kafka:"CompactString"`
	ProtocolName   string `kafka:"CompactString"`
	Leader         string `kafka:"CompactString"`
	SkipAssignment bool
	MemberID       string `kafka:"CompactString"`
	Members        []JoinGroupResponseMember
}

JoinGroupResponse represents the response to a join group request.

type JoinGroupResponseMember

// JoinGroupResponseMember represents a member in a join group response.
type JoinGroupResponseMember struct {
	MemberID string `kafka:"CompactString"`
	// NOTE(review): tagged CompactString here, while
	// JoinGroupRequest.GroupInstanceID is CompactNullableString — confirm
	// which is intended.
	GroupInstanceID string `kafka:"CompactString"`
	Metadata        []byte
}

JoinGroupResponseMember represents a member in a join group response.

type ListOffsetsRequest

// ListOffsetsRequest represents a request to list offsets for specific
// partitions.
type ListOffsetsRequest struct {
	ReplicaID      uint32
	IsolationLevel uint8
	Topics         []ListOffsetsRequestTopic
}

ListOffsetsRequest represents a request to list offsets for specific partitions.

type ListOffsetsRequestPartition

// ListOffsetsRequestPartition represents a partition in a list offsets
// request.
type ListOffsetsRequestPartition struct {
	PartitionIndex     uint32
	CurrentLeaderEpoch uint32
	// Timestamp is unsigned; presumably the negative ListOffsets*Timestamp
	// values are compared after conversion — verify against the handler.
	Timestamp uint64
}

ListOffsetsRequestPartition represents a partition in a list offsets request.

type ListOffsetsRequestTopic

// ListOffsetsRequestTopic represents a topic in a list offsets request.
// NOTE(review): Name has no kafka tag, whereas ListOffsetsResponseTopic.Name
// is tagged CompactString — confirm this is intended.
type ListOffsetsRequestTopic struct {
	Name       string
	Partitions []ListOffsetsRequestPartition
}

ListOffsetsRequestTopic represents a topic in a list offsets request.

type ListOffsetsResponse

// ListOffsetsResponse represents the response to a list offsets request,
// with one ListOffsetsResponseTopic per topic.
type ListOffsetsResponse struct {
	ThrottleTimeMs uint32
	Topics         []ListOffsetsResponseTopic
}

ListOffsetsResponse represents the response to a list offsets request.

type ListOffsetsResponsePartition

// ListOffsetsResponsePartition represents a partition in a list offsets
// response.
type ListOffsetsResponsePartition struct {
	PartitionIndex uint32
	ErrorCode      uint16
	Timestamp      uint64
	Offset         uint64
	LeaderEpoch    uint32
}

ListOffsetsResponsePartition represents a partition in a list offsets response.

type ListOffsetsResponseTopic

// ListOffsetsResponseTopic represents a topic in a list offsets response.
type ListOffsetsResponseTopic struct {
	Name       string `kafka:"CompactString"`
	Partitions []ListOffsetsResponsePartition
}

ListOffsetsResponseTopic represents a topic in a list offsets response.

type MetadataRequest

// MetadataRequest represents a metadata request.
type MetadataRequest struct {
	Topics                           []MetadataRequestTopic
	AllowAutoTopicCreation           bool
	IncludeTopicAuthorizedOperations bool
}

MetadataRequest represents a metadata request.

type MetadataRequestTopic

// MetadataRequestTopic represents a topic in the metadata request, carrying
// both a 16-byte topic ID and a name.
type MetadataRequestTopic struct {
	TopicID [16]byte
	Name    string `kafka:"CompactString"`
}

MetadataRequestTopic represents a topic in the metadata request.

type MetadataResponse

// MetadataResponse represents a metadata response with brokers, the cluster
// ID, the controller ID, and topics.
type MetadataResponse struct {
	ThrottleTimeMs uint32
	Brokers        []MetadataResponseBroker
	ClusterID      string `kafka:"CompactString"` // nullable — NOTE(review): tagged CompactString, not CompactNullableString; confirm
	ControllerID   uint32
	Topics         []MetadataResponseTopic
}

MetadataResponse represents a metadata response with brokers, topics, and more.

type MetadataResponseBroker

// MetadataResponseBroker represents a broker in a metadata response: its
// node ID, host, port, and rack.
type MetadataResponseBroker struct {
	NodeID uint32
	Host   string `kafka:"CompactString"`
	Port   uint32
	Rack   string `kafka:"CompactString"`
}

MetadataResponseBroker represents a broker in a metadata response.

type MetadataResponsePartition

// MetadataResponsePartition represents partition information in a metadata
// response.
type MetadataResponsePartition struct {
	ErrorCode       uint16
	PartitionIndex  uint32
	LeaderID        uint32
	LeaderEpoch     uint32
	ReplicaNodes    []uint32
	IsrNodes        []uint32
	OfflineReplicas []uint32
}

MetadataResponsePartition represents partition information in a metadata response.

type MetadataResponseTopic

// MetadataResponseTopic represents a topic in the metadata response.
type MetadataResponseTopic struct {
	ErrorCode                 uint16
	Name                      string `kafka:"CompactString"`
	TopicID                   [16]byte
	IsInternal                bool
	Partitions                []MetadataResponsePartition
	TopicAuthorizedOperations uint32
}

MetadataResponseTopic represents a topic in the metadata response.

type OffsetCommitRequest

// OffsetCommitRequest represents a request to commit offsets.
// NOTE(review): none of the string fields carries a kafka tag, unlike the
// partition-level CommittedMetadata field — confirm the decoder's default
// string encoding is the intended one.
type OffsetCommitRequest struct {
	GroupID                   string
	GenerationIDOrMemberEpoch uint32
	MemberID                  string
	GroupInstanceID           string
	Topics                    []OffsetCommitRequestTopic
}

OffsetCommitRequest represents a request to commit offsets.

type OffsetCommitRequestPartition

// OffsetCommitRequestPartition represents a partition in an offset commit
// request.
type OffsetCommitRequestPartition struct {
	PartitionIndex       uint32
	CommittedOffset      uint64
	CommittedLeaderEpoch uint32
	CommittedMetadata    string `kafka:"CompactString"`
}

OffsetCommitRequestPartition represents a partition in an offset commit request.

type OffsetCommitRequestTopic

// OffsetCommitRequestTopic represents a topic in an offset commit request.
// NOTE(review): Name has no kafka tag, whereas
// OffsetCommitResponseTopic.Name is tagged CompactString — confirm this is
// intended.
type OffsetCommitRequestTopic struct {
	Name       string
	Partitions []OffsetCommitRequestPartition
}

OffsetCommitRequestTopic represents a topic in an offset commit request.

type OffsetCommitResponse

// OffsetCommitResponse represents the response to an offset commit request.
type OffsetCommitResponse struct {
	ThrottleTimeMs uint32
	Topics         []OffsetCommitResponseTopic
}

OffsetCommitResponse represents the response to an offset commit request.

type OffsetCommitResponsePartition

// OffsetCommitResponsePartition represents a partition in an offset commit
// response.
type OffsetCommitResponsePartition struct {
	PartitionIndex uint32
	ErrorCode      uint16
}

OffsetCommitResponsePartition represents a partition in an offset commit response.

type OffsetCommitResponseTopic

// OffsetCommitResponseTopic represents a topic in an offset commit response.
type OffsetCommitResponseTopic struct {
	Name       string `kafka:"CompactString"`
	Partitions []OffsetCommitResponsePartition
}

OffsetCommitResponseTopic represents a topic in an offset commit response.

type OffsetFetchGroup

// OffsetFetchGroup represents a group in an offset fetch response: the
// group ID, its topics, and a group-level error code.
type OffsetFetchGroup struct {
	GroupID   string `kafka:"CompactString"`
	Topics    []OffsetFetchTopic
	ErrorCode uint16
}

OffsetFetchGroup represents a group in an offset fetch response.

type OffsetFetchPartition

// OffsetFetchPartition represents a partition in an offset fetch response.
type OffsetFetchPartition struct {
	PartitionIndex       uint32
	CommittedOffset      uint64
	CommittedLeaderEpoch uint32
	// NOTE(review): Metadata has no kafka tag, unlike most strings in
	// this package's response types — confirm this is intended.
	Metadata  string
	ErrorCode uint16
}

OffsetFetchPartition represents a partition in an offset fetch response.

type OffsetFetchRequest

// OffsetFetchRequest represents the offset fetch request.
type OffsetFetchRequest struct {
	Groups         []OffsetFetchRequestGroup
	RequiresStable bool
}

OffsetFetchRequest represents the offset fetch request.

type OffsetFetchRequestGroup

// OffsetFetchRequestGroup represents a group in OffsetFetchRequest.
type OffsetFetchRequestGroup struct {
	GroupID     string `kafka:"CompactString"`
	MemberID    string `kafka:"CompactNullableString"`
	MemberEpoch uint32
	Topics      []OffsetFetchRequestTopic
}

OffsetFetchRequestGroup represents a group in OffsetFetchRequest

type OffsetFetchRequestTopic

// OffsetFetchRequestTopic represents a topic in OffsetFetchRequestGroup: the
// topic name and the partition indexes requested.
type OffsetFetchRequestTopic struct {
	Name             string `kafka:"CompactString"`
	PartitionIndexes []uint32
}

OffsetFetchRequestTopic represents a topic in OffsetFetchRequestGroup

type OffsetFetchResponse

// OffsetFetchResponse represents the response to an offset fetch request.
type OffsetFetchResponse struct {
	ThrottleTimeMs uint32
	Groups         []OffsetFetchGroup
}

OffsetFetchResponse represents the response to an offset fetch request.

type OffsetFetchTopic

// OffsetFetchTopic represents a topic in an offset fetch response.
// NOTE(review): Name has no kafka tag, whereas OffsetFetchRequestTopic.Name
// is tagged CompactString — confirm this is intended.
type OffsetFetchTopic struct {
	Name       string
	Partitions []OffsetFetchPartition
	ErrorCode  uint16
}

OffsetFetchTopic represents a topic in an offset fetch response.

type ProducePartitionResponse

// ProducePartitionResponse represents the response for a partition in a
// produce request.
type ProducePartitionResponse struct {
	Index           uint32
	ErrorCode       uint16
	BaseOffset      uint64
	LogAppendTimeMs uint64
	LogStartOffset  uint64
	RecordErrors    []RecordError
	ErrorMessage    string `kafka:"CompactString"`
}

ProducePartitionResponse represents the response for a partition in a produce request.

type ProduceRequest

// ProduceRequest represents the details of a ProduceRequest.
type ProduceRequest struct {
	TransactionalID string `kafka:"CompactNullableString"`
	Acks            uint16
	TimeoutMs       uint32
	TopicData       []ProduceRequestTopicData
}

ProduceRequest represents the details of a ProduceRequest.

type ProduceRequestPartitionData

// ProduceRequestPartitionData represents the partition data in a
// ProduceRequest: the partition index and the raw record bytes.
type ProduceRequestPartitionData struct {
	Index   uint32
	Records []byte
}

ProduceRequestPartitionData represents the partition data in a ProduceRequest.

type ProduceRequestTopicData

// ProduceRequestTopicData represents the topic data in a ProduceRequest.
type ProduceRequestTopicData struct {
	Name          string `kafka:"CompactString"`
	PartitionData []ProduceRequestPartitionData
}

ProduceRequestTopicData represents the topic data in a ProduceRequest.

type ProduceResponse

// ProduceResponse represents the response to a produce request.
// ThrottleTimeMs is declared after the topic responses here, unlike most
// other response structs in this package — presumably field order mirrors
// the wire layout, so do not reorder without checking the serializer.
type ProduceResponse struct {
	ProduceTopicResponses []ProduceTopicResponse
	ThrottleTimeMs        uint32
}

ProduceResponse represents the response to a produce request.

type ProduceTopicResponse

// ProduceTopicResponse represents the response for a topic in a produce
// request.
type ProduceTopicResponse struct {
	Name                      string `kafka:"CompactString"`
	ProducePartitionResponses []ProducePartitionResponse
}

ProduceTopicResponse represents the response for a topic in a produce request.

type RecordError

// RecordError represents an error in a specific batch of records.
type RecordError struct {
	BatchIndex             uint32
	BatchIndexErrorMessage string // compact_nullable — NOTE(review): no kafka tag is present despite this note; confirm the encoding used
}

RecordError represents an error in a specific batch of records.

type ResourceType

// ResourceType represents the Kafka resource type (topic, group, etc.) as a
// signed 8-bit code.
type ResourceType int8

ResourceType represents the kafka resource type (topic, group, etc.)

// Known ResourceType values.
// NOTE(review): ResourceTypeBroker and ResourceTypeCluster share the value
// 4, so they compare equal — confirm this aliasing is intentional.
const (
	ResourceTypeUnknown         ResourceType = 0
	ResourceTypeAny             ResourceType = 1
	ResourceTypeTopic           ResourceType = 2
	ResourceTypeGroup           ResourceType = 3
	ResourceTypeBroker          ResourceType = 4
	ResourceTypeCluster         ResourceType = 4
	ResourceTypeTransactionalID ResourceType = 5
	ResourceTypeDelegationToken ResourceType = 6
)

https://github.com/apache/kafka/blob/c6335c2ae86913954d940036917b7556e9ac0460/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java#L31

type SyncGroupRequest

// SyncGroupRequest represents the details of a SyncGroupRequest.
type SyncGroupRequest struct {
	GroupID         string `kafka:"CompactString"`
	GenerationID    uint32
	MemberID        string `kafka:"CompactString"`
	GroupInstanceID string `kafka:"CompactNullableString"`
	ProtocolType    string `kafka:"CompactNullableString"`
	ProtocolName    string `kafka:"CompactNullableString"`
	Assignments     []SyncGroupRequestMember
}

SyncGroupRequest represents the details of a SyncGroupRequest.

type SyncGroupRequestMember

// SyncGroupRequestMember represents a member and its assignment (raw bytes)
// in the SyncGroupRequest.
type SyncGroupRequestMember struct {
	MemberID   string `kafka:"CompactString"`
	Assignment []byte
}

SyncGroupRequestMember represents a member and its assignment in the SyncGroupRequest.

type SyncGroupResponse

// SyncGroupResponse represents a SyncGroup response.
// NOTE(review): ProtocolType and ProtocolName are tagged CompactString
// here but CompactNullableString in SyncGroupRequest — confirm which is
// intended.
type SyncGroupResponse struct {
	ThrottleTimeMs  uint32
	ErrorCode       uint16
	ProtocolType    string `kafka:"CompactString"`
	ProtocolName    string `kafka:"CompactString"`
	AssignmentBytes []byte
}

SyncGroupResponse represents the response to a SyncGroup request

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL