From: Heng Li Date: Thu, 10 Oct 2013 14:37:14 +0000 (-0400) Subject: simplified the kt_for() API X-Git-Url: http://www.kaiwu.me/postgresql/commit/?a=commitdiff_plain;h=85febb4cd7609cfdae8cc216c7f0f3036862e63c;p=klib.git simplified the kt_for() API * the local array can fit into the global data * sometimes caller needs to know thread id to maintain buffer --- diff --git a/kthread.c b/kthread.c index 9ebdc18..bc94a70 100644 --- a/kthread.c +++ b/kthread.c @@ -67,10 +67,9 @@ int dq_deq(deque_t *q, int is_back, dqval_t *v) // get from the queue struct ktf_worker_t; typedef struct { - int n, size; // n: number of workers; size: size of each items element - void *shared; - void *items; - int (*func)(void*,int,void*); + int n; // n: number of workers + void *data; + void (*func)(void*,int,int); struct ktf_worker_t *w; int finished; } kt_for_t; @@ -97,7 +96,7 @@ static void *ktf_worker(void *data) for (;;) { int k = -1; if (dq_deq(w->q, 1, &k) < 0) k = steal_work(w->f); - if (k >= 0) w->f->func(w->f->shared, k, (uint8_t*)w->f->items + w->f->size * k); + if (k >= 0) w->f->func(w->f->data, k, w->i + 1); else if (w->f->finished) break; } return 0; @@ -107,32 +106,29 @@ static void *ktf_worker(void *data) * Parallelize a simple "for" loop * * @param n_threads total number of threads - * @param func function in the form of func(void *shared, int item_id, void *item); - * @param shared shared data used by $func + * @param func function in the form of func(void *data, int item_id, void *item); + * @param data data used by $func * @param n_items number of items to process - * @param item_size size of each item - * @param items item * * This function parallelizes such a "for" loop: * - * shared_type *shared; - * item_type items[n_items]; + * data_type *data; * for (int i = 0; i < n_items; ++i) - * func(shared, &items[i]); + * func(data, &items[i], 0); * * with: * - * ht_for(n_threads, func, shared, n_items, sizeof(item_type), items); + * ht_for(n_threads, func, data, n_items); */ -void kt_for(int n_threads, int (*func)(void*,int,void*), void *shared, int n_items, int item_size, void *items) +void kt_for(int n_threads, void (*func)(void*,int,int), void *data, int n_items) { kt_for_t *f; pthread_t *tid; int i, k, dq_bits = HT_DQ_BITS; f = (kt_for_t*)calloc(1, sizeof(kt_for_t)); - f->n = n_threads - 1, f->size = item_size; - f->shared = shared, f->items = items; + f->n = n_threads - 1; + f->data = data; f->func = func; f->w = (ktf_worker_t*)calloc(f->n, sizeof(ktf_worker_t)); @@ -147,10 +143,10 @@ void kt_for(int n_threads, int (*func)(void*,int,void*), void *shared, int n_ite for (i = 0, min = 1<n; ++i) // find the worker with the lowest load if (min > dq_size(f->w[i].q)) min = dq_size(f->w[i].q), min_i = i; if (min < 1<w[min_i].q, 0, &k); - else f->func(shared, k, (uint8_t*)f->items + f->size * k); + else f->func(data, k, 0); } f->finished = 1; - while ((k = steal_work(f)) >= 0) func(shared, k, (uint8_t*)f->items + f->size * k); // help the unfinished workers + while ((k = steal_work(f)) >= 0) func(data, k, 0); // help the unfinished workers for (i = 0; i < f->n; ++i) pthread_join(tid[i], 0); for (i = 0; i < f->n; ++i) dq_destroy(f->w[i].q);