func()

in datasource/etcd/sd/k8s/adaptor/cacher_instance.go [76:144]


// onEndpointsEvent handles a Kubernetes Endpoints event by rebuilding the
// instances described by the Endpoints object and diffing them against the
// instances c.getInstances currently returns for the owning service.
//
// For EVT_CREATE and EVT_UPDATE, every address whose pod can be found, is in
// the Running phase and has a resolvable node is turned into a
// pb.MicroServiceInstance. Listeners are notified with EVT_CREATE when the
// instance key is absent from the cache and with EVT_UPDATE when the cached
// KeyValue differs from the rebuilt one. For any event type, previously known
// instances that were not rebuilt are notified as EVT_DELETE; consequently an
// EVT_DELETE event removes every instance of the service.
//
// The event is ignored when its object is not a *v1.Endpoints, or when the
// matching service is missing or rejected by ShouldRegisterService.
func (c *InstanceCacher) onEndpointsEvent(evt K8sEvent) {
	// Checked assertion: an unexpected payload must not panic the handler.
	ep, ok := evt.Object.(*v1.Endpoints)
	if !ok || ep == nil {
		return
	}

	svc := Kubernetes().GetService(ep.Namespace, ep.Name)
	if svc == nil || !ShouldRegisterService(svc) {
		return
	}

	domainProject := Kubernetes().GetDomainProject()
	serviceID := generateServiceID(domainProject, svc)

	oldKvs := c.getInstances(domainProject, serviceID)
	newKvs := make(map[string]*kvstore.KeyValue)

	// Only create/update events rebuild instances. For every other event type
	// newKvs stays empty, so the diff below reports all old instances as
	// deleted without doing any per-address lookups.
	subsets := ep.Subsets
	if evt.EventType != pb.EVT_CREATE && evt.EventType != pb.EVT_UPDATE {
		subsets = nil
	}

	// NOTE(review): if the same address appears in more than one subset, the
	// later subset overwrites the earlier entry in newKvs and listeners are
	// notified once per occurrence — confirm whether that can happen here.
	for _, ss := range subsets {
		for _, ea := range ss.Addresses {
			pod := Kubernetes().GetPodByIP(ea.IP)
			if pod == nil || pod.Status.Phase != v1.PodRunning {
				continue
			}

			node := Kubernetes().GetNodeByPod(pod)
			if node == nil {
				continue
			}

			instanceID := UUID(pod.UID)
			// Reuse domainProject so the key is built from the same value that
			// was used to look up oldKvs; the delete diff relies on matching keys.
			key := path.GenerateInstanceKey(domainProject, serviceID, instanceID)

			inst := &pb.MicroServiceInstance{
				InstanceId:     instanceID,
				ServiceId:      serviceID,
				HostName:       pod.Name,
				Status:         pb.MSI_UP,
				DataCenterInfo: &pb.DataCenterInfo{},
				Timestamp:      strconv.FormatInt(pod.CreationTimestamp.Unix(), 10),
				Version:        getLabel(svc.Labels, LabelVersion, pb.VERSION),
				Properties: map[string]string{
					PropNodeIP: pod.Status.HostIP,
				},
			}
			inst.DataCenterInfo.Region, inst.DataCenterInfo.AvailableZone = getRegionAZ(node)
			inst.ModTimestamp = inst.Timestamp
			// One endpoint per port of the subset this address belongs to.
			for _, port := range ss.Ports {
				inst.Endpoints = append(inst.Endpoints, generateEndpoint(ea.IP, port))
			}

			old := c.Cache().Get(key)
			kv := AsKeyValue(key, inst, pod.ResourceVersion)
			newKvs[key] = kv

			if old == nil {
				c.Notify(pb.EVT_CREATE, key, kv)
			} else if !reflect.DeepEqual(old, kv) {
				c.Notify(pb.EVT_UPDATE, key, kv)
			}
		}
	}

	// Anything known before but not rebuilt now is gone.
	for k, v := range oldKvs {
		if _, ok := newKvs[k]; !ok {
			c.Notify(pb.EVT_DELETE, k, v)
		}
	}
}