]> git.kaiwu.me - klib.git/commitdiff
documentation; better var names
authorHeng Li <lh3@me.com>
Wed, 9 Oct 2013 15:47:29 +0000 (11:47 -0400)
committerHeng Li <lh3@me.com>
Wed, 9 Oct 2013 15:47:29 +0000 (11:47 -0400)
kthread.c

index d2c8cad738cf9ef1bd4756f5909158ca9191bea2..52288d91eb19679648aa7b4b3e8b278868958cc2 100644 (file)
--- a/kthread.c
+++ b/kthread.c
@@ -67,9 +67,9 @@ int dq_deq(deque_t *q, int is_back, dqval_t *v) // get from the queue
 struct ktf_worker_t;
 
 typedef struct {
-       int n, size; // n: number of workers; size: size of each local element
-       void *global;
-       void *local;
+       int n, size; // n: number of workers; size: size of each items element
+       void *shared;
+       void *items;
        int (*func)(void*,void*);
        struct ktf_worker_t *w;
        int finished;
@@ -97,21 +97,42 @@ static void *ktf_worker(void *data)
        for (;;) {
                int k = -1;
                if (dq_deq(w->q, 1, &k) < 0) k = steal_work(w->f);
-               if (k >= 0) w->f->func(w->f->global, (uint8_t*)w->f->local + w->f->size * k);
+               if (k >= 0) w->f->func(w->f->shared, (uint8_t*)w->f->items + w->f->size * k);
                else if (w->f->finished) break;
        }
        return 0;
 }
 
-void kt_for(int n_threads, int (*func)(void*,void*), void *global, int m, int size, void *local)
+/**
+ * Parallelize a simple "for" loop
+ *
+ * @param n_threads    total number of threads
+ * @param func         processing function
+ * @param shared       shared data used by $func
+ * @param n_items      number of items to process
+ * @param item_size    size of each item
+ * @param items        item
+ *
+ * This function parallelizes such a "for" loop:
+ *
+ *   shared_type *shared;
+ *   item_type items[n_items];
+ *   for (int i = 0; i < n_items; ++i)
+ *     func(shared, &items[i]);
+ *
+ * with:
+ *
+ *   ht_for(n_threads, func, shared, n_items, sizeof(item_type), items);
+ */
+void kt_for(int n_threads, int (*func)(void*,void*), void *shared, int n_items, int item_size, void *items)
 {
        kt_for_t *f;
        pthread_t *tid;
        int i, k, dq_bits = HT_DQ_BITS;
 
        f = (kt_for_t*)calloc(1, sizeof(kt_for_t));
-       f->n = n_threads - 1, f->size = size;
-       f->global = global, f->local = local;
+       f->n = n_threads - 1, f->size = item_size;
+       f->shared = shared, f->items = items;
        f->func = func;
 
        f->w = (ktf_worker_t*)calloc(f->n, sizeof(ktf_worker_t));
@@ -124,15 +145,15 @@ void kt_for(int n_threads, int (*func)(void*,void*), void *global, int m, int si
        tid = (pthread_t*)calloc(f->n, sizeof(pthread_t));
        for (i = 0; i < f->n; ++i) pthread_create(&tid[i], 0, ktf_worker, &f->w[i]);
 
-       for (k = 0; k < m; ++k) {
+       for (k = 0; k < n_items; ++k) {
                int min, min_i;
                for (i = 0, min = 1<<dq_bits, min_i = -1; i < f->n; ++i) // find the worker with the lowest load
                        if (min > dq_size(f->w[i].q)) min = dq_size(f->w[i].q), min_i = i;
                if (min < 1<<dq_bits) dq_enq(f->w[min_i].q, 0, &k);
-               else func(global, (uint8_t*)f->local + f->size * k);
+               else func(shared, (uint8_t*)f->items + f->size * k);
        }
        f->finished = 1;
-       while ((k = steal_work(f)) >= 0) func(global, (uint8_t*)f->local + f->size * k); // help the unfinished workers
+       while ((k = steal_work(f)) >= 0) func(shared, (uint8_t*)f->items + f->size * k); // help the unfinished workers
 
        for (i = 0; i < f->n; ++i) pthread_join(tid[i], 0);
        for (i = 0; i < f->n; ++i) dq_destroy(f->w[i].q);