in src/pack-objects.c [1163:1308]
/*
 * Search for deltas over `list`, spreading the work over several threads.
 *
 * If pb->nr_threads is zero it is first set from git_online_cpus().  With
 * at most one thread the work is done inline via find_deltas().  Otherwise
 * the list is cut into one segment per thread (preferring cuts where the
 * object hash changes), a worker running threaded_find_deltas() is started
 * for every non-empty segment, and this function then rebalances: whenever
 * a worker reports idle (working == 0) it is handed half of the remaining
 * objects of the busiest worker, or nothing at all, in which case it is
 * joined and retired.
 *
 * pb        - packbuilder; nr_threads, progress_mutex/progress_cond are used
 * list      - array of objects to process
 * list_size - number of entries in `list`
 * window    - delta window; segments shorter than 2*window are not split off
 * depth     - passed through unchanged to the delta search
 *
 * Returns 0 on success, -1 (with a GIT_ERROR_THREAD error set) when a
 * worker thread cannot be created or a worker mutex cannot be locked.
 * Allocation failure is handled by GIT_ERROR_CHECK_ALLOC.
 */
static int ll_find_deltas(git_packbuilder *pb, git_pobject **list,
			  size_t list_size, size_t window, size_t depth)
{
	struct thread_params *p;
	size_t i;
	int ret, active_threads = 0;
	int create_failed = 0;

	if (!pb->nr_threads)
		pb->nr_threads = git_online_cpus();

	if (pb->nr_threads <= 1) {
		/* NOTE(review): any result of find_deltas() is ignored here;
		 * confirm whether it can fail and should be propagated. */
		find_deltas(pb, list, &list_size, window, depth);
		return 0;
	}

	p = git__mallocarray(pb->nr_threads, sizeof(*p));
	GIT_ERROR_CHECK_ALLOC(p);

	/* Partition the work among the threads */
	for (i = 0; i < pb->nr_threads; ++i) {
		size_t sub_size = list_size / (pb->nr_threads - i);

		/* don't use too small segments or no deltas will be found */
		if (sub_size < 2*window && i+1 < pb->nr_threads)
			sub_size = 0;

		p[i].pb = pb;
		p[i].window = window;
		p[i].depth = depth;
		p[i].working = 1;
		p[i].data_ready = 0;

		/* try to split chunks on "path" boundaries */
		while (sub_size && sub_size < list_size &&
		       list[sub_size]->hash &&
		       list[sub_size]->hash == list[sub_size-1]->hash)
			sub_size++;

		p[i].list = list;
		p[i].list_size = sub_size;
		p[i].remaining = sub_size;

		list += sub_size;
		list_size -= sub_size;
	}

	/* Start work threads */
	for (i = 0; i < pb->nr_threads; ++i) {
		if (!p[i].list_size)
			continue;

		git_mutex_init(&p[i].mutex);
		git_cond_init(&p[i].cond);

		ret = git_thread_create(&p[i].thread,
					threaded_find_deltas, &p[i]);
		if (ret) {
			/*
			 * No thread ever saw this slot's primitives, so
			 * they can be destroyed right away.  Workers that
			 * were already started still hold pointers into
			 * `p`; stop spawning and let the loop below retire
			 * them before `p` is released.
			 */
			git_cond_free(&p[i].cond);
			git_mutex_free(&p[i].mutex);
			create_failed = 1;
			break;
		}
		active_threads++;
	}

	/*
	 * Now let's wait for work completion. Each time a thread is done
	 * with its work, we steal half of the remaining work from the
	 * thread with the largest number of unprocessed objects and give
	 * it to that newly idle thread. This ensures good load balancing
	 * until the remaining object list segments are simply too short
	 * to be worth splitting anymore.
	 *
	 * After a thread creation failure the same loop is used purely to
	 * drain: idle workers are handed no work, which retires them.
	 */
	while (active_threads) {
		struct thread_params *target = NULL;
		struct thread_params *victim = NULL;
		size_t sub_size = 0;

		/* Start by locating a thread that has transitioned its
		 * 'working' flag from 1 -> 0. This indicates that it is
		 * ready to receive more work using our work-stealing
		 * algorithm. Slots that never got a thread keep
		 * working == 1 and are therefore never selected. */
		git_packbuilder__progress_lock(pb);
		for (;;) {
			for (i = 0; !target && i < pb->nr_threads; i++)
				if (!p[i].working)
					target = &p[i];
			if (target)
				break;
			git_cond_wait(&pb->progress_cond, &pb->progress_mutex);
		}

		/* At this point we hold the progress lock and have located
		 * a thread to receive more work. We still need to locate a
		 * thread from which to steal work (the victim). When
		 * draining after a failure nothing is stolen, so that slots
		 * which never got a thread are not mistaken for victims. */
		if (!create_failed) {
			for (i = 0; i < pb->nr_threads; i++)
				if (p[i].remaining > 2*window &&
				    (!victim || victim->remaining < p[i].remaining))
					victim = &p[i];
		}

		if (victim) {
			sub_size = victim->remaining / 2;
			list = victim->list + victim->list_size - sub_size;
			while (sub_size && list[0]->hash &&
			       list[0]->hash == list[-1]->hash) {
				list++;
				sub_size--;
			}
			if (!sub_size) {
				/*
				 * It is possible for some "paths" to have
				 * so many objects that no hash boundary
				 * might be found.  Let's just steal the
				 * exact half in that case.
				 */
				sub_size = victim->remaining / 2;
				list -= sub_size;
			}
			target->list = list;
			victim->list_size -= sub_size;
			victim->remaining -= sub_size;
		}
		target->list_size = sub_size;
		target->remaining = sub_size;
		target->working = 1;
		git_packbuilder__progress_unlock(pb);

		if (git_mutex_lock(&target->mutex)) {
			/* NOTE(review): other workers may still be running
			 * and referencing `p` when it is freed here; this
			 * path is kept as in the original - confirm. */
			git_error_set(GIT_ERROR_THREAD, "unable to lock packfile condition mutex");
			git__free(p);
			return -1;
		}

		target->data_ready = 1;
		git_cond_signal(&target->cond);
		git_mutex_unlock(&target->mutex);

		/* A worker that was given nothing is finished: reap it. */
		if (!sub_size) {
			git_thread_join(&target->thread, NULL);
			git_cond_free(&target->cond);
			git_mutex_free(&target->mutex);
			active_threads--;
		}
	}

	git__free(p);

	if (create_failed) {
		/* Set last so nothing done while draining can replace it. */
		git_error_set(GIT_ERROR_THREAD, "unable to create thread");
		return -1;
	}

	return 0;
}