Documentation
¶
Overview ¶
Package kafka provides a real-time Kafka-based interface for PostgreSQL, similar to how PostgREST exposes PostgreSQL over HTTP.
Kafka topic naming conventions: - Case-sensitive, no spaces - Valid chars: alphanumeric, `.`, `-`, `_` - Recommended max length: 249 bytes (to avoid potential issues) - Forward slash (`/`) can be used for logical separation but requires proper escaping
pgo uses `[prefix].[schema_name].[table_name].[operation]` topic pattern to interact with PostgreSQL
Operations: - create (or c): Insert operations - update (or u): Update operations - delete (or d): Delete operations - read (or r): Query operations - truncate (or t): Truncate operations
Examples: - public.users.c → Create user - inventory.products.u → Update product - accounting.invoices.r → Read invoices
Payload: JSON
Message Format: - Key: Unique identifier (e.g., primary key) - Value: JSON payload - Headers: Metadata including timestamp, operation type, etc.
Partitioning Strategy: - Default: Hash partitioning based on primary key - Custom: Can be configured based on specific fields
Query Parameters: [schema_name].[table_name].read.[field].[value] Example: public.users.r.id.123 → read by id 123
Consumer Groups: - Use meaningful names: `[app_name].[purpose]` - Example: myapp.user_updates
Configuration: - Replication Factor: Minimum 2 recommended for production - Number of Partitions: Based on throughput requirements - Retention: Configurable per topic
Use 'prefix.pg' topic for system operations
Note: Ensure proper ACLs are configured for topic access
Index ¶
- Variables
- type Client
- func (c *Client) ConsumeMessages(consumer sarama.Consumer, topic string, logMsg bool)
- func (c *Client) CreateACL(resource sarama.Resource, acl sarama.Acl) error
- func (c *Client) CreateConsumer() (sarama.Consumer, error)
- func (c *Client) CreateProducer() (sarama.SyncProducer, error)
- func (c *Client) CreateTopic(topicName string, detail *sarama.TopicDetail) error
- func (c *Client) DeleteACL(filter sarama.AclFilter) ([]sarama.MatchingAcl, error)
- func (c *Client) ListAcls(filter sarama.AclFilter) ([]sarama.ResourceAcls, error)
- func (c *Client) ListTopics() (map[string]sarama.TopicDetail, error)
- func (c *Client) ProduceMessage(producer sarama.SyncProducer, topic string, message []byte) error
- type Config
- type PeerKafka
- type SASL
- type TLS
- type XDGSCRAMClient
Constants ¶
This section is empty.
Variables ¶
var ( SHA256 scram.HashGeneratorFcn = sha256.New SHA512 scram.HashGeneratorFcn = sha512.New )
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client handles produce, consume and ACL-related operations
func (*Client) ConsumeMessages ¶
ConsumeMessages consumes messages from a topic
func (*Client) CreateConsumer ¶
CreateConsumer creates a new Consumer
func (*Client) CreateProducer ¶
func (c *Client) CreateProducer() (sarama.SyncProducer, error)
CreateProducer creates a new SyncProducer
func (*Client) CreateTopic ¶
func (c *Client) CreateTopic(topicName string, detail *sarama.TopicDetail) error
CreateTopic creates a new topic
func (*Client) ListTopics ¶
func (c *Client) ListTopics() (map[string]sarama.TopicDetail, error)
ListTopics lists all topics
func (*Client) ProduceMessage ¶
ProduceMessage produces a message to a topic
type Config ¶
type Config struct { Brokers []string `json:"brokers"` TopicPrefix string `json:"topicPrefix"` Version string `json:"version,omitempty"` SASL *SASL `json:"sasl,omitempty"` Partitions int32 `json:"partitions,omitempty"` Replicas int16 `json:"replicas,omitempty"` RetentionMS int64 `json:"retentionMs,omitempty"` TLS TLS }
Config represents Kafka-specific configuration
func (*Config) GetBrokers ¶
GetBrokers returns the list of Kafka brokers
type PeerKafka ¶
type PeerKafka struct {
// contains filtered or unexported fields
}
PeerKafka implements the source and sink for Kafka
func (*PeerKafka) Disconnect ¶
func (*PeerKafka) Type ¶
func (p *PeerKafka) Type() pipeline.ConnectorType
type XDGSCRAMClient ¶
type XDGSCRAMClient struct { *scram.Client *scram.ClientConversation scram.HashGeneratorFcn }
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶
func (x *XDGSCRAMClient) Done() bool