int n, size; // n: number of workers; size: size of each items element
void *shared;
void *items;
- int (*func)(void*,void*);
+ int (*func)(void*,int,void*);
struct ktf_worker_t *w;
int finished;
} kt_for_t;
for (;;) {
int k = -1;
if (dq_deq(w->q, 1, &k) < 0) k = steal_work(w->f);
- if (k >= 0) w->f->func(w->f->shared, (uint8_t*)w->f->items + w->f->size * k);
+ if (k >= 0) w->f->func(w->f->shared, k, (uint8_t*)w->f->items + w->f->size * k);
else if (w->f->finished) break;
}
return 0;
* Parallelize a simple "for" loop
*
* @param n_threads total number of threads
- * @param func processing function
+ * @param func function in the form of func(void *shared, int item_id, void *item);
* @param shared shared data used by $func
* @param n_items number of items to process
* @param item_size size of each item
*
* kt_for(n_threads, func, shared, n_items, sizeof(item_type), items);
*/
-void kt_for(int n_threads, int (*func)(void*,void*), void *shared, int n_items, int item_size, void *items)
+void kt_for(int n_threads, int (*func)(void*,int,void*), void *shared, int n_items, int item_size, void *items)
{
kt_for_t *f;
pthread_t *tid;
for (i = 0, min = 1<<dq_bits, min_i = -1; i < f->n; ++i) // find the worker with the lowest load
if (min > dq_size(f->w[i].q)) min = dq_size(f->w[i].q), min_i = i;
if (min < 1<<dq_bits) dq_enq(f->w[min_i].q, 0, &k);
- else func(shared, (uint8_t*)f->items + f->size * k);
+ else f->func(shared, k, (uint8_t*)f->items + f->size * k);
}
f->finished = 1;
- while ((k = steal_work(f)) >= 0) func(shared, (uint8_t*)f->items + f->size * k); // help the unfinished workers
+ while ((k = steal_work(f)) >= 0) func(shared, k, (uint8_t*)f->items + f->size * k); // help the unfinished workers
for (i = 0; i < f->n; ++i) pthread_join(tid[i], 0);
for (i = 0; i < f->n; ++i) dq_destroy(f->w[i].q);