in controller/tasks/etcd/ingestion_assignment.go [317:426]
func (ia *ingestionAssignmentTask) processIngestionAssignment(state jobSubscriberState) (changes, errs int) {
// build consistent hash ring where a ring node is a subscriber
// and resource key is the kafka topic name.
// this will guarantee minimum change for a topic's ingestion assignment
// when subscribers join/leave the cluster
ring := consistenthasing.NewRing()
for _, subscriber := range state.subscribers {
err := ring.AddNode(subscriber.Name)
if err != nil {
ia.reportError(err, &errs)
return
}
}
jobAssignments := map[string][]models.JobConfig{}
// TODO: take subscriber instance capacity into consideration when assigning jobs
for _, job := range state.jobs {
processorsNeeded := job.StreamingConfig.ProcessorCount
if processorsNeeded <= 0 {
continue
}
processorsPerSubscriber := processorsNeeded / len(state.subscribers)
if processorsPerSubscriber < 1 {
processorsPerSubscriber = 1
}
// calculate starting node of task assignment base on kafka topic name
startingIndex, _ := ring.Get(job.StreamingConfig.Topic)
for i := 0; processorsNeeded > 0; i++ {
processorsToAssign := processorsPerSubscriber
if processorsNeeded < processorsToAssign {
processorsToAssign = processorsNeeded
}
j := job
j.StreamingConfig.ProcessorCount = processorsToAssign
index := (startingIndex + i) % len(state.subscribers)
subscriberName := ring.Nodes[index].ID
jobAssignments[subscriberName] = append(jobAssignments[subscriberName], j)
processorsNeeded -= processorsToAssign
}
}
existingAssignments, err := ia.assignmentsMutator.GetIngestionAssignments(state.namespace)
if err != nil {
ia.reportError(err, &errs)
return
}
existingAssignmentsMap := map[string]*models.IngestionAssignment{}
for _, assignment := range existingAssignments {
subscriber := assignment.Subscriber
existingAssignmentsMap[subscriber] = &assignment
}
for _, subscriberObj := range state.subscribers {
subscriber := subscriberObj.Name
jobAssignment, newExists := jobAssignments[subscriber]
existingAssignment, oldExist := existingAssignmentsMap[subscriber]
if newExists {
if oldExist {
if !reflect.DeepEqual(existingAssignment.Jobs, jobAssignment) {
// update existing assignment
err = ia.assignmentsMutator.UpdateIngestionAssignment(state.namespace, models.IngestionAssignment{
Subscriber: subscriber,
Jobs: jobAssignment,
})
if err != nil {
ia.reportError(err, &errs)
} else {
changes++
}
}
// mark not deleted
existingAssignmentsMap[subscriber] = nil
} else {
// new assignment
err = ia.assignmentsMutator.AddIngestionAssignment(state.namespace, models.IngestionAssignment{
Subscriber: subscriber,
Jobs: jobAssignment,
})
if err != nil {
ia.reportError(err, &errs)
} else {
changes++
}
}
} else if !oldExist {
// add dummy assignment
err = ia.assignmentsMutator.AddIngestionAssignment(state.namespace, models.IngestionAssignment{
Subscriber: subscriber,
Jobs: []models.JobConfig{},
})
}
}
for k, v := range existingAssignmentsMap {
if v != nil {
ia.logger.With(
"assignment", k,
).Info("deleting assignment")
err = ia.assignmentsMutator.DeleteIngestionAssignment(state.namespace, k)
if err != nil {
ia.reportError(err, &errs)
}
changes++
}
}
return
}