kafka

package
v0.0.1
Published: May 24, 2025 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package kafka provides a real-time Kafka-based interface for PostgreSQL, similar to how PostgREST exposes PostgreSQL over HTTP.

Kafka topic naming conventions:
  - Case-sensitive, no spaces
  - Valid chars: alphanumeric, `.`, `-`, `_`
  - Recommended max length: 249 bytes (to avoid potential issues)
  - Forward slash (`/`) can be used for logical separation but requires proper escaping

pgo uses the `[prefix].[schema_name].[table_name].[operation]` topic pattern to interact with PostgreSQL.

Operations:
  - create (or c): Insert operations
  - update (or u): Update operations
  - delete (or d): Delete operations
  - read (or r): Query operations
  - truncate (or t): Truncate operations

Examples:
  - public.users.c → Create user
  - inventory.products.u → Update product
  - accounting.invoices.r → Read invoices
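As a sketch of the pattern above, topic names can be assembled with plain string formatting. The prefix `pgo`, the table, and the id are placeholders; including the prefix in the read-by-field form is an assumption, since the Query Parameters pattern below omits it.

package main

import "fmt"

func main() {
	// Placeholder values; the prefix would normally come from Config.TopicPrefix.
	prefix, schema, table := "pgo", "public", "users"

	createTopic := fmt.Sprintf("%s.%s.%s.c", prefix, schema, table)          // insert
	updateTopic := fmt.Sprintf("%s.%s.%s.u", prefix, schema, table)          // update
	readTopic := fmt.Sprintf("%s.%s.%s.r.id.%d", prefix, schema, table, 123) // read by id (see Query Parameters below)

	fmt.Println(createTopic) // pgo.public.users.c
	fmt.Println(updateTopic) // pgo.public.users.u
	fmt.Println(readTopic)   // pgo.public.users.r.id.123
}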

Payload: JSON

Message Format:
  - Key: Unique identifier (e.g., primary key)
  - Value: JSON payload
  - Headers: Metadata including timestamp, operation type, etc.
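A minimal sketch of a message that follows this format, built as a Sarama ProducerMessage; the header key names ("operation", "timestamp") and the topic are illustrative assumptions, not part of this package:

package main

import (
	"fmt"
	"time"

	"github.com/IBM/sarama" // or github.com/Shopify/sarama, depending on the module's dependency
)

func main() {
	msg := &sarama.ProducerMessage{
		Topic: "pgo.public.users.c",
		Key:   sarama.StringEncoder("123"),                       // unique identifier, e.g. the primary key
		Value: sarama.StringEncoder(`{"id":123,"name":"alice"}`), // JSON payload
		Headers: []sarama.RecordHeader{ // metadata headers; the key names here are assumptions
			{Key: []byte("operation"), Value: []byte("c")},
			{Key: []byte("timestamp"), Value: []byte(time.Now().UTC().Format(time.RFC3339))},
		},
	}
	fmt.Println(msg.Topic)
}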

Partitioning Strategy:
  - Default: Hash partitioning based on primary key
  - Custom: Can be configured based on specific fields

Query Parameters: [schema_name].[table_name].read.[field].[value]
Example: public.users.r.id.123 → read the user with id 123 (r is the short alias for read)

Consumer Groups:
  - Use meaningful names: `[app_name].[purpose]`
  - Example: myapp.user_updates

Configuration:
  - Replication Factor: Minimum 2 recommended for production
  - Number of Partitions: Based on throughput requirements
  - Retention: Configurable per topic

Use the `[prefix].pg` topic for system operations.

Note: Ensure proper ACLs are configured for topic access

Index

Constants

This section is empty.

Variables

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client handles produce, consume and ACL-related operations

func NewClient

func NewClient(config *Config, logger *zap.Logger) *Client

NewClient creates a new Client
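A minimal sketch of constructing a Client; the import path, broker address, and topic prefix are placeholders, not values defined by this package:

package main

import (
	"log"

	"go.uber.org/zap"

	kafka "example.com/pgo/kafka" // hypothetical import path; use this package's real module path
)

func main() {
	logger, err := zap.NewProduction()
	if err != nil {
		log.Fatal(err)
	}
	defer logger.Sync()

	cfg := &kafka.Config{
		Brokers:     []string{"localhost:9092"}, // placeholder broker address
		TopicPrefix: "pgo",
	}

	client := kafka.NewClient(cfg, logger)
	_ = client // use client.CreateProducer, client.CreateConsumer, client.CreateTopic, etc.
}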

func (*Client) ConsumeMessages

func (c *Client) ConsumeMessages(consumer sarama.Consumer, topic string, logMsg bool)

ConsumeMessages consumes messages from a topic

func (*Client) CreateACL

func (c *Client) CreateACL(resource sarama.Resource, acl sarama.Acl) error

CreateACL creates a new ACL
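A sketch of allowing read access on one topic using standard Sarama ACL types; the principal, host, and topic are placeholders, and the client c is assumed to have been built with NewClient:

package example

import (
	"log"

	"github.com/IBM/sarama" // or github.com/Shopify/sarama, depending on the module's dependency

	kafka "example.com/pgo/kafka" // hypothetical import path
)

func allowRead(c *kafka.Client) {
	resource := sarama.Resource{
		ResourceType:        sarama.AclResourceTopic,
		ResourceName:        "pgo.public.users.c", // placeholder topic
		ResourcePatternType: sarama.AclPatternLiteral,
	}
	acl := sarama.Acl{
		Principal:      "User:myapp", // placeholder principal
		Host:           "*",
		Operation:      sarama.AclOperationRead,
		PermissionType: sarama.AclPermissionAllow,
	}
	if err := c.CreateACL(resource, acl); err != nil {
		log.Fatal(err)
	}
}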

func (*Client) CreateConsumer

func (c *Client) CreateConsumer() (sarama.Consumer, error)

CreateConsumer creates a new Consumer
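A sketch of pairing CreateConsumer with ConsumeMessages; the topic is a placeholder, and the idea that ConsumeMessages blocks and that logMsg controls per-message logging is inferred, not documented here:

package example

import (
	"log"

	kafka "example.com/pgo/kafka" // hypothetical import path
)

func consumeUserCreates(c *kafka.Client) {
	consumer, err := c.CreateConsumer()
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// logMsg=true presumably asks the client to log each received message.
	c.ConsumeMessages(consumer, "pgo.public.users.c", true)
}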

func (*Client) CreateProducer

func (c *Client) CreateProducer() (sarama.SyncProducer, error)

CreateProducer creates a new SyncProducer

func (*Client) CreateTopic

func (c *Client) CreateTopic(topicName string, detail *sarama.TopicDetail) error

CreateTopic creates a new topic
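A sketch of creating a topic with explicit partition, replication, and retention settings; the values are placeholders, and retention.ms is a standard Kafka topic config key:

package example

import (
	"log"

	"github.com/IBM/sarama" // or github.com/Shopify/sarama, depending on the module's dependency

	kafka "example.com/pgo/kafka" // hypothetical import path
)

func createUsersTopic(c *kafka.Client) {
	retention := "604800000" // 7 days in milliseconds
	detail := &sarama.TopicDetail{
		NumPartitions:     3,
		ReplicationFactor: 2, // minimum of 2 recommended for production (see Overview)
		ConfigEntries: map[string]*string{
			"retention.ms": &retention,
		},
	}
	if err := c.CreateTopic("pgo.public.users.c", detail); err != nil {
		log.Fatal(err)
	}
}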

func (*Client) DeleteACL

func (c *Client) DeleteACL(filter sarama.AclFilter) ([]sarama.MatchingAcl, error)

DeleteACL deletes ACLs based on the provided filter

func (*Client) ListAcls

func (c *Client) ListAcls(filter sarama.AclFilter) ([]sarama.ResourceAcls, error)

ListAcls lists ACLs based on the provided filter
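A sketch of listing the ACLs attached to a single topic via a standard Sarama AclFilter; the topic name is a placeholder:

package example

import (
	"fmt"
	"log"

	"github.com/IBM/sarama" // or github.com/Shopify/sarama, depending on the module's dependency

	kafka "example.com/pgo/kafka" // hypothetical import path
)

func listTopicAcls(c *kafka.Client) {
	topic := "pgo.public.users.c"
	filter := sarama.AclFilter{
		ResourceType:              sarama.AclResourceTopic,
		ResourceName:              &topic,
		ResourcePatternTypeFilter: sarama.AclPatternLiteral,
		Operation:                 sarama.AclOperationAny,
		PermissionType:            sarama.AclPermissionAny,
	}
	acls, err := c.ListAcls(filter)
	if err != nil {
		log.Fatal(err)
	}
	for _, r := range acls {
		fmt.Printf("%s: %d ACL(s)\n", r.ResourceName, len(r.Acls))
	}
}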

func (*Client) ListTopics

func (c *Client) ListTopics() (map[string]sarama.TopicDetail, error)

ListTopics lists all topics

func (*Client) ProduceMessage

func (c *Client) ProduceMessage(producer sarama.SyncProducer, topic string, message []byte) error

ProduceMessage produces a message to a topic
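A sketch tying CreateProducer and ProduceMessage together; the topic and payload are placeholders:

package example

import (
	"log"

	kafka "example.com/pgo/kafka" // hypothetical import path
)

func publishUserCreate(c *kafka.Client) {
	producer, err := c.CreateProducer()
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	payload := []byte(`{"id":123,"name":"alice"}`) // JSON payload
	if err := c.ProduceMessage(producer, "pgo.public.users.c", payload); err != nil {
		log.Fatal(err)
	}
}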

type Config

type Config struct {
	Brokers     []string `json:"brokers"`
	TopicPrefix string   `json:"topicPrefix"`
	Version     string   `json:"version,omitempty"`
	SASL        *SASL    `json:"sasl,omitempty"`
	Partitions  int32    `json:"partitions,omitempty"`
	Replicas    int16    `json:"replicas,omitempty"`
	RetentionMS int64    `json:"retentionMs,omitempty"`
	TLS         TLS
}

Config represents Kafka-specific configuration
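A sketch of a production-leaning Config covering the SASL and TLS fields described below; every value is a placeholder, and the meaning of Version and Algorithm is inferred rather than documented here:

package example

import (
	kafka "example.com/pgo/kafka" // hypothetical import path
)

// exampleConfig sketches a configuration; all values are placeholders.
func exampleConfig() *kafka.Config {
	return &kafka.Config{
		Brokers:     []string{"broker-1:9092", "broker-2:9092"},
		TopicPrefix: "pgo",
		Version:     "3.6.0",   // presumably the Kafka protocol version used by ToSaramaConfig
		Partitions:  3,
		Replicas:    2,         // minimum of 2 recommended for production (see Overview)
		RetentionMS: 604800000, // 7 days
		SASL: &kafka.SASL{
			Enable:    true,
			Username:  "svc-pgo",
			Password:  "change-me",
			Algorithm: "sha512", // assumed value; accepted algorithm names are not documented here
		},
		TLS: kafka.TLS{
			Enable: true,
			CAFile: "/etc/kafka/ca.pem",
		},
	}
}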

func (*Config) GetBrokers

func (c *Config) GetBrokers() []string

GetBrokers returns the list of Kafka brokers

func (*Config) ToSaramaConfig

func (c *Config) ToSaramaConfig() (*sarama.Config, error)

ToSaramaConfig converts the Config to a sarama.Config

type PeerKafka

type PeerKafka struct {
	// contains filtered or unexported fields
}

PeerKafka implements the source and sink for Kafka

func (*PeerKafka) Connect

func (p *PeerKafka) Connect(config json.RawMessage, args ...any) error

func (*PeerKafka) Disconnect

func (p *PeerKafka) Disconnect() error

func (*PeerKafka) Pub

func (p *PeerKafka) Pub(event cdc.Event, args ...any) error

func (*PeerKafka) Sub

func (p *PeerKafka) Sub(args ...any) (<-chan cdc.Event, error)

func (*PeerKafka) Type

func (p *PeerKafka) Type() pipeline.ConnectorType

type SASL

type SASL struct {
	Username  string
	Password  string
	Algorithm string
	Enable    bool
}

SASL represents SASL authentication configuration

type TLS

type TLS struct {
	CertFile   string
	KeyFile    string
	CAFile     string
	Enable     bool
	SkipVerify bool
}

TLS represents TLS configuration

type XDGSCRAMClient

type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)
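XDGSCRAMClient is the usual adapter for SASL/SCRAM authentication with Sarama. The sketch below wires it into a sarama.Config by hand; it assumes the embedded types come from github.com/xdg-go/scram and that ToSaramaConfig performs an equivalent setup when SASL is enabled, both of which are assumptions:

package example

import (
	"github.com/IBM/sarama"   // or github.com/Shopify/sarama, depending on the module's dependency
	"github.com/xdg-go/scram" // assumed source of the embedded scram types

	kafka "example.com/pgo/kafka" // hypothetical import path
)

func scramConfig(user, pass string) *sarama.Config {
	conf := sarama.NewConfig()
	conf.Net.SASL.Enable = true
	conf.Net.SASL.User = user
	conf.Net.SASL.Password = pass
	conf.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
	conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
		return &kafka.XDGSCRAMClient{HashGeneratorFcn: scram.SHA512}
	}
	return conf
}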
