in datasource/etcd/sd/k8s/adaptor/cacher_instance.go [76:144]
// onEndpointsEvent reconciles the instance key-values of the service that
// backs the Endpoints object carried by evt and notifies listeners of the
// resulting changes.
//
// The event is ignored when no service matches the Endpoints namespace/name
// or when ShouldRegisterService rejects that service.
//
// For create and update events every endpoint address is resolved to its pod.
// A pod in the Running phase whose node can be resolved yields one
// MicroServiceInstance key-value: EVT_CREATE is notified when the cache holds
// no entry under the key, EVT_UPDATE when the cached entry is not deeply equal
// to the rebuilt one. For every other event type (including delete) nothing is
// rebuilt.
//
// Finally, every key-value that c.getInstances returned for the service before
// the rebuild and that was not rebuilt is notified as EVT_DELETE.
func (c *InstanceCacher) onEndpointsEvent(evt K8sEvent) {
	// NOTE(review): unchecked assertion; this panics if evt.Object is not a
	// *v1.Endpoints — confirm the dispatcher only routes Endpoints events here.
	ep := evt.Object.(*v1.Endpoints)
	svc := Kubernetes().GetService(ep.Namespace, ep.Name)
	if svc == nil || !ShouldRegisterService(svc) {
		return
	}

	domainProject := Kubernetes().GetDomainProject()
	serviceID := generateServiceID(domainProject, svc)
	oldKvs := c.getInstances(domainProject, serviceID)
	newKvs := make(map[string]*kvstore.KeyValue)

	// Only create/update events rebuild instances. Skipping the address walk
	// for anything else leaves newKvs empty, so the sweep below reports every
	// old key-value as deleted without doing per-address pod lookups whose
	// results would be thrown away.
	if evt.EventType == pb.EVT_CREATE || evt.EventType == pb.EVT_UPDATE {
		for _, ss := range ep.Subsets {
			for _, ea := range ss.Addresses {
				pod := Kubernetes().GetPodByIP(ea.IP)
				if pod == nil || pod.Status.Phase != v1.PodRunning {
					continue
				}
				node := Kubernetes().GetNodeByPod(pod)
				if node == nil {
					continue
				}

				instanceID := UUID(pod.UID)
				// Reuse domainProject so the rebuilt keys are generated from the
				// same value that was used to look up oldKvs.
				key := path.GenerateInstanceKey(domainProject, serviceID, instanceID)

				inst := &pb.MicroServiceInstance{
					InstanceId:     instanceID,
					ServiceId:      serviceID,
					HostName:       pod.Name,
					Status:         pb.MSI_UP,
					DataCenterInfo: &pb.DataCenterInfo{},
					Timestamp:      strconv.FormatInt(pod.CreationTimestamp.Unix(), 10),
					Version:        getLabel(svc.Labels, LabelVersion, pb.VERSION),
					Properties: map[string]string{
						PropNodeIP: pod.Status.HostIP,
					},
				}
				inst.DataCenterInfo.Region, inst.DataCenterInfo.AvailableZone = getRegionAZ(node)
				inst.ModTimestamp = inst.Timestamp
				// Endpoints is left nil when the subset has no ports; do not
				// pre-allocate, as that would change the DeepEqual comparison.
				for _, port := range ss.Ports {
					inst.Endpoints = append(inst.Endpoints, generateEndpoint(ea.IP, port))
				}

				old := c.Cache().Get(key)
				kv := AsKeyValue(key, inst, pod.ResourceVersion)
				newKvs[key] = kv
				if old == nil {
					c.Notify(pb.EVT_CREATE, key, kv)
				} else if !reflect.DeepEqual(old, kv) {
					c.Notify(pb.EVT_UPDATE, key, kv)
				}
			}
		}
	}

	// Anything known before this event but not rebuilt above is gone.
	for k, v := range oldKvs {
		if _, ok := newKvs[k]; !ok {
			c.Notify(pb.EVT_DELETE, k, v)
		}
	}
}