From 1808e585316e1f1ee9ea5b40c3162f6ca89af350 Mon Sep 17 00:00:00 2001 From: Heng Li Date: Thu, 10 Oct 2013 00:20:08 -0400 Subject: [PATCH] revert to kt_for() the new version is buggy and even if I could fix it, I would not feel confident. Perhaps I will come back later when I am more competent. --- kthread.c | 225 +++++++++++++++++++----------------------------------- kthread.h | 27 ------- 2 files changed, 78 insertions(+), 174 deletions(-) delete mode 100644 kthread.h diff --git a/kthread.c b/kthread.c index 789ad69..9ebdc18 100644 --- a/kthread.c +++ b/kthread.c @@ -3,13 +3,13 @@ #include #include -#define KT_DQ_BITS 5 // 1<n_threads; ++i) - if (max < dq_size(t->w[i].q)) // max is not accurate as other workers may steal from the same queue, but it does not matter. - max = dq_size(t->w[i].q), max_i = i; - if (max_i < 0 || dq_deq(t->w[max_i].q, 0, &k) < 0) k = (uint64_t)-1; - return k; -} + int i; +} ktf_worker_t; -static inline void do_task(kthread_t *t, uint64_t sid) +static inline int steal_work(kt_for_t *f) // steal work from the worker with the highest load { - kt_task_t *s = &t->tasks[sid>>32]; - s->func(s->shared, (int)sid, (uint8_t*)s->items + s->item_size * (uint32_t)sid); + int i, max = -1, max_i = -1, k = -1; + for (i = 0; i < f->n; ++i) + if (max < dq_size(f->w[i].q)) // max is not accurate as other workers may steal from the same queue, but it does not matter. + max = dq_size(f->w[i].q), max_i = i; + if (max_i < 0 || dq_deq(f->w[max_i].q, 0, &k) < 0) k = -1; + return k; } -static void *slave(void *data) +static void *ktf_worker(void *data) { - kt_worker_t *w = (kt_worker_t*)data; + ktf_worker_t *w = (ktf_worker_t*)data; for (;;) { - uint64_t sid; - if (dq_deq(w->q, 1, &sid) < 0) - sid = steal_task(w->t); - if (sid == (uint64_t)-1) { // if still fail to find a task, sleep and wait for the signal - if (w->type == 2) break; - pthread_mutex_lock(&w->lock); - w->type = 0; // wait - while (w->type == 0) pthread_cond_wait(&w->cv, &w->lock); - pthread_mutex_unlock(&w->lock); - } else do_task(w->t, sid); + int k = -1; + if (dq_deq(w->q, 1, &k) < 0) k = steal_work(w->f); + if (k >= 0) w->f->func(w->f->shared, k, (uint8_t*)w->f->items + w->f->size * k); + else if (w->f->finished) break; } return 0; } -static void *master(void *data) +/** + * Parallelize a simple "for" loop + * + * @param n_threads total number of threads + * @param func function in the form of func(void *shared, int item_id, void *item); + * @param shared shared data used by $func + * @param n_items number of items to process + * @param item_size size of each item + * @param items item + * + * This function parallelizes such a "for" loop: + * + * shared_type *shared; + * item_type items[n_items]; + * for (int i = 0; i < n_items; ++i) + * func(shared, &items[i]); + * + * with: + * + * ht_for(n_threads, func, shared, n_items, sizeof(item_type), items); + */ +void kt_for(int n_threads, int (*func)(void*,int,void*), void *shared, int n_items, int item_size, void *items) { - kthread_t *t = (kthread_t*)data; - int i, n_tasks = 0, to_sync = 0; - for (i = 0; i < t->n_threads; ++i) - pthread_create(&t->w[i].tid, 0, slave, &t->w[i]); - while (!to_sync) { - int next_tasks, tid, iid; - uint64_t sid; - pthread_mutex_lock(&t->lock); - while (n_tasks == t->n_tasks && !t->to_sync) - pthread_cond_wait(&t->cv, &t->lock); - next_tasks = t->n_tasks, to_sync = t->to_sync; - pthread_mutex_unlock(&t->lock); - for (tid = n_tasks; tid < next_tasks; ++tid) { - kt_task_t *s = &t->tasks[tid]; - for (iid = 0; iid < s->n_items; ++iid) { - int min, min_i; - for (i = 0, min = 1<n_threads; ++i) - if (min > dq_size(t->w[i].q)) min = dq_size(t->w[i].q), min_i = i; - sid = (uint64_t)tid<<32 | iid; - if (min < 1<w[min_i]; - dq_enq(w->q, 0, &sid); - if (w->type == 0) { - pthread_mutex_lock(&w->lock); - w->type = 1; - pthread_cond_signal(&w->cv); - pthread_mutex_unlock(&w->lock); - } - } else do_task(t, sid); - } - } - while ((sid = steal_task(t)) != (uint64_t)-1) do_task(t, sid); - n_tasks = next_tasks; + kt_for_t *f; + pthread_t *tid; + int i, k, dq_bits = HT_DQ_BITS; + + f = (kt_for_t*)calloc(1, sizeof(kt_for_t)); + f->n = n_threads - 1, f->size = item_size; + f->shared = shared, f->items = items; + f->func = func; + + f->w = (ktf_worker_t*)calloc(f->n, sizeof(ktf_worker_t)); + for (i = 0; i < f->n; ++i) + f->w[i].f = f, f->w[i].i = i, f->w[i].q = dq_init(dq_bits); + + tid = (pthread_t*)calloc(f->n, sizeof(pthread_t)); + for (i = 0; i < f->n; ++i) pthread_create(&tid[i], 0, ktf_worker, &f->w[i]); + + for (k = 0; k < n_items; ++k) { + int min, min_i; + for (i = 0, min = 1<n; ++i) // find the worker with the lowest load + if (min > dq_size(f->w[i].q)) min = dq_size(f->w[i].q), min_i = i; + if (min < 1<w[min_i].q, 0, &k); + else f->func(shared, k, (uint8_t*)f->items + f->size * k); } - for (i = 0; i < t->n_threads; ++i) { - pthread_mutex_lock(&t->w[i].lock); - t->w[i].type = 2; - pthread_cond_signal(&t->w[i].cv); - pthread_mutex_unlock(&t->w[i].lock); - } - for (i = 0; i < t->n_threads; ++i) pthread_join(t->w[i].tid, 0); - return 0; -} + f->finished = 1; + while ((k = steal_work(f)) >= 0) func(shared, k, (uint8_t*)f->items + f->size * k); // help the unfinished workers -kthread_t *kt_init(int n_threads) -{ - kthread_t *t; - int i; - t = calloc(1, sizeof(kthread_t)); - t->n_threads = n_threads - 1; - t->w = calloc(t->n_threads, sizeof(kt_worker_t)); - pthread_mutex_init(&t->lock, 0); - pthread_cond_init(&t->cv, 0); - for (i = 0; i < t->n_threads; ++i) { - t->w[i].q = dq_init(KT_DQ_BITS); - t->w[i].t = t; - pthread_mutex_init(&t->w[i].lock, 0); - pthread_cond_init(&t->w[i].cv, 0); - } - pthread_create(&t->self, 0, master, t); - return t; -} - -void kt_sync(kthread_t *t) -{ - int i; - pthread_mutex_lock(&t->lock); - t->to_sync = 1; - pthread_cond_signal(&t->cv); - pthread_mutex_unlock(&t->lock); - pthread_join(t->self, 0); - - pthread_cond_destroy(&t->cv); - pthread_mutex_destroy(&t->lock); - for (i = 0; i < t->n_threads; ++i) { - pthread_cond_destroy(&t->w[i].cv); - pthread_mutex_destroy(&t->w[i].lock); - dq_destroy(t->w[i].q); - } - free(t->tasks); free(t->w); free(t); -} - -void kt_spawn(kthread_t *t, int (*func)(void*,int,void*), void *shared, int n_items, int item_size, void *items) -{ - kt_task_t *p; - pthread_mutex_lock(&t->lock); - if (t->n_tasks == t->max_tasks) { - t->max_tasks = t->max_tasks? t->max_tasks<<1 : 2; - t->tasks = realloc(t->tasks, t->max_tasks * sizeof(kt_task_t)); - } - p = &t->tasks[t->n_tasks++]; - p->func = func, p->shared = shared; - p->n_items = n_items, p->item_size = item_size, p->items = items, p->n_finished = 0; - pthread_cond_signal(&t->cv); - pthread_mutex_unlock(&t->lock); + for (i = 0; i < f->n; ++i) pthread_join(tid[i], 0); + for (i = 0; i < f->n; ++i) dq_destroy(f->w[i].q); + free(tid); free(f->w); free(f); } diff --git a/kthread.h b/kthread.h deleted file mode 100644 index 83f4249..0000000 --- a/kthread.h +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef KTHREAD_H -#define KTHREAD_H - -struct kthread_t; -typedef struct kthread_t kthread_t; - -#ifdef __cplusplus -extern "C" { -#endif - -kthread_t *kt_init(int n_threads); -void kt_spawn(kthread_t *t, int (*func)(void*,int,void*), void *shared, int n_items, int item_size, void *items); -void kt_sync(kthread_t *t); - -#ifdef __cplusplus -} -#endif - -static inline void kt_for(int n_threads, int (*func)(void*,int,void*), void *shared, int n_items, int item_size, void *items) -{ - kthread_t *t; - t = kt_init(n_threads); - kt_spawn(t, func, shared, n_items, item_size, items); - kt_sync(t); -} - -#endif -- 2.47.3