lambda/core/internalagent.go (130 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package core
import (
"fmt"
"time"
"go.amzn.com/lambda/core/statejson"
"github.com/google/uuid"
)
// InternalAgent represents internal agent
type InternalAgent struct {
Name string
ID uuid.UUID
events map[Event]struct{}
ManagedThread Suspendable
currentState InternalAgentState
stateLastModified time.Time
StartedState InternalAgentState
RegisteredState InternalAgentState
RunningState InternalAgentState
ReadyState InternalAgentState
InitErrorState InternalAgentState
ExitErrorState InternalAgentState
errorType string
}
// NewInternalAgent returns new instance of a named agent
func NewInternalAgent(name string, initFlow InitFlowSynchronization, invokeFlow InvokeFlowSynchronization) *InternalAgent {
agent := &InternalAgent{
Name: name,
ID: uuid.New(),
ManagedThread: NewManagedThread(),
events: make(map[Event]struct{}),
}
agent.StartedState = &InternalAgentStartedState{agent: agent}
agent.RegisteredState = &InternalAgentRegisteredState{agent: agent, initFlow: initFlow}
agent.RunningState = &InternalAgentRunningState{agent: agent, invokeFlow: invokeFlow}
agent.ReadyState = &InternalAgentReadyState{agent: agent}
agent.InitErrorState = &InternalAgentInitErrorState{}
agent.ExitErrorState = &InternalAgentExitErrorState{}
agent.setStateUnsafe(agent.StartedState)
return agent
}
func (s *InternalAgent) String() string {
return fmt.Sprintf("%s (%s)", s.Name, s.ID)
}
// SuspendUnsafe the current running thread
func (s *InternalAgent) SuspendUnsafe() {
s.ManagedThread.SuspendUnsafe()
}
// Release will resume a suspended thread
func (s *InternalAgent) Release() {
s.ManagedThread.Release()
}
// SetState using the lock
func (s *InternalAgent) SetState(state InternalAgentState) {
s.ManagedThread.Lock()
defer s.ManagedThread.Unlock()
s.setStateUnsafe(state)
}
func (s *InternalAgent) setStateUnsafe(state InternalAgentState) {
s.currentState = state
s.stateLastModified = time.Now()
}
func ValidateInternalAgentEvent(e Event) error {
switch e {
case InvokeEvent:
return nil
case ShutdownEvent:
return errEventNotSupportedForInternalAgent
}
return errInvalidEventType
}
func (s *InternalAgent) subscribeUnsafe(e Event) error {
if err := ValidateInternalAgentEvent(e); err != nil {
return err
}
s.events[e] = struct{}{}
return nil
}
// IsSubscribed checks whether agent is subscribed the Event
func (s *InternalAgent) IsSubscribed(e Event) bool {
s.ManagedThread.Lock()
defer s.ManagedThread.Unlock()
_, found := s.events[e]
return found
}
// SubscribedEvents returns events to which the agent is subscribed
func (s *InternalAgent) SubscribedEvents() []string {
s.ManagedThread.Lock()
defer s.ManagedThread.Unlock()
events := []string{}
for event := range s.events {
events = append(events, string(event))
}
return events
}
// GetState returns agent's current state
func (s *InternalAgent) GetState() InternalAgentState {
s.ManagedThread.Lock()
defer s.ManagedThread.Unlock()
return s.currentState
}
// Register an agent with the platform
func (s *InternalAgent) Register(events []Event) error {
s.ManagedThread.Lock()
defer s.ManagedThread.Unlock()
return s.currentState.Register(events)
}
// Ready - mark an agent as ready
func (s *InternalAgent) Ready() error {
s.ManagedThread.Lock()
defer s.ManagedThread.Unlock()
return s.currentState.Ready()
}
// InitError - agent registered but failed to initialize
func (s *InternalAgent) InitError(errorType string) error {
s.ManagedThread.Lock()
defer s.ManagedThread.Unlock()
return s.currentState.InitError(errorType)
}
// ExitError - agent registered but failed to initialize
func (s *InternalAgent) ExitError(errorType string) error {
s.ManagedThread.Lock()
defer s.ManagedThread.Unlock()
return s.currentState.ExitError(errorType)
}
// ErrorType returns error type reported during init or exit
func (s *InternalAgent) ErrorType() string {
s.ManagedThread.Lock()
defer s.ManagedThread.Unlock()
return s.errorType
}
// GetAgentDescription returns agent description object for debugging purposes
func (s *InternalAgent) GetAgentDescription() statejson.ExtensionDescription {
s.ManagedThread.Lock()
defer s.ManagedThread.Unlock()
return statejson.ExtensionDescription{
Name: s.Name,
ID: s.ID.String(),
State: statejson.StateDescription{
Name: s.currentState.Name(),
LastModified: s.stateLastModified.UnixNano() / int64(time.Millisecond),
},
ErrorType: s.errorType,
}
}