go/mqtt/auth/mq_sat.go (84 lines of code) (raw):
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package auth
import (
"bytes"
"os"
"path/filepath"
"sync"
"github.com/fsnotify/fsnotify"
)
// AIOServiceAccountToken implements an enhanced authentication provider that
// reads a Kubernetes Service Account Token for the AIO Broker.
//
// The token is loaded from a file when the provider is constructed, and the
// file's parent directory is watched so that a changed token can trigger
// reauthentication.
type AIOServiceAccountToken struct {
// filename is the path of the token file that is (re)read.
filename string
// watcher observes the directory containing filename.
watcher *fsnotify.Watcher
// reauth is the callback supplied to AuthSuccess; it is nil until then.
reauth func()
// token is the token content handed out as the auth data.
token []byte
// mu is taken by AuthSuccess and attemptReauth around their accesses to
// reauth and token.
mu sync.RWMutex
}
// NewAIOServiceAccountToken builds an AIO SAT auth provider backed by the token
// stored in filename.
//
// The token is read once up front, and the file's parent directory is then
// watched from a background goroutine so later changes to the token can be
// noticed. An error is returned if the file cannot be read or the watcher
// cannot be created or registered.
func NewAIOServiceAccountToken(
	filename string,
) (*AIOServiceAccountToken, error) {
	initial, err := os.ReadFile(filename)
	if err != nil {
		return nil, err
	}

	w, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}

	// The watch is placed on the containing directory, not the file itself.
	dir := filepath.Dir(filename)
	if addErr := w.Add(dir); addErr != nil {
		// Best-effort cleanup; the Add failure is the error worth reporting.
		_ = w.Close()
		return nil, addErr
	}

	provider := &AIOServiceAccountToken{
		filename: filename,
		watcher:  w,
		token:    initial,
	}
	go provider.watch()

	return provider, nil
}
// InitiateAuth returns the authentication values for the "K8S-SAT" method,
// carrying the provider's current token as the auth data. The bool argument is
// ignored.
//
// The token is read under the mutex because the watch goroutine may replace it
// concurrently when the token file changes. The lock is released before
// returning rather than deferred so it is held for as short a time as
// possible.
func (sat *AIOServiceAccountToken) InitiateAuth(bool) (*Values, error) {
	sat.mu.RLock()
	token := sat.token
	sat.mu.RUnlock()

	return &Values{AuthMethod: "K8S-SAT", AuthData: token}, nil
}
// ContinueAuth always fails with ErrUnexpected and ignores its argument.
// NOTE(review): presumably the K8S-SAT exchange has no continuation step, so
// any request to continue is treated as unexpected — confirm against the
// broker's behavior.
func (*AIOServiceAccountToken) ContinueAuth(_ *Values) (*Values, error) {
	return nil, ErrUnexpected
}
// AuthSuccess stores requestReauth so that a later change to the token file can
// ask for reauthentication. The callback is recorded under the write lock since
// the watch goroutine reads it.
func (sat *AIOServiceAccountToken) AuthSuccess(requestReauth func()) {
	sat.mu.Lock()
	sat.reauth = requestReauth
	sat.mu.Unlock()
}
// Close shuts down the file watcher and returns whatever error it reports.
// The watch loop returns once it observes the watcher's channels being closed.
func (sat *AIOServiceAccountToken) Close() error {
	w := sat.watcher
	return w.Close()
}
// watch consumes events from the directory watcher until one of its channels
// is closed, re-reading the token file and requesting reauthentication
// whenever an event suggests the token may have changed.
func (sat *AIOServiceAccountToken) watch() {
	// Since we're listening to the parent directory, only pay attention to
	// operations which could have reasonably modified the data the SAT token
	// file represents.
	const relevant = fsnotify.Write | fsnotify.Create | fsnotify.Rename

	for {
		select {
		case evt, ok := <-sat.watcher.Events:
			if !ok {
				return
			}

			// Op is a bitmask and a single event may carry several
			// operations at once (e.g. Write|Chmod), so test for the bits of
			// interest instead of comparing for equality.
			if evt.Op&relevant == 0 {
				continue
			}

			// Some file writes (e.g. using > on the command line) will clear
			// the file before rewriting it. We see this as two write events,
			// so we need to ignore the first one in order to not send an
			// empty AUTH packet to the MQTT server. A failed read is skipped
			// as well; a later event gives another chance to pick the token
			// up.
			token, err := os.ReadFile(sat.filename)
			if err != nil || len(token) == 0 {
				continue
			}

			sat.attemptReauth(token)

		case _, ok := <-sat.watcher.Errors:
			if !ok {
				return
			}
			// Nothing useful to do; we just don't want to block.
		}
	}
}
// attemptReauth records a freshly read token and, if a reauth callback has
// been registered via AuthSuccess, asks the client to reauthenticate with it.
//
// Since we're listening to the whole folder, events that leave the token
// unchanged are ignored to cut down noise. A changed token is stored even when
// no callback is registered yet: in that case auth never succeeded and will be
// retried anyway, and the retry must see the new token rather than the stale
// one.
func (sat *AIOServiceAccountToken) attemptReauth(token []byte) {
	// The token is modified here, so the write lock (not the read lock) is
	// required.
	sat.mu.Lock()
	if bytes.Equal(sat.token, token) {
		sat.mu.Unlock()
		return
	}
	sat.token = token
	reauth := sat.reauth
	sat.mu.Unlock()

	// Invoke the callback without holding the lock so that it can safely call
	// back into this provider without deadlocking.
	if reauth != nil {
		reauth()
	}
}