static int ll_find_deltas()

in src/pack-objects.c [1163:1308]


/*
 * Search `list` for delta candidates, spreading the work across
 * pb->nr_threads worker threads.
 *
 * If pb->nr_threads is 0 it is first set to git_online_cpus(). With one
 * thread (or fewer) the search runs inline via find_deltas(). Otherwise:
 *
 *  1. The list is cut into one contiguous segment per thread. Each cut is
 *     pushed forward so consecutive objects sharing a non-zero `hash`
 *     stay in the same segment. A segment shorter than 2*window (other
 *     than the last) is left empty, and its objects flow into later
 *     segments.
 *  2. Every non-empty segment gets a worker running threaded_find_deltas().
 *  3. Whenever a worker goes idle (working == 0), it is handed the tail
 *     half of the largest remaining segment (if that segment holds more
 *     than 2*window objects). Otherwise it is handed an empty segment,
 *     after which it is joined.
 *
 * pb:        packbuilder; its progress lock/condition coordinate workers
 * list:      objects to search, list_size entries long
 * window:    forwarded to each worker; also the minimum-size heuristic
 * depth:     forwarded to each worker
 *
 * Returns 0 on success, or -1 with a git error set. On every path except
 * a failure to lock a worker's mutex, all started workers have been
 * joined before this returns.
 */
static int ll_find_deltas(git_packbuilder *pb, git_pobject **list,
			  size_t list_size, size_t window, size_t depth)
{
	struct thread_params *p;
	size_t i;
	int error = 0, active_threads = 0;

	if (!pb->nr_threads)
		pb->nr_threads = git_online_cpus();

	if (pb->nr_threads <= 1) {
		/* Propagate a failure of the inline search instead of dropping it. */
		if (find_deltas(pb, list, &list_size, window, depth) < 0)
			return -1;
		return 0;
	}

	p = git__mallocarray(pb->nr_threads, sizeof(*p));
	GIT_ERROR_CHECK_ALLOC(p);

	/* Partition the work among the threads */
	for (i = 0; i < pb->nr_threads; ++i) {
		/* an even share of what is still unassigned */
		size_t sub_size = list_size / (pb->nr_threads - i);

		/* don't use too small segments or no deltas will be found */
		if (sub_size < 2*window && i+1 < pb->nr_threads)
			sub_size = 0;

		p[i].pb = pb;
		p[i].window = window;
		p[i].depth = depth;
		p[i].working = 1;
		p[i].data_ready = 0;

		/* try to split chunks on "path" boundaries */
		while (sub_size && sub_size < list_size &&
		       list[sub_size]->hash &&
		       list[sub_size]->hash == list[sub_size-1]->hash)
			sub_size++;

		p[i].list = list;
		p[i].list_size = sub_size;
		p[i].remaining = sub_size;

		list += sub_size;
		list_size -= sub_size;
	}

	/* Start work threads; empty segments get no thread at all */
	for (i = 0; i < pb->nr_threads; ++i) {
		if (!p[i].list_size)
			continue;

		git_mutex_init(&p[i].mutex);
		git_cond_init(&p[i].cond);

		if (git_thread_create(&p[i].thread,
				      threaded_find_deltas, &p[i])) {
			git_error_set(GIT_ERROR_THREAD, "unable to create thread");
			git_cond_free(&p[i].cond);
			git_mutex_free(&p[i].mutex);

			/*
			 * Workers that did start hold pointers into `p`, `pb`
			 * and `list`, so we must neither return nor free `p`
			 * until they are joined. Stop starting new ones and let
			 * the loop below drain them: with `error` set it never
			 * steals work, so every idle worker receives an empty
			 * segment and is joined. This slot and all later ones
			 * keep working == 1 and are therefore never targets.
			 */
			error = -1;
			break;
		}
		active_threads++;
	}

	/*
	 * Now let's wait for work completion.  Each time a thread is done
	 * with its work, we steal half of the remaining work from the
	 * thread with the largest number of unprocessed objects and give
	 * it to that newly idle thread.  This ensures good load balancing
	 * until the remaining object list segments are simply too short
	 * to be worth splitting anymore.
	 */
	while (active_threads) {
		struct thread_params *target = NULL;
		struct thread_params *victim = NULL;
		size_t sub_size = 0;

		/* Start by locating a thread that has transitioned its
		 * 'working' flag from 1 -> 0. This indicates that it is
		 * ready to receive more work using our work-stealing
		 * algorithm. */
		git_packbuilder__progress_lock(pb);
		for (;;) {
			for (i = 0; !target && i < pb->nr_threads; i++)
				if (!p[i].working)
					target = &p[i];
			if (target)
				break;
			git_cond_wait(&pb->progress_cond, &pb->progress_mutex);
		}

		/* At this point we hold the progress lock and have located
		 * a thread to receive more work. We still need to locate a
		 * thread from which to steal work (the victim). After a
		 * startup failure we only shut workers down, so no victim
		 * is chosen: unstarted slots still report `remaining` for
		 * work nobody will ever process. */
		for (i = 0; !error && i < pb->nr_threads; i++)
			if (p[i].remaining > 2*window &&
			    (!victim || victim->remaining < p[i].remaining))
				victim = &p[i];

		if (victim) {
			/* take the tail half, nudged forward to a "path" boundary */
			sub_size = victim->remaining / 2;
			list = victim->list + victim->list_size - sub_size;
			while (sub_size && list[0]->hash &&
			       list[0]->hash == list[-1]->hash) {
				list++;
				sub_size--;
			}
			if (!sub_size) {
				/*
				 * It is possible for some "paths" to have
				 * so many objects that no hash boundary
				 * might be found.  Let's just steal the
				 * exact half in that case.
				 */
				sub_size = victim->remaining / 2;
				list -= sub_size;
			}
			target->list = list;
			victim->list_size -= sub_size;
			victim->remaining -= sub_size;
		}
		/* sub_size == 0 here tells the target there is nothing left */
		target->list_size = sub_size;
		target->remaining = sub_size;
		target->working = 1;
		git_packbuilder__progress_unlock(pb);

		if (git_mutex_lock(&target->mutex)) {
			git_error_set(GIT_ERROR_THREAD, "unable to lock packfile condition mutex");
			/*
			 * NOTE(review): other workers may still be running and
			 * referencing `p` here, and `target` is never woken or
			 * joined; freeing `p` on this path is only safe if
			 * none of them touch it again.
			 */
			git__free(p);
			return -1;
		}

		target->data_ready = 1;
		git_cond_signal(&target->cond);
		git_mutex_unlock(&target->mutex);

		/* an empty hand-off ends the worker; reap it */
		if (!sub_size) {
			git_thread_join(&target->thread, NULL);
			git_cond_free(&target->cond);
			git_mutex_free(&target->mutex);
			active_threads--;
		}
	}

	git__free(p);
	return error;
}