lazyAmqp

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 10, 2024 License: Unlicense Imports: 9 Imported by: 0

README

Lazy AMQP

The library provides a simple and reliable connection interface for RabbitMQ.

Features

  • Simple interface
  • Channel pooling
  • Automatic reconnect
  • Easy to use

Example

package main
import (
    "context"
    "fmt"
    "log"
    "time"

    // NOTE(review): consumeMsg also needs the amqp package that provides
    // amqp.Delivery; its import path is not shown in this example — confirm and add it.
    "github.com/LydinaPavuh/lazyAmqp"
    "github.com/LydinaPavuh/lazyAmqp/common"
    "github.com/LydinaPavuh/lazyAmqp/connection"
)

// consumeMsg is the delivery callback used by the example consumer: it prints
// the message body and then acknowledges the delivery.
func consumeMsg(delivery *amqp.Delivery) {
    fmt.Printf("Consumed 2 msg: %s\n", string(delivery.Body))
    // Report a failed acknowledgement instead of silently dropping the error.
    if err := delivery.Ack(false); err != nil {
        fmt.Printf("ack failed: %v\n", err)
    }
}

// main connects to RabbitMQ, declares a fanout exchange with a queue bound to
// it, starts a consumer on that queue, publishes one text message and cancels
// the consumer. Any failing step aborts the program with a logged error.
func main() {
    conf := common.RmqConfig{Url: "amqp://rmuser:rmpassword@127.0.0.1:5672"}
    client := lazyAmqp.NewClient(&conf)
    if err := client.Connect(); err != nil {
        log.Fatalf("connect: %v", err)
    }

    // Declare Exchange
    if err := client.ExchangeDeclare("exchange", "fanout", true, false, false, false, false, nil); err != nil {
        log.Fatalf("declare exchange: %v", err)
    }

    // Declare a queue and bind it to the exchange
    if err := client.QueueDeclare("queue", true, false, false, false, false, nil); err != nil {
        log.Fatalf("declare queue: %v", err)
    }
    if err := client.QueueBind("queue", "", "exchange", false, nil); err != nil {
        log.Fatalf("bind queue: %v", err)
    }

    // Make consumer
    consumerConf := common.ConsumerConf{Queue: "queue", RetryDelay: time.Second}
    consumer2Cancel, err := client.Consume(consumerConf, consumeMsg)
    if err != nil {
        log.Fatalf("start consumer: %v", err)
    }

    // Publishing: PublishText takes the exchange name first and the routing key
    // second, so the message is sent to "exchange", which "queue" is bound to.
    if err := client.PublishText(context.Background(), "exchange", "", true, false, "Hello"); err != nil {
        log.Fatalf("publish: %v", err)
    }

    // Cancel consumer
    // NOTE(review): the consumer is cancelled right after publishing; confirm
    // whether a short wait is needed for the message to be delivered first.
    consumer2Cancel(false)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type RmqClient

type RmqClient struct {
	// contains filtered or unexported fields
}

RmqClient is a RabbitMQ client that opens and maintains the connection and automatically restores it after a disconnection.

func NewClient

func NewClient(conf *common.RmqConfig) *RmqClient

func (*RmqClient) Close

func (client *RmqClient) Close() error

Close cancels all consumers and closes active connections.

func (*RmqClient) Connect

func (client *RmqClient) Connect() error

Connect opens a new connection.

func (*RmqClient) ConnectionIsOpen

func (client *RmqClient) ConnectionIsOpen() bool

ConnectionIsOpen returns true if the connection is currently open.

func (*RmqClient) Consume added in v1.0.0

func (client *RmqClient) Consume(conf common.ConsumerConf, callback consumer.DeliveryCallback) (cancel func(noWait bool), err error)

func (*RmqClient) CreateConsumer

func (client *RmqClient) CreateConsumer(conf common.ConsumerConf, callback consumer.DeliveryCallback) *consumer.Consumer

CreateConsumer creates a new consumer.

func (*RmqClient) ExchangeDeclare

func (client *RmqClient) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait, passive bool, args amqp.Table) error

ExchangeDeclare declares a new exchange.

func (*RmqClient) ExchangeDelete

func (client *RmqClient) ExchangeDelete(name string, ifUnused, noWait bool) error

ExchangeDelete deletes an exchange.

func (*RmqClient) Get

func (client *RmqClient) Get(queue string, autoAck bool) (amqp.Delivery, bool, error)

Get retrieves a single message from the queue.

func (*RmqClient) IsReady

func (client *RmqClient) IsReady() bool

IsReady returns true if the client is ready to consume and publish messages.

func (*RmqClient) PublishBinary

func (client *RmqClient) PublishBinary(ctx context.Context, exchange, key string, mandatory, immediate bool, data []byte) error

PublishBinary publishes a binary message.

func (*RmqClient) PublishJson

func (client *RmqClient) PublishJson(ctx context.Context, exchange, key string, mandatory, immediate bool, obj any) error

PublishJson publishes an object as a JSON message.

func (*RmqClient) PublishText

func (client *RmqClient) PublishText(ctx context.Context, exchange, key string, mandatory, immediate bool, text string) error

PublishText publishes a string message.

func (*RmqClient) QueueBind

func (client *RmqClient) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error

QueueBind binds a queue to an exchange.

func (*RmqClient) QueueDeclare

func (client *RmqClient) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, passive bool, args amqp.Table) error

QueueDeclare declares a new queue.

func (*RmqClient) QueueDelete

func (client *RmqClient) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) error

QueueDelete deletes a queue.

func (*RmqClient) QueueUnbind

func (client *RmqClient) QueueUnbind(name, key, exchange string, args amqp.Table) error

QueueUnbind unbinds a queue from an exchange.

func (*RmqClient) RemoveConsumer

func (client *RmqClient) RemoveConsumer(consumer *consumer.Consumer) error

RemoveConsumer cancels and removes a consumer.

Directories

Path Synopsis
test_data

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL