Documentation ¶
Overview ¶
Package client provides a client for interacting with the ARN service.
NOTE: AKS engineers: If you are trying to send node/pod/etc. information, you are likely duplicating work that is already done. Please contact AKS runtime engineering (under the node team) for more information.
The client runs in two modes:
- Synchronous: Use Notify() to send a notification and block until it is sent.
- Asynchronous: Use Async() to send notifications without blocking for the result. This does not block unless the send channel is full.
The Asynchronous mode provides the concept of a promise. You can send a notification and get a promise back that will be resolved when the notification is sent or an error occurs.
If you do not want a promise, none is attached to the notification; errors are instead recorded to an errors channel that you can receive from with ARN.Errors(). This is useful for logging purposes. Errors are dropped when the channel fills, so you are not required to listen on this channel.
These features let you decide how much the accuracy of information matters to your service when notifications are taking excess time, the ARN service is down, the network is congested, etc.
Example - boilerplate that is needed on AKS to make ARN connections:
// You would need to customize these for yourself. You need an ARN endpoint from the ARN team along with
// associated credentials.
var (
	arnEndpoint    = flag.String("arnEndpoint", "https://ms-containerservice-df.receiver.arn-df.core.windows.net", "The ARN endpoint to use")
	storageAccount = flag.String("storageAccount", "https://accountname.blob.core.windows.net", "The storage account to use")
	location       = flag.String("location", "westus2", "The location of the resource, like eastus")
	msid           = flag.String("msid", "/subscriptions/26fe00f8-9173-4872-9134-bb1d2e00343a/resourcegroups/aksarntest/providers/Microsoft.ManagedIdentity/userAssignedIdentities/aksarnidentity", "The managed identity ID to use")
)

// aaa is a helper function to get the k8s clientset and managed identity credential.
func aaa() (*kubernetes.Clientset, *azidentity.ManagedIdentityCredential, error) {
	// If you are not sending notifications from K8, then you would not need this.
	clientSet, err := k8Clientset()
	if err != nil {
		return nil, nil, err
	}

	cred, err := msiCred()
	if err != nil {
		return nil, nil, err
	}

	return clientSet, cred, nil
}

// k8Clientset returns a kubernetes clientset.
func k8Clientset() (*kubernetes.Clientset, error) {
	// An empty kubeconfig path is used when no home directory is found.
	var kubeconfig string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = filepath.Join(home, ".kube", "config")
	}

	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		return nil, err
	}

	return kubernetes.NewForConfig(config)
}

// msiCred returns a managed identity credential.
func msiCred() (*azidentity.ManagedIdentityCredential, error) {
	msiResc := azidentity.ResourceID(*msid)
	msiOpts := azidentity.ManagedIdentityCredentialOptions{ID: msiResc}
	return azidentity.NewManagedIdentityCredential(&msiOpts)
}

func main() {
	flag.Parse()

	// ARN client uses UUIDs, this greatly improves the performance of UUID generation.
	uuid.EnableRandPool()

	// Assumption: arnRscID comes from the RESOURCE_ID environment variable named in the panic message.
	arnRscID := os.Getenv("RESOURCE_ID")
	if arnRscID == "" {
		panic("RESOURCE_ID environment variable must be set")
	}

	// This gets our AAA resources.
	// Note: I am ignoring the K8 clientset here, as it is not needed for the ARN client.
	// It would be needed for getting K8 data (if that is your source data) to send to ARN.
	_, cred, err := aaa()
	if err != nil {
		panic(err)
	}

	bkCtx := context.Background()

	// Create the ARN client.
	arnClient, err := client.New(
		bkCtx,
		client.Args{
			HTTP: client.HTTPArgs{
				Endpoint: *arnEndpoint,
				Cred:     cred,
			},
			Blob: client.BlobArgs{
				Endpoint: *storageAccount,
				Cred:     cred,
			},
		},
	)
	if err != nil {
		panic(err)
	}
	defer arnClient.Close()

	// Send notifications with arnClient here.
}
Example - sending a notification synchronously using the v3 model using an AKS node event:
// Note: node is a k8 Node object that is JSON serializable.
// Note: rescID is the *arm.ResourceID of the node, which is created with
// github.com/Azure/azure-sdk-for-go/sdk/azcore/arm.ParseResourceID().
// You can get a rescID with arm.ParseResourceID(path.Join(p.rescPrefix, suffix))
// where rescPrefix looks like:
// /subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/test/providers/Microsoft.ContainerService/managedClusters/something/
// and suffix is something like: nodes/aks-nodepool1-12345678-vmss000000
// Suffix is negotiated with the ARN team.
armRsc, err := NewArmResource(types.ActSnapshot, rescID, "2024-01-01", nil)
if err != nil {
	return err
}

notification := msgs.Notification{
	ResourceLocation: "eastus",
	PublisherInfo:    "Microsoft.ContainerService",
	APIVersion:       "2024-01-01",
	Data: []types.NotificationResource{
		{
			Data:              node, // This is the Node object that will be serialized to JSON.
			ResourceEventTime: node.GetCreationTimestamp().Time.UTC(),
			ArmResource:       armRsc,
			ResourceID:        rescID.String(),
			ResourceSystemProperties: types.ResourceSystemProperties{
				Updated:      node.GetCreationTimestamp().Time.UTC(),
				ChangeAction: types.CAUpdate,
			},
		},
		// ...
	},
}

// This is a blocking call.
if err := arnClient.Notify(ctx, notification); err != nil {
	return err
}
Example - sending a notification asynchronously using the v3 model using an AKS node event and using a promise:
// Keep the returned notification: it is the one that carries the promise.
notification = arnClient.Async(ctx, notification, true)

// ... Do stuff

if err := notification.Promise(); err != nil {
	// Handle error
}

notification.Recycle() // Reuses the promise for the next notification
Example - sending a notification asynchronously using the v3 model using an AKS node event and without a promise:
// Log errors in the background. A channel range yields one value per iteration.
go func() {
	for err := range arnClient.Errors() {
		slog.Default().Error(err.Error())
	}
}()

for _, notification := range notifications {
	arnClient.Async(ctx, notification, false)
}
Types ¶
type ARN ¶
type ARN struct {
// contains filtered or unexported fields
}
ARN is a client for interacting with the ARN service.
func (*ARN) Async ¶
func (a *ARN) Async(ctx context.Context, n models.Notifications, promise bool) models.Notifications
Async sends a notification to the ARN service asynchronously. This will not block waiting for a response. If promise is true, the result is delivered through .Promise() on the returned Notification. If not, any errors are sent to the ARN.Errors() channel. NOTE: You must call .Promise() on the returned Notification, not on the one you passed in, or you will not get the results. Thread-safe.
func (*ARN) Close ¶
func (a *ARN) Close()
Close closes the client. This will close the In() channel.
func (*ARN) Errors ¶
Errors returns a channel that will receive any errors that occur in the client where a promise is not used. If using Notify(), this will not be used.
func (*ARN) Notify ¶
Notify sends a notification to the ARN service. This is similar to sending via Async(), however this will block until the notification is sent and returns any error. In reality, this is a thin wrapper around Async() that uses a promise to send the results. If the context is canceled, this will return the context error. Thread-safe (however, order usually matters in ARN).
type Args ¶
type Args struct {
	// HTTP is used to configure the HTTP client to talk to ARN.
	HTTP HTTPArgs

	// Blob is the blob storage client used for large messages.
	Blob BlobArgs
	// contains filtered or unexported fields
}
Args are the arguments for creating a new ARN client.
type BlobArgs ¶
type BlobArgs struct {
	// Endpoint is the blob storage endpoint.
	Endpoint string
	// Cred is the token credential to use for authentication to blob storage.
	Cred azcore.TokenCredential
	// ContainerExt sets a name extension for a blob container. This can be useful for
	// doing discovery of containers that are created by a particular client.
	// Names are in the format "arm-ext-nt-YYYY-MM-DD". This will cause the client to create
	// "arm-ext-nt-[ext]-YYYY-MM-DD". Note characters must be letters, numbers, or hyphens.
	// Any letters will be automatically lowercased. The ext cannot be more than 41 characters.
	ContainerExt string
	// Opts are options for the azcore HTTP client.
	Opts *policy.ClientOptions
}
BlobArgs are the arguments for creating a new ARN blob client used for large transfers.
type HTTPArgs ¶
type HTTPArgs struct {
	// Endpoint is the ARN endpoint.
	Endpoint string
	// Cred is the token credential to use for authentication to ARN.
	Cred azcore.TokenCredential
	// Opts are options for the azcore HTTP client.
	Opts *policy.ClientOptions
	// Compression is a flag to enable deflate compression on the HTTP client.
	Compression bool
}
HTTPArgs are the arguments for creating a new ARN HTTP client.
type Option ¶
Option is a function that sets an option on the client.
func WithFakeClients ¶ added in v0.1.1
WithFakeClients sets the fake clients on the ARN client. This is only valid in testing.
func WithLogger ¶
WithLogger sets the logger on the client. By default it uses slog.Default().
func WithMeterProvider ¶
func WithMeterProvider(m metric.MeterProvider) Option
WithMeterProvider sets the meter provider with which to register metrics. Defaults to nil, in which case metrics won't be registered.
func WithNotifyCh ¶
func WithNotifyCh(in chan models.Notifications) Option
WithNotifyCh sets the notification channel on the client. By default it uses a new channel with a buffer size of 1.
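The With* functions follow Go's functional options pattern: the constructor sets defaults, then applies each option in order. A minimal sketch of how an option like WithNotifyCh might override a default channel is shown below. All names here (client, option, withNotifyCh, newClient) are hypothetical stand-ins and the real Option signature is not shown in this documentation.

```go
package main

import "fmt"

// client is a hypothetical stand-in for the ARN client.
type client struct {
	in chan string
}

// option mutates a client during construction.
type option func(*client) error

// withNotifyCh replaces the default input channel with a caller-supplied one.
func withNotifyCh(in chan string) option {
	return func(c *client) error {
		if in == nil {
			return fmt.Errorf("notify channel cannot be nil")
		}
		c.in = in
		return nil
	}
}

// newClient applies defaults first, then each option in order.
func newClient(opts ...option) (*client, error) {
	c := &client{in: make(chan string, 1)} // default buffer size of 1
	for _, o := range opts {
		if err := o(c); err != nil {
			return nil, err
		}
	}
	return c, nil
}

func main() {
	def, err := newClient()
	if err != nil {
		panic(err)
	}
	big, err := newClient(withNotifyCh(make(chan string, 100)))
	if err != nil {
		panic(err)
	}
	fmt.Println("default capacity:", cap(def.in))
	fmt.Println("custom capacity:", cap(big.in))
}
```

A larger buffer lets asynchronous senders queue more notifications before a send blocks, at the cost of more notifications held in memory.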