in subscriber/common/job/controller.go [277:424]
// SyncUpJobConfigs reconciles the drivers held in c.Drivers with the job
// assignment fetched from the aresDB controller.
//
// The whole sync runs under the controller lock. If updateAssignmentHash
// reports no change the sync is skipped. Otherwise the assignment is fetched
// and, per job in the assignment:
//
//   - case1 (job already has drivers): drivers whose aresCluster left the
//     assignment are deleted (unless that cluster is a Kafka sink), drivers
//     whose jobConfig version differs are restarted, and drivers are added for
//     aresClusters that are new in the assignment.
//   - case2 (new job): one driver is added per assigned aresCluster, or, when
//     the assignment lists no aresClusters, per active Kafka-sink cluster.
//
// Jobs that have drivers but are absent from the assignment get all their
// drivers deleted (case3).
//
// Exactly one of the counters syncUp.skipped, syncUp.succeeded or
// syncUp.failed is incremented per call. The new assignment hash is stored
// only when every addDriver call succeeded.
func (c *Controller) SyncUpJobConfigs() {
	c.Lock()
	defer c.Unlock()

	// Check if the hash of the assignment is changed or not
	updateHash, newAssignmentHash := c.updateAssignmentHash()
	if !updateHash {
		c.serviceConfig.Scope.Counter("syncUp.skipped").Inc(1)
		return
	}

	// Get assignment from aresDB controller since hash is changed
	assigned, err := c.aresControllerClient.GetAssignment(c.jobNS, c.serviceConfig.Environment.InstanceID)
	if err != nil {
		c.serviceConfig.Logger.Error("Failed to get assignment from aresDB controller",
			zap.String("jobNamespace", c.jobNS),
			zap.String("aresDB Controller", c.serviceConfig.ControllerConfig.Address),
			zap.Error(err))
		c.serviceConfig.Scope.Counter("syncUp.failed").Inc(1)
		return
	}

	assignment, err := rules.NewAssignmentFromController(assigned)
	if err != nil {
		c.serviceConfig.Logger.Error("Failed to populate assignment from controller assignment",
			zap.String("jobNamespace", c.jobNS),
			zap.String("aresDB Controller", c.serviceConfig.ControllerConfig.Address),
			zap.Error(err))
		c.serviceConfig.Scope.Counter("syncUp.failed").Inc(1)
		return
	}

	c.serviceConfig.Logger.Info("Got assignment from aresDB controller",
		zap.String("jobNamespace", c.jobNS),
		zap.String("aresDB Controller", c.serviceConfig.ControllerConfig.Address),
		zap.String("activeAresNameSpace", config.ActiveAresNameSpace),
		zap.Any("aresClusterNSConfig", c.serviceConfig.AresNSConfig),
		zap.Any("activeAresClusters", c.serviceConfig.ActiveAresClusters),
		zap.Any("assignement", assignment))

	// tryAddDriver calls addDriver and logs successMsg only when it succeeds.
	// A false result is treated as a failure: updateHash is cleared so the new
	// hash is not stored at the end of this sync.
	// NOTE(review): presumably this makes the next sync retry — confirm against
	// updateAssignmentHash.
	tryAddDriver := func(jobConfig *rules.JobConfig, aresCluster string,
		drivers map[string]*Driver, stop bool, successMsg string) {
		if c.addDriver(jobConfig, aresCluster, drivers, stop) {
			c.serviceConfig.Logger.Info(successMsg,
				zap.String("job", jobConfig.Name),
				zap.String("aresCluster", aresCluster))
			return
		}
		updateHash = false
		c.serviceConfig.Logger.Error("failed to add driver",
			zap.String("job", jobConfig.Name),
			zap.String("aresCluster", aresCluster))
	}

	newJobs := make(map[string]*rules.JobConfig)

	// Add or Update jobs
	for _, jobConfig := range assignment.Jobs {
		newJobs[jobConfig.Name] = jobConfig

		aresClusterDrivers, ok := c.Drivers[jobConfig.Name]
		if ok {
			// case1: existing jobConfig
			for aresCluster, driver := range aresClusterDrivers {
				if _, assigned := assignment.AresClusters[aresCluster]; !assigned {
					// case1.1: delete the driver because aresCluster is deleted.
					// Kafka-sink clusters are not listed in the assignment, so
					// their drivers are kept.
					activeAresCluster, exist := c.serviceConfig.ActiveAresClusters[aresCluster]
					if exist && activeAresCluster.GetSinkMode() != config.Sink_Kafka {
						c.deleteDriver(driver, aresCluster, aresClusterDrivers)
						c.serviceConfig.Logger.Info("deleted driver due to the removed aresCluster",
							zap.String("job", jobConfig.Name),
							zap.String("aresCluster", aresCluster))
					}
					continue
				}
				if driver.jobConfig.Version != jobConfig.Version {
					// case1.2: restart the driver because jobConfig version is changed
					tryAddDriver(jobConfig, aresCluster, aresClusterDrivers, true,
						"restarted driver due to version changes")
				}
			}
			for aresCluster, aresClusterObj := range assignment.AresClusters {
				if _, running := aresClusterDrivers[aresCluster]; running {
					continue
				}
				// case1.3: add a new driver because a new aresCluster is added
				c.serviceConfig.ActiveAresClusters[aresCluster] = aresClusterObj
				tryAddDriver(jobConfig, aresCluster, aresClusterDrivers, false,
					"added driver due to the new aresCluster")
			}
			continue
		}

		// case2: a new jobConfig
		aresClusterDrivers = make(map[string]*Driver)
		if len(assignment.AresClusters) != 0 {
			for aresCluster, aresClusterObj := range assignment.AresClusters {
				// case2.1: add a new driver for each aresCluster
				c.serviceConfig.ActiveAresClusters[aresCluster] = aresClusterObj
				tryAddDriver(jobConfig, aresCluster, aresClusterDrivers, false,
					"added driver (aresDB sink) due to the new job")
			}
		} else {
			// No aresClusters in the assignment: only Kafka-sink clusters can
			// receive a driver for this job.
			for aresCluster, aresClusterObj := range c.serviceConfig.ActiveAresClusters {
				if aresClusterObj.GetSinkMode() != config.Sink_Kafka {
					c.serviceConfig.Logger.Error("missing aresDB instance in assignment",
						zap.String("job", jobConfig.Name),
						zap.String("aresCluster", aresCluster))
					continue
				}
				tryAddDriver(jobConfig, aresCluster, aresClusterDrivers, false,
					"added driver (kafka sink) due to the new job")
			}
		}
		c.Drivers[jobConfig.Name] = aresClusterDrivers

		for aresCluster, aresClusterObj := range c.serviceConfig.ActiveAresClusters {
			// case2.2: delete the aresCluster from ActiveAresClusters because it is deleted from assignment
			if _, assigned := assignment.AresClusters[aresCluster]; !assigned &&
				aresClusterObj.GetSinkMode() != config.Sink_Kafka {
				delete(c.serviceConfig.ActiveAresClusters, aresCluster)
			}
		}
	}

	// Delete jobs
	for jobName, aresClusterDrivers := range c.Drivers {
		if _, ok := newJobs[jobName]; ok {
			continue
		}
		// case3: jobConfig is deleted
		for aresCluster, driver := range aresClusterDrivers {
			c.deleteDriver(driver, aresCluster, aresClusterDrivers)
		}
		// Deleting the current key while ranging over a map is safe in Go.
		delete(c.Drivers, jobName)
		c.serviceConfig.Logger.Info("deleted all drivers",
			zap.String("job", jobName))
	}

	// At least one driver could not be added: keep the old hash and count the
	// sync as failed.
	if !updateHash {
		c.serviceConfig.Scope.Counter("syncUp.failed").Inc(1)
		return
	}

	// Update local hash codes
	c.serviceConfig.Logger.Info("Update assignment hash",
		zap.String("jobNamespace", c.jobNS),
		zap.String("aresDB Controller", c.serviceConfig.ControllerConfig.Address),
		zap.String("oldHash", c.assignmentHashCode),
		zap.String("newHash", newAssignmentHash))
	c.assignmentHashCode = newAssignmentHash
	c.serviceConfig.Scope.Counter("syncUp.succeeded").Inc(1)
}