Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewProducer ¶
func NewProducer(manager *KafkaManager, topic string, identity *authnapi.Identity) (*kafkaProducer, error)
NewProducer produces a kafka producer that is bound to a particular topic.
func NewProducerEventsCounter ¶
func NewProducerEventsCounter(meter metric.Meter, histogramName string) (metric.Int64Counter, error)
NewProducerEventsCounter creates a meter for capturing event metrics
Types ¶
type CompletedConfig ¶
type CompletedConfig struct {
// contains filtered or unexported fields
}
type Config ¶
type Config struct {
*Options
// this can be set manually for testing
KafkaConfig *kafka.ConfigMap
}
func (*Config) Complete ¶
func (c *Config) Complete() (CompletedConfig, error)
type KafkaManager ¶
type KafkaManager struct {
Config CompletedConfig
Source string
Protocol *confluent.Protocol
Client cloudevents.Client
Errors <-chan error
Logger *log.Helper
}
func New ¶
func New(config CompletedConfig, source string, logger *log.Helper) (*KafkaManager, error)
func (*KafkaManager) Errs ¶
func (m *KafkaManager) Errs() <-chan error
type Options ¶
type Options struct {
DefaultTopic string `mapstructure:"default-topic"`
BuiltInFeatures string `mapstructure:"builtin-features"`
ClientId string `mapstructure:"client-id"`
//MetadataBrokerList string `mapstructure:"metadata-broker-list"`
BootstrapServers string `mapstructure:"bootstrap-servers"`
MessageMaxBytes int `mapstructure:"message-max-bytes"`
MessageCopyMaxBytes int `mapstructure:"message-copy-max-bytes"`
ReceiveMessageMaxBytes int `mapstructure:"receive-message-max-bytes"`
MaxInFlightRequestsPerConnection int `mapstructure:"max-in-flight-requests-per-connection"`
MaxInFlight int `mapstructure:"max-in-flight"`
TopicMetadataRefreshIntervalMs int `mapstructure:"topic-metadata-refresh-interval-ms"`
MetadataMaxAgeMs int `mapstructure:"metadata-max-age-ms"`
TopicMetadataRefreshFastIntervalMs int `mapstructure:"topic-metadata-refresh-fast-interval-ms"`
TopicMetadataRefreshSparse bool `mapstructure:"topic-metadata-refresh-sparse"`
TopicMetadataPropagationMaxMs int `mapstructure:"topic-metadata-propagation-max-ms"`
TopicBlacklist string `mapstructure:"topic-blacklist"`
Debug string `mapstructure:"debug"`
SocketTimeoutMs int `mapstructure:"socket-timeout-ms"`
SocketSendBufferBytes int `mapstructure:"socket-send-buffer-bytes"`
SocketReceiveBufferBytes int `mapstructure:"socket-receive-buffer-bytes"`
SocketKeepAliveEnable bool `mapstructure:"socket-keepalive-enable"`
SocketNagleDisable bool `mapstructure:"socket-nagle-disable"`
SocketMaxFails int `mapstructure:"socket-max-fails"`
BrokerAddressTtl int `mapstructure:"broker-address-ttl"`
BrokerAddressFamily string `mapstructure:"broker-address-family"`
SocketConnectionSetupTimeoutMs int `mapstructure:"socket-connection-setup-timeout-ms"`
ConnectionsMaxIdleMs int `mapstructure:"connections-max-idle-ms"`
ReconnectBackoffMs int `mapstructure:"reconnect-backoff-ms"`
ReconnectBackoffMaxMs int `mapstructure:"reconnect-backoff-max-ms"`
StatisticsIntervalMs int `mapstructure:"statistics-interval-ms"`
EnabledEvents int `mapstructure:"enabled-events"`
LogLevel int `mapstructure:"log-level"`
LogQueue bool `mapstructure:"log-queue"`
LogThreadName bool `mapstructure:"log-thread-name"`
EnableRandomSeed bool `mapstructure:"enabled-random-seed"`
LogConnectionClose bool `mapstructure:"log-connection-close"`
InternalTerminationSignal int `mapstructure:"internal-termination-signal"`
ApiVersionRequest bool `mapstructure:"api-version-request"`
ApiVersionRequestTimeoutMs int `mapstructure:"api-version-request-timeout-ms"`
ApiVersionVersionFallbackMs int `mapstructure:"api-version-version-fallback-ms"`
BrokerVersionFallback string `mapstructure:"broker-version-fallback"`
AllowAutoCreateTopics bool `mapstructure:"allow-auto-create-topics"`
SecurityProtocol string `mapstructure:"security-protocol"`
SslCipherSuites string `mapstructure:"ssl-cipher-suites"`
SslCurvesList string `mapstructure:"ssl-curves-list"`
SslSigAlgsList string `mapstructure:"ssl-sigalgs-list"`
SslKeyLocation string `mapstructure:"ssl-key-location"`
SslKeyPassword string `mapstructure:"ssl-key-password"`
SslKeyPem string `mapstructure:"ssl-key-pem"`
SslCertificateLocation string `mapstructure:"ssl-certificate-location"`
SslCertificatePem string `mapstructure:"ssl-certificate-pem"`
SslCaLocation string `mapstructure:"ssl-ca-location"`
SslCaPem string `mapstructure:"ssl-ca-pem"`
SslCrlLocation string `mapstructure:"ssl-crl-location"`
SslKeystoreLocation string `mapstructure:"ssl-keystore-location"`
SslKeystorePassword string `mapstructure:"ssl-keystore-password"`
SslProviders string `mapstructure:"ssl-providers"`
SslEngineId string `mapstructure:"ssl-engine-id"`
EnableSslCertificateVerification bool `mapstructure:"enable-ssl-certificate-verification"`
SslEndpointIdentificationAlgorithm string `mapstructure:"ssl-endpoint-identification-algorithm"`
// SaslMechanisms string `mapstructure:"sasl-mechanisms"`
SaslMechanism string `mapstructure:"sasl-mechanism"`
SaslKerberosServiceName string `mapstructure:"sasl-kerberos-service-name"`
SaslKerberosPrincipal string `mapstructure:"sasl-kerberos-principal"`
SaslKerberosKinitCmd string `mapstructure:"sasl-kerberos-kinit-cmd"`
SaslKerberosKeytab string `mapstructure:"sasl-kerberos-keytab"`
SaslKerberosMinTimeBeforeRelogin int `mapstructure:"sasl-kerberos-min-time-before-relogin"`
SaslUsername string `mapstructure:"sasl-username"`
SaslPassword string `mapstructure:"sasl-password"`
SaslOauthBearerConfig string `mapstructure:"sasl-oauthbearer-config"`
EnableSaslOauthBearerUnsecureJwt bool `mapstructure:"enable-sasl-oauthbearer-unsecure-jwt"`
SaslOauthBearerMethod string `mapstructure:"sasl-oauthbearer-method"`
SaslOauthBearerClientId string `mapstructure:"sasl-oauthbearer-client-id"`
SaslOauthBearerClientSecret string `mapstructure:"sasl-oauthbearer-client-secret"`
SaslOauthBearerScope string `mapstructure:"sasl-oauthbearer-scope"`
SaslOauthBearerExtensions string `mapstructure:"sasl-oauthbearer-extensions"`
SaslOauthBearerTokenEndpointUrl string `mapstructure:"sasl-oauthbearer-token-endpoint-url"`
PluginLibraryPaths string `mapstructure:"plugin-library-paths"`
ClientDnsLookup string `mapstructure:"client-dns-lookup"`
EnableMetricsPush bool `mapstructure:"enable-metrics-push"`
ClientRack string `mapstructure:"client-rack"`
RetryBackoffMs int `mapstructure:"retry-backoff-ms"`
RetryBackoffMaxMs int `mapstructure:"retry-backoff-max-ms"`
}
func NewOptions ¶
func NewOptions() *Options
Click to show internal directories.
Click to hide internal directories.