} kto_worker_t;
typedef struct kt_forpool_t {
- int n_threads, n_done;
+ int n_threads, n_pending;
long n;
pthread_t *tid;
kto_worker_t *w;
void (*func)(void*,long,int);
void *data;
- pthread_mutex_t mu_m, mu_s;
+ pthread_mutex_t mutex;
pthread_cond_t cv_m, cv_s;
} kt_forpool_t;
for (;;) {
long i;
int action;
- pthread_mutex_lock(&fp->mu_s);
- while (w->action == 0) pthread_cond_wait(&fp->cv_s, &fp->mu_s);
- action = w->action;
- pthread_mutex_unlock(&fp->mu_s);
- if (action > 0) {
- for (;;) { // process jobs allocated to this worker
- i = __sync_fetch_and_add(&w->i, fp->n_threads);
- if (i >= fp->n) break;
- fp->func(fp->data, i, w - fp->w);
- }
- while ((i = kt_fp_steal_work(fp)) >= 0) // steal jobs allocated to other workers
- fp->func(fp->data, i, w - fp->w);
- }
- w->action = 0;
- if (__sync_add_and_fetch(&fp->n_done, 1) == fp->n_threads)
+ pthread_mutex_lock(&fp->mutex);
+ if (--fp->n_pending == 0)
pthread_cond_signal(&fp->cv_m);
+ w->action = 0;
+ while (w->action == 0) pthread_cond_wait(&fp->cv_s, &fp->mutex);
+ action = w->action;
+ pthread_mutex_unlock(&fp->mutex);
if (action < 0) break;
+ for (;;) { // process jobs allocated to this worker
+ i = __sync_fetch_and_add(&w->i, fp->n_threads);
+ if (i >= fp->n) break;
+ fp->func(fp->data, i, w - fp->w);
+ }
+ while ((i = kt_fp_steal_work(fp)) >= 0) // steal jobs allocated to other workers
+ fp->func(fp->data, i, w - fp->w);
}
pthread_exit(0);
}
kt_forpool_t *fp;
int i;
fp = (kt_forpool_t*)calloc(1, sizeof(kt_forpool_t));
- fp->n_threads = n_threads;
+ fp->n_threads = fp->n_pending = n_threads;
fp->tid = (pthread_t*)calloc(fp->n_threads, sizeof(pthread_t));
fp->w = (kto_worker_t*)calloc(fp->n_threads, sizeof(kto_worker_t));
for (i = 0; i < fp->n_threads; ++i) fp->w[i].t = fp;
- pthread_mutex_init(&fp->mu_m, 0); pthread_cond_init(&fp->cv_m, 0);
- pthread_mutex_init(&fp->mu_s, 0); pthread_cond_init(&fp->cv_s, 0);
+ pthread_mutex_init(&fp->mutex, 0);
+ pthread_cond_init(&fp->cv_m, 0);
+ pthread_cond_init(&fp->cv_s, 0);
for (i = 0; i < fp->n_threads; ++i) pthread_create(&fp->tid[i], 0, kt_fp_worker, &fp->w[i]);
+ pthread_mutex_lock(&fp->mutex);
+ while (fp->n_pending) pthread_cond_wait(&fp->cv_m, &fp->mutex);
+ pthread_mutex_unlock(&fp->mutex);
return fp;
}
{
kt_forpool_t *fp = (kt_forpool_t*)_fp;
int i;
- fp->n_done = 0;
for (i = 0; i < fp->n_threads; ++i) fp->w[i].action = -1;
pthread_cond_broadcast(&fp->cv_s);
- pthread_mutex_lock(&fp->mu_m);
- pthread_cond_wait(&fp->cv_m, &fp->mu_m);
- pthread_mutex_unlock(&fp->mu_m);
for (i = 0; i < fp->n_threads; ++i) pthread_join(fp->tid[i], 0);
- pthread_cond_destroy(&fp->cv_s); pthread_mutex_destroy(&fp->mu_s);
- pthread_cond_destroy(&fp->cv_m); pthread_mutex_destroy(&fp->mu_m);
+ pthread_cond_destroy(&fp->cv_s);
+ pthread_cond_destroy(&fp->cv_m);
+ pthread_mutex_destroy(&fp->mutex);
free(fp->w); free(fp->tid); free(fp);
}
{
kt_forpool_t *fp = (kt_forpool_t*)_fp;
int i;
- fp->n = n, fp->func = func, fp->data = data, fp->n_done = 0;
+ fp->n = n, fp->func = func, fp->data = data, fp->n_pending = fp->n_threads;
for (i = 0; i < fp->n_threads; ++i) fp->w[i].i = i, fp->w[i].action = 1;
+ pthread_mutex_lock(&fp->mutex);
pthread_cond_broadcast(&fp->cv_s);
- pthread_mutex_lock(&fp->mu_m);
- pthread_cond_wait(&fp->cv_m, &fp->mu_m);
- pthread_mutex_unlock(&fp->mu_m);
+ while (fp->n_pending) pthread_cond_wait(&fp->cv_m, &fp->mutex);
+ pthread_mutex_unlock(&fp->mutex);
}
/*****************