func()

in subscriber/common/job/controller.go [277:424]


// SyncUpJobConfigs reconciles the locally running drivers with the job
// assignment fetched from the aresDB controller. It is a no-op when the
// assignment hash is unchanged. Otherwise it adds, restarts, or deletes
// drivers so that c.Drivers and c.serviceConfig.ActiveAresClusters converge
// to the new assignment. The cached assignment hash is advanced only when
// every driver operation succeeded, so a partial failure is retried on the
// next sync. The whole reconciliation runs under the controller lock.
func (c *Controller) SyncUpJobConfigs() {
	c.Lock()
	defer c.Unlock()

	// Check if the hash of the assignment is changed or not
	updateHash, newAssignmentHash := c.updateAssignmentHash()
	if !updateHash {
		c.serviceConfig.Scope.Counter("syncUp.skipped").Inc(1)
		return
	}

	// Get assignment from aresDB controller since hash is changed
	assigned, err := c.aresControllerClient.GetAssignment(c.jobNS, c.serviceConfig.Environment.InstanceID)
	if err != nil {
		c.serviceConfig.Logger.Error("Failed to get assignment from aresDB controller",
			zap.String("jobNamespace", c.jobNS),
			zap.String("aresDB Controller", c.serviceConfig.ControllerConfig.Address),
			zap.Error(err))
		c.serviceConfig.Scope.Counter("syncUp.failed").Inc(1)
		return
	}

	assignment, err := rules.NewAssignmentFromController(assigned)
	if err != nil {
		c.serviceConfig.Logger.Error("Failed to populate assignment from controller assignment",
			zap.String("jobNamespace", c.jobNS),
			zap.String("aresDB Controller", c.serviceConfig.ControllerConfig.Address),
			zap.Error(err))
		c.serviceConfig.Scope.Counter("syncUp.failed").Inc(1)
		return
	}

	c.serviceConfig.Logger.Info("Got assignment from aresDB controller",
		zap.String("jobNamespace", c.jobNS),
		zap.String("aresDB Controller", c.serviceConfig.ControllerConfig.Address),
		zap.String("activeAresNameSpace", config.ActiveAresNameSpace),
		zap.Any("aresClusterNSConfig", c.serviceConfig.AresNSConfig),
		zap.Any("activeAresClusters", c.serviceConfig.ActiveAresClusters),
		zap.Any("assignment", assignment))

	// Track the jobs present in the new assignment so that stale jobs can be
	// deleted afterwards.
	newJobs := make(map[string]*rules.JobConfig, len(assignment.Jobs))

	// Add or Update jobs.
	// NOTE(review): the Info logs below fire when addDriver returns false
	// (which also blocks the hash update, i.e. looks like a failure path),
	// yet their wording reads like success — confirm addDriver's return
	// semantics before relying on these messages.
	for _, jobConfig := range assignment.Jobs {
		newJobs[jobConfig.Name] = jobConfig
		if aresClusterDrivers, ok := c.Drivers[jobConfig.Name]; ok {
			// case1: existing jobConfig
			for aresCluster, driver := range aresClusterDrivers {
				if _, ok := assignment.AresClusters[aresCluster]; !ok {
					// case1.1: delete the driver because aresCluster is deleted.
					// Kafka-sink clusters are managed locally, not by the
					// assignment, so they are left untouched here.
					activeAresCluster, exist := c.serviceConfig.ActiveAresClusters[aresCluster]
					if exist && activeAresCluster.GetSinkMode() != config.Sink_Kafka {
						c.deleteDriver(driver, aresCluster, aresClusterDrivers)
						c.serviceConfig.Logger.Info("deleted driver due to the removed aresCluster",
							zap.String("job", jobConfig.Name),
							zap.String("aresCluster", aresCluster))
					}
					continue
				}
				if driver.jobConfig.Version != jobConfig.Version {
					// case1.2: restart the driver because jobConfig version is changed,
					if !c.addDriver(jobConfig, aresCluster, aresClusterDrivers, true) {
						// Keep the old hash so the restart is retried next sync.
						updateHash = false
						c.serviceConfig.Logger.Info("restarted driver due to version changes",
							zap.String("job", jobConfig.Name),
							zap.String("aresCluster", aresCluster))
					}
				}
			}
			for aresCluster, aresClusterObj := range assignment.AresClusters {
				if _, ok := aresClusterDrivers[aresCluster]; !ok {
					// case1.3 add a new driver because a new aresCluster is added
					c.serviceConfig.ActiveAresClusters[aresCluster] = aresClusterObj
					if !c.addDriver(jobConfig, aresCluster, aresClusterDrivers, false) {
						updateHash = false
						c.serviceConfig.Logger.Info("added driver due to the new aresCluster",
							zap.String("job", jobConfig.Name),
							zap.String("aresCluster", aresCluster))
					}
				}
			}
		} else {
			// case2: a new jobConfig
			aresClusterDrivers := make(map[string]*Driver)
			if len(assignment.AresClusters) != 0 {
				for aresCluster, aresClusterObj := range assignment.AresClusters {
					// case2.1: add a new driver for each aresCluster
					c.serviceConfig.ActiveAresClusters[aresCluster] = aresClusterObj
					if !c.addDriver(jobConfig, aresCluster, aresClusterDrivers, false) {
						updateHash = false
						c.serviceConfig.Logger.Info("added driver (aresDB sink) due to the new job",
							zap.String("job", jobConfig.Name),
							zap.String("aresCluster", aresCluster))
					}
				}
			} else {
				// No clusters in the assignment: only locally configured
				// kafka-sink clusters can host the new job.
				for aresCluster, aresClusterObj := range c.serviceConfig.ActiveAresClusters {
					if aresClusterObj.GetSinkMode() == config.Sink_Kafka {
						if !c.addDriver(jobConfig, aresCluster, aresClusterDrivers, false) {
							updateHash = false
							c.serviceConfig.Logger.Info("added driver (kafka sink) due to the new job",
								zap.String("job", jobConfig.Name),
								zap.String("aresCluster", aresCluster))
						}
					} else {
						c.serviceConfig.Logger.Error("missing aresDB instance in assignment",
							zap.String("job", jobConfig.Name),
							zap.String("aresCluster", aresCluster))
					}
				}
			}
			c.Drivers[jobConfig.Name] = aresClusterDrivers
			for aresCluster, aresClusterObj := range c.serviceConfig.ActiveAresClusters {
				// case2.2: delete the aresCluster from ActiveAresClusters because it is deleted from assignment
				if _, ok := assignment.AresClusters[aresCluster]; !ok && aresClusterObj.GetSinkMode() != config.Sink_Kafka {
					delete(c.serviceConfig.ActiveAresClusters, aresCluster)
				}
			}
		}
	}

	// Delete jobs that are no longer present in the assignment.
	for jobName, aresClusterDrivers := range c.Drivers {
		if _, ok := newJobs[jobName]; !ok {
			// case3: jobConfig is deleted
			for aresCluster, driver := range aresClusterDrivers {
				c.deleteDriver(driver, aresCluster, aresClusterDrivers)
			}
			delete(c.Drivers, jobName)
			c.serviceConfig.Logger.Info("deleted all drivers",
				zap.String("job", jobName))
		}
	}

	// Advance the local hash only when every driver operation above succeeded;
	// otherwise report failure so the next sync retries this assignment.
	// (The original code incremented "syncUp.failed" unconditionally, counting
	// every successful sync as a failure as well.)
	if updateHash {
		c.serviceConfig.Logger.Info("Update assignment hash",
			zap.String("jobNamespace", c.jobNS),
			zap.String("aresDB Controller", c.serviceConfig.ControllerConfig.Address),
			zap.String("oldHash", c.assignmentHashCode),
			zap.String("newHash", newAssignmentHash))
		c.assignmentHashCode = newAssignmentHash
		c.serviceConfig.Scope.Counter("syncUp.succeeded").Inc(1)
		return
	}
	c.serviceConfig.Scope.Counter("syncUp.failed").Inc(1)
}