in pkg/exporter/task-agent/agent.go [125:236]
func (a *Agent) syncIPCache() error {
var conn *grpc.ClientConn
var watchClient rpc.IPCacheService_WatchCacheClient
entry2IPInfo := func(e *rpc.CacheEntry) *nettop.IPInfo {
info := &nettop.IPInfo{
IP: e.IP,
}
switch v := e.Meta.(type) {
case *rpc.CacheEntry_Node:
info.Type = nettop.IPTypeNode
info.NodeName = v.Node.Name
case *rpc.CacheEntry_Pod:
info.Type = nettop.IPTypePod
info.PodNamespace = v.Pod.Namespace
info.PodName = v.Pod.Name
default:
return nil
}
return info
}
reconn := func(maxAttempts int, relist bool) error {
return retry("watching ipcache", maxAttempts, func() error {
log.Infof("connecting to controller.")
if conn != nil {
_ = conn.Close()
}
var err error
conn, err = a.rpcConnect()
if err != nil {
return err
}
a.ipCacheClient = rpc.NewIPCacheServiceClient(conn)
period, revision := nettop.IPCacheRevision()
if period == "" {
relist = true
}
if relist {
log.Warnf("list ipcache")
listResp, err := a.ipCacheClient.ListCache(context.TODO(), &rpc.ListCacheRequest{})
if err != nil {
return err
}
period = listResp.Period
revision = listResp.Revision
log.Infof("current period:%s, revision: %d", period, revision)
var ipInfoSlice []*nettop.IPInfo
for _, e := range listResp.Entries {
info := entry2IPInfo(e)
if info == nil {
continue
}
ipInfoSlice = append(ipInfoSlice, info)
}
nettop.UpdateIPCache(period, revision, ipInfoSlice)
}
rq := &rpc.WatchCacheRequest{
Period: period,
Revision: revision,
}
watchClient, err = a.ipCacheClient.WatchCache(context.TODO(), rq)
if err != nil {
log.Errorf("failed to watch ipcache: %v", err)
return err
}
return nil
})
}
err := reconn(3, false)
if err != nil {
return err
}
go func() {
for {
select {
case <-watchClient.Context().Done():
log.Errorf("ipcache watch client closed")
_ = reconn(-1, false)
continue
default:
resp, err := watchClient.Recv()
if err != nil {
log.Errorf("ipcache failed to receive task: %v", err)
s, ok := status.FromError(err)
if ok && (s.Code() == codes.DataLoss || s.Code() == codes.InvalidArgument) {
_ = reconn(-1, true)
} else {
_ = reconn(-1, false)
}
continue
}
info := entry2IPInfo(resp.Entry)
log.Debugf("ip cache changed op=%d,info=[%s]", resp.Opcode, info)
nettop.ApplyIPCacheChange(resp.Revision, resp.Opcode, info)
}
}
}()
return nil
}