From 0978a8b055f0496000281d8d77592c25bf406b2c Mon Sep 17 00:00:00 2001 From: Heng Li Date: Wed, 9 Oct 2013 11:47:29 -0400 Subject: [PATCH] documentation; better var names --- kthread.c | 41 +++++++++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/kthread.c b/kthread.c index d2c8cad..52288d9 100644 --- a/kthread.c +++ b/kthread.c @@ -67,9 +67,9 @@ int dq_deq(deque_t *q, int is_back, dqval_t *v) // get from the queue struct ktf_worker_t; typedef struct { - int n, size; // n: number of workers; size: size of each local element - void *global; - void *local; + int n, size; // n: number of workers; size: size of each items element + void *shared; + void *items; int (*func)(void*,void*); struct ktf_worker_t *w; int finished; @@ -97,21 +97,42 @@ static void *ktf_worker(void *data) for (;;) { int k = -1; if (dq_deq(w->q, 1, &k) < 0) k = steal_work(w->f); - if (k >= 0) w->f->func(w->f->global, (uint8_t*)w->f->local + w->f->size * k); + if (k >= 0) w->f->func(w->f->shared, (uint8_t*)w->f->items + w->f->size * k); else if (w->f->finished) break; } return 0; } -void kt_for(int n_threads, int (*func)(void*,void*), void *global, int m, int size, void *local) +/** + * Parallelize a simple "for" loop + * + * @param n_threads total number of threads + * @param func processing function + * @param shared shared data used by $func + * @param n_items number of items to process + * @param item_size size of each item + * @param items item + * + * This function parallelizes such a "for" loop: + * + * shared_type *shared; + * item_type items[n_items]; + * for (int i = 0; i < n_items; ++i) + * func(shared, &items[i]); + * + * with: + * + * ht_for(n_threads, func, shared, n_items, sizeof(item_type), items); + */ +void kt_for(int n_threads, int (*func)(void*,void*), void *shared, int n_items, int item_size, void *items) { kt_for_t *f; pthread_t *tid; int i, k, dq_bits = HT_DQ_BITS; f = (kt_for_t*)calloc(1, sizeof(kt_for_t)); - f->n = n_threads - 1, f->size = size; - f->global = global, f->local = local; + f->n = n_threads - 1, f->size = item_size; + f->shared = shared, f->items = items; f->func = func; f->w = (ktf_worker_t*)calloc(f->n, sizeof(ktf_worker_t)); @@ -124,15 +145,15 @@ void kt_for(int n_threads, int (*func)(void*,void*), void *global, int m, int si tid = (pthread_t*)calloc(f->n, sizeof(pthread_t)); for (i = 0; i < f->n; ++i) pthread_create(&tid[i], 0, ktf_worker, &f->w[i]); - for (k = 0; k < m; ++k) { + for (k = 0; k < n_items; ++k) { int min, min_i; for (i = 0, min = 1<n; ++i) // find the worker with the lowest load if (min > dq_size(f->w[i].q)) min = dq_size(f->w[i].q), min_i = i; if (min < 1<w[min_i].q, 0, &k); - else func(global, (uint8_t*)f->local + f->size * k); + else func(shared, (uint8_t*)f->items + f->size * k); } f->finished = 1; - while ((k = steal_work(f)) >= 0) func(global, (uint8_t*)f->local + f->size * k); // help the unfinished workers + while ((k = steal_work(f)) >= 0) func(shared, (uint8_t*)f->items + f->size * k); // help the unfinished workers for (i = 0; i < f->n; ++i) pthread_join(tid[i], 0); for (i = 0; i < f->n; ++i) dq_destroy(f->w[i].q); -- 2.47.3