internal/plugin/manager/pluginwatcher.go (52 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package manager
import (
"context"
"fmt"
"time"
"github.com/GoogleCloudPlatform/galog"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/events"
pcpb "github.com/GoogleCloudPlatform/google-guest-agent/internal/plugin/proto/google_guest_agent/plugin"
"github.com/GoogleCloudPlatform/google-guest-agent/internal/retry"
)
const (
// WatcherID is the core plugin watcher's ID.
WatcherID = "plugin-status-watcher"
// EventID is the core plugin event type ID.
EventID = "plugin-watcher,status"
)
// InitWatcher initializes and registers the watcher to monitor plugin status
// for a specific request. Runner also removes the watcher as soon as the
// condition is met.
func InitWatcher(ctx context.Context, name string, code int32, req string) (*Watcher, error) {
w := &Watcher{name: name, statusCode: code, request: req}
return w, events.FetchManager().AddWatcher(ctx, w)
}
// Watcher is the plugin event watcher implementation.
type Watcher struct {
// name is name of the plugin watcher is watching.
name string
// statusCode is the status code that should generate successful event.
statusCode int32
// request is the context to get status for.
request string
}
// ID returns the plugin watcher ID.
func (w *Watcher) ID() string {
return WatcherID
}
// Events returns an slice with all implemented events.
func (w *Watcher) Events() []string {
return []string{EventID}
}
// Run implements the plugin event watcher that does status check
// on plugin and notifies when plugin returns the required [status] code.
// Watcher stops watching the plugin once the event is detected.
// Non-nil error is sent only when the required event is encountered.
func (w *Watcher) Run(ctx context.Context, event string) (bool, any, error) {
galog.Debugf("Running watcher for plugin: %q, request: %q, status: %d", w.name, w.request, w.statusCode)
p, err := Instance().fetch(w.name)
if err != nil {
return false, nil, fmt.Errorf("unable to fetch plugin %q: %w", w.name, err)
}
// Returning will cause event manager to run this watcher immediately.
// Use retry policy to have backoff and avoid overloading or any contention
// on the plugin.
policy := retry.Policy{MaxAttempts: 3, BackoffFactor: 2, Jitter: time.Second * 2}
// This function returns non-nil error only when status code condition is
// matched.
f := func() (*pcpb.Status, error) {
resp, status := p.GetStatus(ctx, w.request)
if status.Err() != nil {
return nil, fmt.Errorf("unable to get %q plugin status: %+v", w.name, status)
}
if resp.GetCode() == w.statusCode {
return resp, nil
}
return nil, fmt.Errorf("plugin %q returned [%d] status code, last status: [%+v]", w.name, w.statusCode, resp)
}
status, err := retry.RunWithResponse(ctx, policy, f)
if err == nil {
// Return false, no need to watch as event has already occurred and
// subscribers will be notified.
return false, status, nil
}
return true, nil, err
}