struct ktf_worker_t;
typedef struct {
- int n, size; // n: number of workers; size: size of each local element
- void *global;
- void *local;
+ int n, size; // n: number of workers; size: size of each items element
+ void *shared;
+ void *items;
int (*func)(void*,void*);
struct ktf_worker_t *w;
int finished;
for (;;) {
int k = -1;
if (dq_deq(w->q, 1, &k) < 0) k = steal_work(w->f);
- if (k >= 0) w->f->func(w->f->global, (uint8_t*)w->f->local + w->f->size * k);
+ if (k >= 0) w->f->func(w->f->shared, (uint8_t*)w->f->items + w->f->size * k);
else if (w->f->finished) break;
}
return 0;
}
-void kt_for(int n_threads, int (*func)(void*,void*), void *global, int m, int size, void *local)
+/**
+ * Parallelize a simple "for" loop
+ *
+ * @param n_threads total number of threads
+ * @param func processing function
+ * @param shared shared data used by $func
+ * @param n_items number of items to process
+ * @param item_size size of each item
+ * @param items item
+ *
+ * This function parallelizes such a "for" loop:
+ *
+ * shared_type *shared;
+ * item_type items[n_items];
+ * for (int i = 0; i < n_items; ++i)
+ * func(shared, &items[i]);
+ *
+ * with:
+ *
+ * ht_for(n_threads, func, shared, n_items, sizeof(item_type), items);
+ */
+void kt_for(int n_threads, int (*func)(void*,void*), void *shared, int n_items, int item_size, void *items)
{
kt_for_t *f;
pthread_t *tid;
int i, k, dq_bits = HT_DQ_BITS;
f = (kt_for_t*)calloc(1, sizeof(kt_for_t));
- f->n = n_threads - 1, f->size = size;
- f->global = global, f->local = local;
+ f->n = n_threads - 1, f->size = item_size;
+ f->shared = shared, f->items = items;
f->func = func;
f->w = (ktf_worker_t*)calloc(f->n, sizeof(ktf_worker_t));
tid = (pthread_t*)calloc(f->n, sizeof(pthread_t));
for (i = 0; i < f->n; ++i) pthread_create(&tid[i], 0, ktf_worker, &f->w[i]);
- for (k = 0; k < m; ++k) {
+ for (k = 0; k < n_items; ++k) {
int min, min_i;
for (i = 0, min = 1<<dq_bits, min_i = -1; i < f->n; ++i) // find the worker with the lowest load
if (min > dq_size(f->w[i].q)) min = dq_size(f->w[i].q), min_i = i;
if (min < 1<<dq_bits) dq_enq(f->w[min_i].q, 0, &k);
- else func(global, (uint8_t*)f->local + f->size * k);
+ else func(shared, (uint8_t*)f->items + f->size * k);
}
f->finished = 1;
- while ((k = steal_work(f)) >= 0) func(global, (uint8_t*)f->local + f->size * k); // help the unfinished workers
+ while ((k = steal_work(f)) >= 0) func(shared, (uint8_t*)f->items + f->size * k); // help the unfinished workers
for (i = 0; i < f->n; ++i) pthread_join(tid[i], 0);
for (i = 0; i < f->n; ++i) dq_destroy(f->w[i].q);