From 73d926f7d565725c9daf99be82f3d82796f8d620 Mon Sep 17 00:00:00 2001 From: Heng Li Date: Fri, 16 Dec 2016 17:44:51 -0500 Subject: [PATCH] forpool working --- kthread.c | 59 +++++++++++++++++++++++++++---------------------------- 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/kthread.c b/kthread.c index c0d9093..d83df88 100644 --- a/kthread.c +++ b/kthread.c @@ -77,13 +77,13 @@ typedef struct { } kto_worker_t; typedef struct kt_forpool_t { - int n_threads, n_done; + int n_threads, n_pending; long n; pthread_t *tid; kto_worker_t *w; void (*func)(void*,long,int); void *data; - pthread_mutex_t mu_m, mu_s; + pthread_mutex_t mutex; pthread_cond_t cv_m, cv_s; } kt_forpool_t; @@ -104,23 +104,21 @@ static void *kt_fp_worker(void *data) for (;;) { long i; int action; - pthread_mutex_lock(&fp->mu_s); - while (w->action == 0) pthread_cond_wait(&fp->cv_s, &fp->mu_s); - action = w->action; - pthread_mutex_unlock(&fp->mu_s); - if (action > 0) { - for (;;) { // process jobs allocated to this worker - i = __sync_fetch_and_add(&w->i, fp->n_threads); - if (i >= fp->n) break; - fp->func(fp->data, i, w - fp->w); - } - while ((i = kt_fp_steal_work(fp)) >= 0) // steal jobs allocated to other workers - fp->func(fp->data, i, w - fp->w); - } - w->action = 0; - if (__sync_add_and_fetch(&fp->n_done, 1) == fp->n_threads) + pthread_mutex_lock(&fp->mutex); + if (--fp->n_pending == 0) pthread_cond_signal(&fp->cv_m); + w->action = 0; + while (w->action == 0) pthread_cond_wait(&fp->cv_s, &fp->mutex); + action = w->action; + pthread_mutex_unlock(&fp->mutex); if (action < 0) break; + for (;;) { // process jobs allocated to this worker + i = __sync_fetch_and_add(&w->i, fp->n_threads); + if (i >= fp->n) break; + fp->func(fp->data, i, w - fp->w); + } + while ((i = kt_fp_steal_work(fp)) >= 0) // steal jobs allocated to other workers + fp->func(fp->data, i, w - fp->w); } pthread_exit(0); } @@ -130,13 +128,17 @@ void *kt_forpool_init(int n_threads) kt_forpool_t *fp; int i; fp = (kt_forpool_t*)calloc(1, sizeof(kt_forpool_t)); - fp->n_threads = n_threads; + fp->n_threads = fp->n_pending = n_threads; fp->tid = (pthread_t*)calloc(fp->n_threads, sizeof(pthread_t)); fp->w = (kto_worker_t*)calloc(fp->n_threads, sizeof(kto_worker_t)); for (i = 0; i < fp->n_threads; ++i) fp->w[i].t = fp; - pthread_mutex_init(&fp->mu_m, 0); pthread_cond_init(&fp->cv_m, 0); - pthread_mutex_init(&fp->mu_s, 0); pthread_cond_init(&fp->cv_s, 0); + pthread_mutex_init(&fp->mutex, 0); + pthread_cond_init(&fp->cv_m, 0); + pthread_cond_init(&fp->cv_s, 0); for (i = 0; i < fp->n_threads; ++i) pthread_create(&fp->tid[i], 0, kt_fp_worker, &fp->w[i]); + pthread_mutex_lock(&fp->mutex); + while (fp->n_pending) pthread_cond_wait(&fp->cv_m, &fp->mutex); + pthread_mutex_unlock(&fp->mutex); return fp; } @@ -144,15 +146,12 @@ void kt_forpool_destroy(void *_fp) { kt_forpool_t *fp = (kt_forpool_t*)_fp; int i; - fp->n_done = 0; for (i = 0; i < fp->n_threads; ++i) fp->w[i].action = -1; pthread_cond_broadcast(&fp->cv_s); - pthread_mutex_lock(&fp->mu_m); - pthread_cond_wait(&fp->cv_m, &fp->mu_m); - pthread_mutex_unlock(&fp->mu_m); for (i = 0; i < fp->n_threads; ++i) pthread_join(fp->tid[i], 0); - pthread_cond_destroy(&fp->cv_s); pthread_mutex_destroy(&fp->mu_s); - pthread_cond_destroy(&fp->cv_m); pthread_mutex_destroy(&fp->mu_m); + pthread_cond_destroy(&fp->cv_s); + pthread_cond_destroy(&fp->cv_m); + pthread_mutex_destroy(&fp->mutex); free(fp->w); free(fp->tid); free(fp); } @@ -160,12 +159,12 @@ void kt_forpool(void *_fp, void (*func)(void*,long,int), void *data, long n) { kt_forpool_t *fp = (kt_forpool_t*)_fp; int i; - fp->n = n, fp->func = func, fp->data = data, fp->n_done = 0; + fp->n = n, fp->func = func, fp->data = data, fp->n_pending = fp->n_threads; for (i = 0; i < fp->n_threads; ++i) fp->w[i].i = i, fp->w[i].action = 1; + pthread_mutex_lock(&fp->mutex); pthread_cond_broadcast(&fp->cv_s); - pthread_mutex_lock(&fp->mu_m); - pthread_cond_wait(&fp->cv_m, &fp->mu_m); - pthread_mutex_unlock(&fp->mu_m); + while (fp->n_pending) pthread_cond_wait(&fp->cv_m, &fp->mutex); + pthread_mutex_unlock(&fp->mutex); } /***************** -- 2.47.3