func()

in pkg/exporter/task-agent/agent.go [125:236]


// syncIPCache connects to the controller, synchronizes the local IP cache in
// nettop, and starts a background goroutine that applies subsequent cache
// changes received over a watch stream.
//
// The initial connection is made through retry with maxAttempts 3; if it
// fails, the error is returned, the connection (if any) is closed, and no
// goroutine is started. Once started, the goroutine reconnects on every stream
// error and has no exit path of its own.
func (a *Agent) syncIPCache() error {
	var conn *grpc.ClientConn
	var watchClient rpc.IPCacheService_WatchCacheClient

	// entry2IPInfo converts a cache entry into a nettop.IPInfo. It returns nil
	// for a nil entry and for entries whose metadata is neither a node nor a
	// pod, so callers must check the result before using it.
	entry2IPInfo := func(e *rpc.CacheEntry) *nettop.IPInfo {
		if e == nil {
			return nil
		}
		info := &nettop.IPInfo{
			IP: e.IP,
		}
		switch v := e.Meta.(type) {
		case *rpc.CacheEntry_Node:
			info.Type = nettop.IPTypeNode
			info.NodeName = v.Node.Name
		case *rpc.CacheEntry_Pod:
			info.Type = nettop.IPTypePod
			info.PodNamespace = v.Pod.Namespace
			info.PodName = v.Pod.Name
		default:
			return nil
		}
		return info
	}

	// reconn (re)establishes the controller connection and the watch stream.
	// When relist is true, or when nettop reports an empty period, the whole
	// cache is listed and replaced before the watch starts. watchClient is
	// only replaced once a new stream has been opened successfully.
	reconn := func(maxAttempts int, relist bool) error {
		return retry("watching ipcache", maxAttempts, func() error {
			log.Infof("connecting to controller.")
			if conn != nil {
				// Best effort: the previous connection is being replaced.
				_ = conn.Close()
			}
			var err error
			conn, err = a.rpcConnect()
			if err != nil {
				return err
			}

			a.ipCacheClient = rpc.NewIPCacheServiceClient(conn)
			period, revision := nettop.IPCacheRevision()

			// An empty period forces a full list. relist is captured by the
			// closure, so once set it stays set for later attempts of this
			// reconn call.
			if period == "" {
				relist = true
			}

			if relist {
				log.Warnf("list ipcache")
				listResp, err := a.ipCacheClient.ListCache(context.TODO(), &rpc.ListCacheRequest{})
				if err != nil {
					return err
				}

				period = listResp.Period
				revision = listResp.Revision

				log.Infof("current period:%s, revision: %d", period, revision)

				var ipInfoSlice []*nettop.IPInfo
				for _, e := range listResp.Entries {
					info := entry2IPInfo(e)
					if info == nil {
						continue
					}
					ipInfoSlice = append(ipInfoSlice, info)
				}

				nettop.UpdateIPCache(period, revision, ipInfoSlice)
			}

			rq := &rpc.WatchCacheRequest{
				Period:   period,
				Revision: revision,
			}

			// Assign to the shared watchClient only on success, so a failed
			// attempt never leaves the receive loop with a nil stream.
			stream, err := a.ipCacheClient.WatchCache(context.TODO(), rq)
			if err != nil {
				log.Errorf("failed to watch ipcache: %v", err)
				return err
			}
			watchClient = stream
			return nil
		})
	}

	if err := reconn(3, false); err != nil {
		// Do not leak a connection that was dialed before a later step failed.
		if conn != nil {
			_ = conn.Close()
		}
		return err
	}

	go func() {
		for {
			// Non-blocking check: reconnect right away if the stream's context
			// is already done, otherwise fall through to the blocking Recv.
			select {
			case <-watchClient.Context().Done():
				log.Errorf("ipcache watch client closed")
				if err := reconn(-1, false); err != nil {
					log.Errorf("failed to reconnect ipcache watch: %v", err)
				}
				continue
			default:
			}

			resp, err := watchClient.Recv()
			if err != nil {
				log.Errorf("ipcache failed to receive task: %v", err)
				// DataLoss and InvalidArgument trigger a full re-list; any
				// other error resumes the watch from the current revision.
				relist := false
				if s, ok := status.FromError(err); ok {
					switch s.Code() {
					case codes.DataLoss, codes.InvalidArgument:
						relist = true
					}
				}
				if err := reconn(-1, relist); err != nil {
					log.Errorf("failed to reconnect ipcache watch: %v", err)
				}
				continue
			}

			info := entry2IPInfo(resp.Entry)
			if info == nil {
				// Mirror the list path, which also drops entries that cannot
				// be converted.
				// NOTE(review): the revision carried by a skipped change is
				// not applied — confirm nettop tolerates that.
				log.Warnf("ipcache change skipped: missing or unsupported entry, op=%v, revision=%v", resp.Opcode, resp.Revision)
				continue
			}
			log.Debugf("ip cache changed op=%d,info=[%s]", resp.Opcode, info)
			nettop.ApplyIPCacheChange(resp.Revision, resp.Opcode, info)
		}
	}()

	return nil
}