cns/restserver/util.go (829 lines of code) (raw):
package restserver
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/Azure/azure-container-networking/aitelemetry"
"github.com/Azure/azure-container-networking/cns"
"github.com/Azure/azure-container-networking/cns/dockerclient"
"github.com/Azure/azure-container-networking/cns/logger"
"github.com/Azure/azure-container-networking/cns/networkcontainers"
"github.com/Azure/azure-container-networking/cns/nodesubnet"
"github.com/Azure/azure-container-networking/cns/types"
"github.com/Azure/azure-container-networking/cns/wireserver"
acn "github.com/Azure/azure-container-networking/common"
"github.com/Azure/azure-container-networking/platform"
"github.com/Azure/azure-container-networking/store"
"github.com/pkg/errors"
)
// This file contains the utility/helper functions called by either HTTP APIs or Exported/Internal APIs on HTTPRestService
// Get the network info from the service network state
func (service *HTTPRestService) getNetworkInfo(networkName string) (*networkInfo, bool) {
service.RLock()
defer service.RUnlock()
networkInfo, found := service.state.Networks[networkName]
return networkInfo, found
}
// Set the network info in the service network state
func (service *HTTPRestService) setNetworkInfo(networkName string, networkInfo *networkInfo) {
service.Lock()
defer service.Unlock()
service.state.Networks[networkName] = networkInfo
}
func (service *HTTPRestService) SavePnpIDMacaddressMapping(ctx context.Context) error {
// If mapping is already set, skip setting it again.
if len(service.state.PnpIDByMacAddress) != 0 {
return nil
}
p := platform.NewExecClient(nil)
vfMacAddressMapping, err := platform.FetchMacAddressPnpIDMapping(ctx, p)
if err != nil {
return errors.Wrap(err, "failed to fetch MACAddressPnpIDMapping")
}
service.state.PnpIDByMacAddress = vfMacAddressMapping
if err = service.saveState(); err != nil {
logger.Errorf("Failed to save mapping to statefile: %v", err)
}
return nil
}
func (service *HTTPRestService) getPNPIDFromMacAddress(ctx context.Context, macAddress string) (string, error) {
// If map is empty in state file, CNS needs to populate state file before it returns back the response
if len(service.state.PnpIDByMacAddress) == 0 {
if err := service.SavePnpIDMacaddressMapping(ctx); err != nil {
return "", err
}
}
if _, ok := service.state.PnpIDByMacAddress[macAddress]; !ok {
return "", errors.New("Backend Network adapter not found")
}
return service.state.PnpIDByMacAddress[macAddress], nil
}
// Remove the network info from the service network state
func (service *HTTPRestService) removeNetworkInfo(networkName string) {
service.Lock()
defer service.Unlock()
delete(service.state.Networks, networkName)
}
// saveState writes CNS state to persistent store.
func (service *HTTPRestService) saveState() error {
// Skip if a store is not provided.
if service.store == nil {
logger.Printf("[Azure CNS] store not initialized.")
return nil
}
// Update time stamp.
service.state.TimeStamp = time.Now()
err := service.store.Write(storeKey, &service.state)
if err != nil {
logger.Errorf("[Azure CNS] Failed to save state, err: %v", err)
}
return err
}
// restoreState restores CNS state from persistent store.
func (service *HTTPRestService) restoreState() {
logger.Printf("[Azure CNS] restoreState")
// Skip if a store is not provided.
if service.store == nil {
logger.Printf("[Azure CNS] store not initialized.")
return
}
// Read any persisted state.
err := service.store.Read(storeKey, &service.state)
if err != nil {
if err == store.ErrKeyNotFound {
// Nothing to restore.
logger.Printf("[Azure CNS] No state to restore.\n")
} else {
logger.Errorf("[Azure CNS] Failed to restore state, err:%v. Removing azure-cns.json", err)
service.store.Remove()
}
return
}
logger.Printf("[Azure CNS] Restored state, %+v\n", service.state)
if service.Options[acn.OptManageEndpointState] == true {
err := service.EndpointStateStore.Read(EndpointStoreKey, &service.EndpointState)
if err != nil {
if errors.Is(err, store.ErrKeyNotFound) {
// Nothing to restore.
logger.Printf("[Azure CNS] No endpoint state to restore.\n")
} else {
logger.Errorf("[Azure CNS] Failed to restore endpoint state, err:%v. Removing endpoints.json", err)
}
return
}
logger.Printf("[Azure CNS] Restored endpoint state, %+v\n", service.EndpointState)
}
}
func (service *HTTPRestService) saveNetworkContainerGoalState(req cns.CreateNetworkContainerRequest) (types.ResponseCode, string) { //nolint // legacy
// we don't want to overwrite what other calls may have written
service.Lock()
defer service.Unlock()
var (
hostVersion string
existingSecondaryIPConfigs map[string]cns.SecondaryIPConfig // uuid is key
vfpUpdateComplete bool
)
if service.state.ContainerStatus == nil {
service.state.ContainerStatus = make(map[string]containerstatus)
}
existingNCStatus, ok := service.state.ContainerStatus[req.NetworkContainerid]
if ok {
hostVersion = existingNCStatus.HostVersion
existingSecondaryIPConfigs = existingNCStatus.CreateNetworkContainerRequest.SecondaryIPConfigs
vfpUpdateComplete = existingNCStatus.VfpUpdateComplete
}
if req.NetworkContainerid == nodesubnet.NodeSubnetNCID {
hostVersion = nodesubnet.NodeSubnetHostVersion
vfpUpdateComplete = true
}
if hostVersion == "" {
// Host version is the NC version from NMAgent, set it -1 to indicate no result from NMAgent yet.
// TODO, query NMAgent and with aggresive time out and assign latest host version.
hostVersion = "-1"
}
// Remove the auth token before saving the containerStatus to cns json file
createNetworkContainerRequest := req
createNetworkContainerRequest.AuthorizationToken = ""
service.state.ContainerStatus[req.NetworkContainerid] = containerstatus{
ID: req.NetworkContainerid,
VMVersion: req.Version,
CreateNetworkContainerRequest: createNetworkContainerRequest,
HostVersion: hostVersion,
VfpUpdateComplete: vfpUpdateComplete,
}
switch req.NetworkContainerType {
case cns.AzureContainerInstance:
fallthrough
case cns.Docker:
fallthrough
case cns.Kubernetes:
fallthrough
case cns.Basic:
fallthrough
case cns.JobObject:
fallthrough
case cns.COW, cns.BackendNICNC, cns.WebApps:
switch service.state.OrchestratorType {
case cns.Kubernetes:
fallthrough
case cns.ServiceFabric:
fallthrough
case cns.Batch:
fallthrough
case cns.DBforPostgreSQL:
fallthrough
case cns.AzureFirstParty:
fallthrough
case cns.WebApps, cns.BackendNICNC: // todo: Is WebApps an OrchastratorType or ContainerType?
podInfo, err := cns.UnmarshalPodInfo(req.OrchestratorContext)
if err != nil {
errBuf := fmt.Sprintf("Unmarshalling %s failed with error %v", req.NetworkContainerType, err)
return types.UnexpectedError, errBuf
}
orchestratorContext := podInfo.Name() + podInfo.Namespace()
if service.state.ContainerIDByOrchestratorContext == nil {
service.state.ContainerIDByOrchestratorContext = make(map[string]*ncList)
}
if _, ok := service.state.ContainerIDByOrchestratorContext[orchestratorContext]; !ok {
service.state.ContainerIDByOrchestratorContext[orchestratorContext] = new(ncList)
}
ncs := service.state.ContainerIDByOrchestratorContext[orchestratorContext]
ncs.Add(req.NetworkContainerid)
logger.Printf("service.state.ContainerIDByOrchestratorContext[%s] is %+v", orchestratorContext, *service.state.ContainerIDByOrchestratorContext[orchestratorContext])
case cns.KubernetesCRD:
// Validate and Update the SecondaryIpConfig state
returnCode, returnMesage := service.updateIPConfigsStateUntransacted(req, existingSecondaryIPConfigs, hostVersion)
if returnCode != 0 {
return returnCode, returnMesage
}
default:
errMsg := fmt.Sprintf("Unsupported orchestrator type: %s", service.state.OrchestratorType)
logger.Errorf(errMsg)
return types.UnsupportedOrchestratorType, errMsg
}
default:
errMsg := fmt.Sprintf("Unsupported network container type %s", req.NetworkContainerType)
logger.Errorf(errMsg)
return types.UnsupportedNetworkContainerType, errMsg
}
service.saveState()
return 0, ""
}
// This func will compute the deltaIpConfigState which needs to be updated (Added or Deleted) from the inmemory map
// Note: Also this func is an untransacted API as the caller will take a Service lock
func (service *HTTPRestService) updateIPConfigsStateUntransacted(
req cns.CreateNetworkContainerRequest, existingSecondaryIPConfigs map[string]cns.SecondaryIPConfig, hostVersion string,
) (types.ResponseCode, string) {
// parse the existingSecondaryIpConfigState to find the deleted Ips
newIPConfigs := req.SecondaryIPConfigs
tobeDeletedIPConfigs := make(map[string]cns.SecondaryIPConfig)
// Populate the ToBeDeleted list, Secondary IPs which doesnt exist in New request anymore.
// We will later remove them from the in-memory cache
for secondaryIpId, existingIPConfig := range existingSecondaryIPConfigs {
_, exists := newIPConfigs[secondaryIpId]
if !exists {
// IP got removed in the updated request, add it in tobeDeletedIps
tobeDeletedIPConfigs[secondaryIpId] = existingIPConfig
}
}
// Validate TobeDeletedIps are ready to be deleted.
for ipID := range tobeDeletedIPConfigs {
ipConfigStatus, exists := service.PodIPConfigState[ipID]
if exists {
// pod ip exists, validate if state is not assigned, else fail
if ipConfigStatus.GetState() == types.Assigned {
errMsg := fmt.Sprintf("Failed to delete an Assigned IP %v", ipConfigStatus)
return types.InconsistentIPConfigState, errMsg
}
}
}
// now actually remove the deletedIPs
for ipID := range tobeDeletedIPConfigs {
returncode, errMsg := service.removeToBeDeletedIPStateUntransacted(ipID, true)
if returncode != types.Success {
return returncode, errMsg
}
}
// Add new IPs
// TODO, will udpate NC version related variable to int, change it from string to int is a pains
var hostNCVersionInInt int
var err error
if hostNCVersionInInt, err = strconv.Atoi(hostVersion); err != nil {
return types.UnsupportedNCVersion, fmt.Sprintf("Invalid hostVersion is %s, err:%s", hostVersion, err)
}
service.addIPConfigStateUntransacted(req.NetworkContainerid, hostNCVersionInInt, req.SecondaryIPConfigs,
existingSecondaryIPConfigs)
return 0, ""
}
// addIPConfigStateUntransacted adds the IPConfigs to the PodIpConfigState map with Available state
// If the IP is already added then it will be an idempotent call. Also note, caller will
// acquire/release the service lock.
func (service *HTTPRestService) addIPConfigStateUntransacted(ncID string, hostVersion int, ipconfigs,
existingSecondaryIPConfigs map[string]cns.SecondaryIPConfig,
) {
// add ipconfigs to state
for ipID, ipconfig := range ipconfigs {
// New secondary IP configs has new NC version however, CNS don't want to override existing IPs'with new
// NC version. Set it back to previous NC version if IP already exist.
if existingIPConfig, existsInPreviousIPConfig := existingSecondaryIPConfigs[ipID]; existsInPreviousIPConfig {
ipconfig.NCVersion = existingIPConfig.NCVersion
ipconfigs[ipID] = ipconfig
}
if ipState, exists := service.PodIPConfigState[ipID]; exists {
logger.Printf("[Azure-Cns] Set ipId %s, IP %s version to %d, programmed host nc version is %d, "+
"ipState: %s", ipID, ipconfig.IPAddress, ipconfig.NCVersion, hostVersion, ipState)
continue
}
logger.Printf("[Azure-Cns] Set ipId %s, IP %s version to %d, programmed host nc version is %d",
ipID, ipconfig.IPAddress, ipconfig.NCVersion, hostVersion)
// Using the updated NC version attached with IP to compare with latest nmagent version and determine IP statues.
// When reconcile, service.PodIPConfigState doens't exist, rebuild it with the help of NC version attached with IP.
var newIPCNSStatus types.IPState
if hostVersion < ipconfig.NCVersion {
newIPCNSStatus = types.PendingProgramming
} else {
newIPCNSStatus = types.Available
}
// add the new State
ipconfigStatus := cns.IPConfigurationStatus{
NCID: ncID,
ID: ipID,
IPAddress: ipconfig.IPAddress,
PodInfo: nil,
}
ipconfigStatus.WithStateMiddleware(stateTransitionMiddleware)
ipconfigStatus.SetState(newIPCNSStatus)
logger.Printf("[Azure-Cns] Add IP %s as %s", ipconfig.IPAddress, newIPCNSStatus)
service.PodIPConfigState[ipID] = ipconfigStatus
// Todo Update batch API and maintain the count
}
}
// Todo: call this when request is received
func validateIPSubnet(ipSubnet cns.IPSubnet) error {
if ipSubnet.IPAddress == "" {
return fmt.Errorf("Failed to add IPConfig to state: %+v, empty IPSubnet.IPAddress", ipSubnet)
}
if ipSubnet.PrefixLength == 0 {
return fmt.Errorf("Failed to add IPConfig to state: %+v, empty IPSubnet.PrefixLength", ipSubnet)
}
return nil
}
// removeToBeDeletedIPStateUntransacted removes IPConfigs from the PodIpConfigState map
// Caller will acquire/release the service lock.
func (service *HTTPRestService) removeToBeDeletedIPStateUntransacted(
ipID string, skipValidation bool,
) (types.ResponseCode, string) {
// this is set if caller has already done the validation
if !skipValidation {
ipConfigStatus, exists := service.PodIPConfigState[ipID]
if exists {
// pod ip exists, validate if state is not assigned, else fail
if ipConfigStatus.GetState() == types.Assigned {
errMsg := fmt.Sprintf("Failed to delete an Assigned IP %v", ipConfigStatus)
return types.InconsistentIPConfigState, errMsg
}
}
}
// Delete this ip from PODIpConfigState Map
logger.Printf("[Azure-Cns] Delete the PodIpConfigState, IpId: %s, IPConfigStatus: %v",
ipID,
service.PodIPConfigState[ipID])
delete(service.PodIPConfigState, ipID)
return 0, ""
}
func (service *HTTPRestService) getAllNetworkContainerResponses(
req cns.GetNetworkContainerRequest,
) []cns.GetNetworkContainerResponse {
var (
getNetworkContainerResponse cns.GetNetworkContainerResponse
ncs []string
skipNCVersionCheck = false
)
service.Lock()
defer service.Unlock()
switch service.state.OrchestratorType {
case cns.Kubernetes, cns.ServiceFabric, cns.Batch, cns.DBforPostgreSQL, cns.AzureFirstParty:
podInfo, err := cns.UnmarshalPodInfo(req.OrchestratorContext)
getNetworkContainersResponse := []cns.GetNetworkContainerResponse{}
if err != nil {
response := cns.Response{
ReturnCode: types.UnexpectedError,
Message: fmt.Sprintf("Unmarshalling orchestrator context failed with error %v", err),
}
getNetworkContainerResponse.Response = response
getNetworkContainersResponse = append(getNetworkContainersResponse, getNetworkContainerResponse)
return getNetworkContainersResponse
}
// get networkContainerIDs as string, "nc1, nc2"
orchestratorContext := podInfo.Name() + podInfo.Namespace()
if service.state.ContainerIDByOrchestratorContext[orchestratorContext] != nil {
ncs = strings.Split(string(*service.state.ContainerIDByOrchestratorContext[orchestratorContext]), ",")
}
// This indicates that there are no ncs for the given orchestrator context
if len(ncs) == 0 {
response := cns.Response{
ReturnCode: types.UnknownContainerID,
Message: fmt.Sprintf("Failed to find networkContainerID for orchestratorContext %s", orchestratorContext),
}
getNetworkContainerResponse.Response = response
getNetworkContainersResponse = append(getNetworkContainersResponse, getNetworkContainerResponse)
return getNetworkContainersResponse
}
ctx, cancel := context.WithTimeout(context.Background(), nmaAPICallTimeout)
defer cancel()
ncVersionListResp, err := service.nma.GetNCVersionList(ctx)
if err != nil {
skipNCVersionCheck = true
logger.Errorf("failed to get nc version list from nmagent")
// TODO: Add telemetry as this has potential to have containers in the running state w/o datapath working
}
nmaNCs := map[string]string{}
for _, nc := range ncVersionListResp.Containers {
// store nmaNCID as lower case to allow case insensitive comparison with nc stored in CNS
nmaNCs[strings.TrimPrefix(lowerCaseNCGuid(nc.NetworkContainerID), cns.SwiftPrefix)] = nc.Version
}
if !skipNCVersionCheck {
for _, ncid := range ncs {
waitingForUpdate := false
// If the goal state is available with CNS, check if the NC is pending VFP programming
waitingForUpdate, getNetworkContainerResponse.Response.ReturnCode, getNetworkContainerResponse.Response.Message = service.isNCWaitingForUpdate(service.state.ContainerStatus[ncid].CreateNetworkContainerRequest.Version, ncid, nmaNCs) //nolint:lll // bad code
// If the return code is not success, return the error to the caller
if getNetworkContainerResponse.Response.ReturnCode == types.NetworkContainerVfpProgramPending {
logger.Errorf("[Azure-CNS] isNCWaitingForUpdate failed for NCID: %s", ncid)
}
vfpUpdateComplete := !waitingForUpdate
ncstatus := service.state.ContainerStatus[ncid]
// Update the container status if-
// 1. VfpUpdateCompleted successfully
// 2. VfpUpdateComplete changed to false
if (getNetworkContainerResponse.Response.ReturnCode == types.NetworkContainerVfpProgramComplete &&
vfpUpdateComplete && ncstatus.VfpUpdateComplete != vfpUpdateComplete) ||
(!vfpUpdateComplete && ncstatus.VfpUpdateComplete != vfpUpdateComplete) {
logger.Printf("[Azure-CNS] Setting VfpUpdateComplete to %t for NCID: %s", vfpUpdateComplete, ncid)
ncstatus.VfpUpdateComplete = vfpUpdateComplete
service.state.ContainerStatus[ncid] = ncstatus
if err = service.saveState(); err != nil {
logger.Errorf("Failed to save goal states for nc %+v due to %s", getNetworkContainerResponse, err)
}
}
}
}
if service.ChannelMode == cns.Managed {
// If the NC goal state doesn't exist in CNS running in managed mode, call DNC to retrieve the goal state
var (
dncEP = service.GetOption(acn.OptPrivateEndpoint).(string)
infraVnet = service.GetOption(acn.OptInfrastructureNetworkID).(string)
nodeID = service.GetOption(acn.OptNodeID).(string)
)
service.Unlock()
getNetworkContainerResponse.Response.ReturnCode, getNetworkContainerResponse.Response.Message = service.SyncNodeStatus(dncEP, infraVnet, nodeID, req.OrchestratorContext)
service.Lock()
if getNetworkContainerResponse.Response.ReturnCode == types.NotFound {
getNetworkContainersResponse = append(getNetworkContainersResponse, getNetworkContainerResponse)
return getNetworkContainersResponse
}
}
default:
getNetworkContainersResponse := []cns.GetNetworkContainerResponse{}
response := cns.Response{
ReturnCode: types.UnsupportedOrchestratorType,
Message: fmt.Sprintf("Invalid orchestrator type %v", service.state.OrchestratorType),
}
getNetworkContainerResponse.Response = response
getNetworkContainersResponse = append(getNetworkContainersResponse, getNetworkContainerResponse)
return getNetworkContainersResponse
}
getNetworkContainersResponse := []cns.GetNetworkContainerResponse{}
for _, ncid := range ncs {
containerStatus := service.state.ContainerStatus
containerDetails, ok := containerStatus[ncid]
if !ok {
response := cns.Response{
ReturnCode: types.UnknownContainerID,
Message: "NetworkContainer doesn't exist.",
}
getNetworkContainerResponse.Response = response
getNetworkContainersResponse = append(getNetworkContainersResponse, getNetworkContainerResponse)
continue
}
savedReq := containerDetails.CreateNetworkContainerRequest
getNetworkContainerResponse = cns.GetNetworkContainerResponse{
NetworkContainerID: savedReq.NetworkContainerid,
IPConfiguration: savedReq.IPConfiguration,
Routes: savedReq.Routes,
CnetAddressSpace: savedReq.CnetAddressSpace,
MultiTenancyInfo: savedReq.MultiTenancyInfo,
PrimaryInterfaceIdentifier: savedReq.PrimaryInterfaceIdentifier,
LocalIPConfiguration: savedReq.LocalIPConfiguration,
AllowHostToNCCommunication: savedReq.AllowHostToNCCommunication,
AllowNCToHostCommunication: savedReq.AllowNCToHostCommunication,
NetworkInterfaceInfo: savedReq.NetworkInterfaceInfo,
}
// If the NC version check wasn't skipped, take into account the VFP programming status when returning the response
if !skipNCVersionCheck {
if !containerDetails.VfpUpdateComplete {
getNetworkContainerResponse.Response = cns.Response{
ReturnCode: types.NetworkContainerVfpProgramPending,
Message: "NetworkContainer VFP programming is pending",
}
}
}
getNetworkContainersResponse = append(getNetworkContainersResponse, getNetworkContainerResponse)
}
logger.Printf("getNetworkContainersResponses are %+v", getNetworkContainersResponse)
return getNetworkContainersResponse
}
// restoreNetworkState restores Network state that existed before reboot.
func (service *HTTPRestService) restoreNetworkState() error {
logger.Printf("[Azure CNS] Enter Restoring Network State")
if service.store == nil {
logger.Printf("[Azure CNS] Store is not initialized, nothing to restore for network state.")
return nil
}
if !service.store.Exists() {
logger.Printf("[Azure CNS] Store does not exist, nothing to restore for network state.")
return nil
}
rebooted := false
modTime, err := service.store.GetModificationTime()
if err == nil {
logger.Printf("[Azure CNS] Store timestamp is %v.", modTime)
p := platform.NewExecClient(nil)
rebootTime, err := p.GetLastRebootTime()
if err == nil && rebootTime.After(modTime) {
logger.Printf("[Azure CNS] reboot time %v mod time %v", rebootTime, modTime)
rebooted = true
}
}
if rebooted {
for _, nwInfo := range service.state.Networks {
enableSnat := true
logger.Printf("[Azure CNS] Restore nwinfo %v", nwInfo)
if nwInfo.Options != nil {
if _, ok := nwInfo.Options[dockerclient.OptDisableSnat]; ok {
enableSnat = false
}
}
if enableSnat {
err := platform.SetOutboundSNAT(nwInfo.NicInfo.Subnet)
if err != nil {
logger.Printf("[Azure CNS] Error setting up SNAT outbound rule %v", err)
return err
}
}
}
}
return nil
}
func (service *HTTPRestService) attachOrDetachHelper(req cns.ConfigureContainerNetworkingRequest, operation, method string) cns.Response {
if method != "POST" {
return cns.Response{
ReturnCode: types.InvalidParameter,
Message: "[Azure CNS] Error. " + operation + "ContainerToNetwork did not receive a POST.",
}
}
if req.Containerid == "" {
return cns.Response{
ReturnCode: types.DockerContainerNotSpecified,
Message: "[Azure CNS] Error. Containerid is empty",
}
}
if req.NetworkContainerid == "" {
return cns.Response{
ReturnCode: types.NetworkContainerNotSpecified,
Message: "[Azure CNS] Error. NetworkContainerid is empty",
}
}
existing, ok := service.getNetworkContainerDetails(cns.SwiftPrefix + req.NetworkContainerid)
if service.ChannelMode == cns.Managed && operation == attach {
if ok {
if !existing.VfpUpdateComplete {
ctx, cancel := context.WithTimeout(context.Background(), nmaAPICallTimeout)
defer cancel()
ncVersionListResp, err := service.nma.GetNCVersionList(ctx)
if err != nil {
logger.Errorf("failed to get nc version list from nmagent")
return cns.Response{
ReturnCode: types.NmAgentInternalServerError,
Message: err.Error(),
}
}
nmaNCs := map[string]string{}
for _, nc := range ncVersionListResp.Containers {
// store nmaNCID as lower case to allow case insensitive comparison with nc stored in CNS
nmaNCs[strings.TrimPrefix(lowerCaseNCGuid(nc.NetworkContainerID), cns.SwiftPrefix)] = nc.Version
}
_, returnCode, message := service.isNCWaitingForUpdate(existing.CreateNetworkContainerRequest.Version, req.NetworkContainerid, nmaNCs)
if returnCode == types.NetworkContainerVfpProgramPending {
return cns.Response{
ReturnCode: returnCode,
Message: message,
}
}
}
} else {
var (
dncEP = service.GetOption(acn.OptPrivateEndpoint).(string)
infraVnet = service.GetOption(acn.OptInfrastructureNetworkID).(string)
nodeID = service.GetOption(acn.OptNodeID).(string)
)
returnCode, msg := service.SyncNodeStatus(dncEP, infraVnet, nodeID, json.RawMessage{})
if returnCode != types.Success {
return cns.Response{
ReturnCode: returnCode,
Message: msg,
}
}
existing, _ = service.getNetworkContainerDetails(cns.SwiftPrefix + req.NetworkContainerid)
}
} else if !ok {
return cns.Response{
ReturnCode: types.NotFound,
Message: fmt.Sprintf("[Azure CNS] Error. Network Container %s does not exist.", req.NetworkContainerid),
}
}
var returnCode types.ResponseCode
var returnMessage string
switch service.state.OrchestratorType {
case cns.Batch:
podInfo, err := cns.UnmarshalPodInfo(existing.CreateNetworkContainerRequest.OrchestratorContext)
if err != nil {
returnCode = types.UnexpectedError
returnMessage = fmt.Sprintf("Unmarshalling orchestrator context failed with error %+v", err)
} else {
nc := service.networkContainer
netPluginConfig := service.getNetPluginDetails()
switch operation {
case attach:
err = nc.Attach(podInfo, req.Containerid, netPluginConfig)
case detach:
err = nc.Detach(podInfo, req.Containerid, netPluginConfig)
}
if err != nil {
returnCode = types.UnexpectedError
returnMessage = fmt.Sprintf("[Azure CNS] Error. "+operation+"ContainerToNetwork failed %+v", err.Error())
}
}
default:
returnMessage = fmt.Sprintf("[Azure CNS] Invalid orchestrator type %v", service.state.OrchestratorType)
returnCode = types.UnsupportedOrchestratorType
}
return cns.Response{
ReturnCode: returnCode,
Message: returnMessage,
}
}
func (service *HTTPRestService) getNetPluginDetails() *networkcontainers.NetPluginConfiguration {
pluginBinPath, _ := service.GetOption(acn.OptNetPluginPath).(string)
configPath, _ := service.GetOption(acn.OptNetPluginConfigFile).(string)
return networkcontainers.NewNetPluginConfiguration(pluginBinPath, configPath)
}
func (service *HTTPRestService) getNetworkContainerDetails(networkContainerID string) (containerstatus, bool) {
service.RLock()
defer service.RUnlock()
containerDetails, containerExists := service.state.ContainerStatus[networkContainerID]
return containerDetails, containerExists
}
// areNCsPresent returns true if NCs are present in CNS, false if no NCs are present
func (service *HTTPRestService) areNCsPresent() bool {
if len(service.state.ContainerStatus) == 0 && len(service.state.ContainerIDByOrchestratorContext) == 0 {
return false
}
return true
}
// Check if the network is joined
func (service *HTTPRestService) isNetworkJoined(networkID string) bool {
namedLock.LockAcquire(stateJoinedNetworks)
defer namedLock.LockRelease(stateJoinedNetworks)
_, exists := service.state.joinedNetworks[networkID]
return exists
}
// Set the network as joined
func (service *HTTPRestService) setNetworkStateJoined(networkID string) {
namedLock.LockAcquire(stateJoinedNetworks)
defer namedLock.LockRelease(stateJoinedNetworks)
service.state.joinedNetworks[networkID] = struct{}{}
}
func logNCSnapshot(createNetworkContainerRequest cns.CreateNetworkContainerRequest) {
aiEvent := aitelemetry.Event{
EventName: logger.CnsNCSnapshotEventStr,
Properties: make(map[string]string),
ResourceID: createNetworkContainerRequest.NetworkContainerid,
}
aiEvent.Properties[logger.IpConfigurationStr] = fmt.Sprintf("%+v", createNetworkContainerRequest.IPConfiguration)
aiEvent.Properties[logger.LocalIPConfigurationStr] = fmt.Sprintf("%+v", createNetworkContainerRequest.LocalIPConfiguration)
aiEvent.Properties[logger.PrimaryInterfaceIdentifierStr] = createNetworkContainerRequest.PrimaryInterfaceIdentifier
aiEvent.Properties[logger.MultiTenancyInfoStr] = fmt.Sprintf("%+v", createNetworkContainerRequest.MultiTenancyInfo)
aiEvent.Properties[logger.CnetAddressSpaceStr] = fmt.Sprintf("%+v", createNetworkContainerRequest.CnetAddressSpace)
aiEvent.Properties[logger.AllowNCToHostCommunicationStr] = fmt.Sprintf("%t", createNetworkContainerRequest.AllowNCToHostCommunication)
aiEvent.Properties[logger.AllowHostToNCCommunicationStr] = fmt.Sprintf("%t", createNetworkContainerRequest.AllowHostToNCCommunication)
aiEvent.Properties[logger.NetworkContainerTypeStr] = createNetworkContainerRequest.NetworkContainerType
aiEvent.Properties[logger.OrchestratorContextStr] = fmt.Sprintf("%s", createNetworkContainerRequest.OrchestratorContext)
// TODO - Add for SecondaryIPs (Task: https://msazure.visualstudio.com/One/_workitems/edit/7711831)
logger.LogEvent(aiEvent)
}
// Sends network container snapshots to App Insights telemetry.
func (service *HTTPRestService) logNCSnapshots() {
for _, ncStatus := range service.state.ContainerStatus {
logNCSnapshot(ncStatus.CreateNetworkContainerRequest)
}
logger.Printf("[Azure CNS] Logging periodic NC snapshots. NC Count %d", len(service.state.ContainerStatus))
}
// Sets up periodic timer for sending network container snapshots
func (service *HTTPRestService) SendNCSnapShotPeriodically(ctx context.Context, ncSnapshotIntervalInMinutes int) {
// Emit snapshot on startup and then emit it periodically.
service.logNCSnapshots()
ticker := time.NewTicker(time.Minute * time.Duration(ncSnapshotIntervalInMinutes))
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
service.logNCSnapshots()
}
}
}
func (service *HTTPRestService) validateIPConfigsRequest(ctx context.Context, ipConfigsRequest cns.IPConfigsRequest) (cns.PodInfo, types.ResponseCode, string) {
if ipConfigsRequest.OrchestratorContext == nil {
return nil, types.EmptyOrchestratorContext, fmt.Sprintf("OrchestratorContext is not set in the req: %+v", ipConfigsRequest)
}
// retrieve podinfo from orchestrator context
podInfo, err := cns.NewPodInfoFromIPConfigsRequest(ipConfigsRequest)
if err != nil {
return podInfo, types.UnsupportedOrchestratorContext, err.Error()
}
return podInfo, types.Success, ""
}
// getPrimaryHostInterface returns the cached InterfaceInfo, if available, otherwise
// queries the IMDS to get the primary interface info and caches it in the server state
// before returning the result.
func (service *HTTPRestService) getPrimaryHostInterface(ctx context.Context) (*wireserver.InterfaceInfo, error) {
if service.state.primaryInterface == nil {
res, err := service.wscli.GetInterfaces(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to get interfaces from IMDS")
}
primary, err := wireserver.GetPrimaryInterfaceFromResult(res)
if err != nil {
return nil, errors.Wrap(err, "failed to get primary interface from IMDS response")
}
service.state.primaryInterface = primary
}
return service.state.primaryInterface, nil
}
//nolint:gocritic // ignore hugeParam pls
func (service *HTTPRestService) populateIPConfigInfoUntransacted(ipConfigStatus cns.IPConfigurationStatus, podIPInfo *cns.PodIpInfo) error {
ncStatus, exists := service.state.ContainerStatus[ipConfigStatus.NCID]
if !exists {
return fmt.Errorf("Failed to get NC Configuration for NcId: %s", ipConfigStatus.NCID)
}
primaryIPCfg := ncStatus.CreateNetworkContainerRequest.IPConfiguration
podIPInfo.PodIPConfig = cns.IPSubnet{
IPAddress: ipConfigStatus.IPAddress,
PrefixLength: primaryIPCfg.IPSubnet.PrefixLength,
}
podIPInfo.NetworkContainerPrimaryIPConfig = primaryIPCfg
primaryHostInterface, err := service.getPrimaryHostInterface(context.TODO())
if err != nil {
return err
}
podIPInfo.HostPrimaryIPInfo.PrimaryIP = primaryHostInterface.PrimaryIP
podIPInfo.HostPrimaryIPInfo.Subnet = primaryHostInterface.Subnet
podIPInfo.HostPrimaryIPInfo.Gateway = primaryHostInterface.Gateway
podIPInfo.NICType = cns.InfraNIC
return nil
}
// lowerCaseNCGuid() splits incoming NCID by "Swift_" and lowercase NC GUID; i.e,"Swift_ABCD-CD" -> "Swift_abcd-cd"
func lowerCaseNCGuid(ncid string) string {
ncidHasSwiftPrefix := strings.HasPrefix(ncid, cns.SwiftPrefix)
if ncidHasSwiftPrefix {
return cns.SwiftPrefix + strings.ToLower(strings.TrimPrefix(ncid, cns.SwiftPrefix))
}
return strings.ToLower(ncid)
}
// isNCWaitingForUpdate :- Determine whether NC version on NMA matches programmed version
// Return error and waitingForUpdate as true only CNS gets response from NMAgent indicating
// the VFP programming is pending
// This returns success / waitingForUpdate as false in all other cases.
// V2 is using the nmagent get nc version list api v2 which doesn't need authentication token
func (service *HTTPRestService) isNCWaitingForUpdate(ncVersion, ncid string, ncVersionList map[string]string) (waitingForUpdate bool, returnCode types.ResponseCode, message string) {
ncStatus, ok := service.state.ContainerStatus[ncid]
if ok {
if ncStatus.VfpUpdateComplete &&
(ncStatus.CreateNetworkContainerRequest.Version == ncVersion) {
logger.Printf("[Azure CNS] Network container: %s, version: %s has VFP programming already completed", ncid, ncVersion)
return false, types.NetworkContainerVfpProgramCheckSkipped, ""
}
}
ncTargetVersion, err := strconv.Atoi(ncVersion)
if err != nil {
// NMA doesn't have this NC version in string type, bail out
logger.Printf("[Azure CNS] NC %s version %v from NMAgent NC version list is not string "+
"Skipping GetNCVersionStatus check from NMAgent", ncVersion, ncid)
return true, types.NetworkContainerVfpProgramPending, ""
}
// get the ncVersionList with nc GUID as lower case
// when looking up if the ncid is present in ncVersionList, convert it to lowercase and then look up
nmaProgrammedNCVersionStr, ok := ncVersionList[strings.TrimPrefix(lowerCaseNCGuid(ncid), cns.SwiftPrefix)]
if !ok {
// NMA doesn't have this NC that we need programmed yet, bail out
logger.Printf("[Azure CNS] Failed to get NC %s doesn't exist in NMAgent NC version list "+
"Skipping GetNCVersionStatus check from NMAgent", ncid)
return true, types.NetworkContainerVfpProgramPending, ""
}
nmaProgrammedNCVersion, err := strconv.Atoi(nmaProgrammedNCVersionStr)
if err != nil {
// it's unclear whether or not this can actually happen. In the NMAgent
// documentation, Version is described as a string, but in practice the
// values appear to be exclusively integers. Nevertheless, NMAgent is
// allowed to make this parameter anything (by contract), so we should
// defend against it by erroring appropriately:
logger.Printf("[Azure CNS] Failed to get NC version status from NMAgent with error: %+v. "+
"Skipping GetNCVersionStatus check from NMAgent", err)
return true, types.NetworkContainerVfpProgramCheckSkipped, ""
}
if ncTargetVersion > nmaProgrammedNCVersion {
msg := fmt.Sprintf("Network container: %s version: %d is not yet programmed by NMAgent. Programmed version: %d",
ncid, ncTargetVersion, nmaProgrammedNCVersion)
return false, types.NetworkContainerVfpProgramPending, msg
}
msg := "Vfp programming complete"
logger.Printf("[Azure CNS] Vfp programming complete for NC: %s with version: %d", ncid, ncTargetVersion)
return false, types.NetworkContainerVfpProgramComplete, msg
}
// handleGetNetworkContainers returns all NCs in CNS
func (service *HTTPRestService) handleGetNetworkContainers(w http.ResponseWriter) {
logger.Printf("[Azure CNS] handleGetNetworkContainers")
service.RLock()
networkContainers := make([]cns.GetNetworkContainerResponse, len(service.state.ContainerStatus))
i := 0
for ncID := range service.state.ContainerStatus {
ncDetails := service.state.ContainerStatus[ncID]
getNcResp := cns.GetNetworkContainerResponse{
NetworkContainerID: ncDetails.CreateNetworkContainerRequest.NetworkContainerid,
IPConfiguration: ncDetails.CreateNetworkContainerRequest.IPConfiguration,
Routes: ncDetails.CreateNetworkContainerRequest.Routes,
CnetAddressSpace: ncDetails.CreateNetworkContainerRequest.CnetAddressSpace,
MultiTenancyInfo: ncDetails.CreateNetworkContainerRequest.MultiTenancyInfo,
PrimaryInterfaceIdentifier: ncDetails.CreateNetworkContainerRequest.PrimaryInterfaceIdentifier,
LocalIPConfiguration: ncDetails.CreateNetworkContainerRequest.LocalIPConfiguration,
AllowHostToNCCommunication: ncDetails.CreateNetworkContainerRequest.AllowHostToNCCommunication,
AllowNCToHostCommunication: ncDetails.CreateNetworkContainerRequest.AllowNCToHostCommunication,
}
networkContainers[i] = getNcResp
i++
}
service.RUnlock()
response := cns.GetAllNetworkContainersResponse{
NetworkContainers: networkContainers,
Response: cns.Response{
ReturnCode: types.Success,
},
}
err := acn.Encode(w, &response)
logger.Response(service.Name, response, response.Response.ReturnCode, err)
}
// handlePostNetworkContainers stores all the NCs (from the request that client sent) into CNS's state file
func (service *HTTPRestService) handlePostNetworkContainers(w http.ResponseWriter, r *http.Request) {
logger.Printf("[Azure CNS] handlePostNetworkContainers")
var req cns.PostNetworkContainersRequest
err := acn.Decode(w, r, &req)
logger.Request(service.Name, &req, err)
if err != nil {
response := cns.PostNetworkContainersResponse{
Response: cns.Response{
ReturnCode: types.InvalidRequest,
Message: fmt.Sprintf("[Azure CNS] handlePostNetworkContainers failed with error: %s", err.Error()),
},
}
err = acn.Encode(w, &response)
logger.Response(service.Name, response, response.Response.ReturnCode, err)
return
}
if err := req.Validate(); err != nil { //nolint:govet // shadow okay
logger.Errorf("[Azure CNS] handlePostNetworkContainers failed with error: %s", err.Error())
w.WriteHeader(http.StatusBadRequest)
return
}
createNCsResp := service.createNetworkContainers(req.CreateNetworkContainerRequests)
response := cns.PostNetworkContainersResponse{
Response: createNCsResp,
}
err = acn.Encode(w, &response)
logger.Response(service.Name, response, response.Response.ReturnCode, err)
}
func (service *HTTPRestService) createNetworkContainers(createNetworkContainerRequests []cns.CreateNetworkContainerRequest) cns.Response {
for i := 0; i < len(createNetworkContainerRequests); i++ {
createNcReq := createNetworkContainerRequests[i]
ncDetails, found := service.getNetworkContainerDetails(createNcReq.NetworkContainerid)
// Create NC if it doesn't exist, or it exists and the requested version is different from the saved version
if !found || (found && ncDetails.VMVersion != createNcReq.Version) {
nc := service.networkContainer
if err := nc.Create(createNcReq); err != nil {
return cns.Response{
ReturnCode: types.UnexpectedError,
Message: fmt.Sprintf("[Azure CNS] Create Network Containers failed with error: %s", err.Error()),
}
}
}
// Save NC Goal State details
saveNcReturnCode, saveNcReturnMessage := service.saveNetworkContainerGoalState(createNcReq)
// If NC was created successfully, log NC snapshot.
if saveNcReturnCode != types.Success {
return cns.Response{
ReturnCode: saveNcReturnCode,
Message: saveNcReturnMessage,
}
}
logNCSnapshot(createNcReq)
}
return cns.Response{
ReturnCode: types.Success,
Message: "",
}
}
// setResponse encodes the http response
func (service *HTTPRestService) setResponse(w http.ResponseWriter, returnCode types.ResponseCode, response interface{}) {
serviceErr := acn.Encode(w, &response)
logger.Response(service.Name, response, returnCode, serviceErr)
}
// ncList contains comma-separated list of unique NCs
type ncList string
// only add unique NC to ncList
func (n *ncList) Add(nc string) {
var ncs []string
if len(*n) > 0 {
ncs = strings.Split(string(*n), ",")
}
for _, v := range ncs {
// if NC is already present in ncList, do not add it
if nc == v {
return
}
}
ncs = append(ncs, nc)
*n = ncList(strings.Join(ncs, ","))
}
// delete nc from ncList
// split the slice around the index that contains the NC to delete so that neigher of two resulting nc slices cotnains this NC
// use append menthod to join the new NC slices
func (n *ncList) Delete(nc string) {
ncs := strings.Split(string(*n), ",")
for i, v := range ncs {
if nc == v {
ncs = append((ncs)[:i], (ncs)[i+1:]...)
break
}
}
*n = ncList(strings.Join(ncs, ","))
}
// check if ncList contains NC
func (n *ncList) Contains(nc string) bool {
return strings.Contains(string(*n), nc)
}