func()

in controller/tasks/etcd/ingestion_assignment.go [317:426]


func (ia *ingestionAssignmentTask) processIngestionAssignment(state jobSubscriberState) (changes, errs int) {
	// build consistent hash ring where a ring node is a subscriber
	// and resource key is the kafka topic name.
	// this will guarantee minimum change for a topic's ingestion assignment
	// when subscribers join/leave the cluster
	ring := consistenthasing.NewRing()
	for _, subscriber := range state.subscribers {
		err := ring.AddNode(subscriber.Name)
		if err != nil {
			ia.reportError(err, &errs)
			return
		}
	}

	jobAssignments := map[string][]models.JobConfig{}

	// TODO: take subscriber instance capacity into consideration when assigning jobs
	for _, job := range state.jobs {
		processorsNeeded := job.StreamingConfig.ProcessorCount
		if processorsNeeded <= 0 {
			continue
		}
		processorsPerSubscriber := processorsNeeded / len(state.subscribers)
		if processorsPerSubscriber < 1 {
			processorsPerSubscriber = 1
		}

		// calculate starting node of task assignment base on kafka topic name
		startingIndex, _ := ring.Get(job.StreamingConfig.Topic)
		for i := 0; processorsNeeded > 0; i++ {
			processorsToAssign := processorsPerSubscriber
			if processorsNeeded < processorsToAssign {
				processorsToAssign = processorsNeeded
			}
			j := job
			j.StreamingConfig.ProcessorCount = processorsToAssign
			index := (startingIndex + i) % len(state.subscribers)
			subscriberName := ring.Nodes[index].ID
			jobAssignments[subscriberName] = append(jobAssignments[subscriberName], j)
			processorsNeeded -= processorsToAssign
		}
	}

	existingAssignments, err := ia.assignmentsMutator.GetIngestionAssignments(state.namespace)
	if err != nil {
		ia.reportError(err, &errs)
		return
	}

	existingAssignmentsMap := map[string]*models.IngestionAssignment{}
	for _, assignment := range existingAssignments {
		subscriber := assignment.Subscriber
		existingAssignmentsMap[subscriber] = &assignment
	}

	for _, subscriberObj := range state.subscribers {
		subscriber := subscriberObj.Name
		jobAssignment, newExists := jobAssignments[subscriber]
		existingAssignment, oldExist := existingAssignmentsMap[subscriber]
		if newExists {
			if oldExist {
				if !reflect.DeepEqual(existingAssignment.Jobs, jobAssignment) {
					// update existing assignment
					err = ia.assignmentsMutator.UpdateIngestionAssignment(state.namespace, models.IngestionAssignment{
						Subscriber: subscriber,
						Jobs:       jobAssignment,
					})
					if err != nil {
						ia.reportError(err, &errs)
					} else {
						changes++
					}
				}
				// mark not deleted
				existingAssignmentsMap[subscriber] = nil
			} else {
				// new assignment
				err = ia.assignmentsMutator.AddIngestionAssignment(state.namespace, models.IngestionAssignment{
					Subscriber: subscriber,
					Jobs:       jobAssignment,
				})
				if err != nil {
					ia.reportError(err, &errs)
				} else {
					changes++
				}
			}
		} else if !oldExist {
			// add dummy assignment
			err = ia.assignmentsMutator.AddIngestionAssignment(state.namespace, models.IngestionAssignment{
				Subscriber: subscriber,
				Jobs:       []models.JobConfig{},
			})
		}
	}

	for k, v := range existingAssignmentsMap {
		if v != nil {
			ia.logger.With(
				"assignment", k,
			).Info("deleting assignment")
			err = ia.assignmentsMutator.DeleteIngestionAssignment(state.namespace, k)
			if err != nil {
				ia.reportError(err, &errs)
			}
			changes++
		}
	}
	return
}