]> git.kaiwu.me - klib.git/commitdiff
forpool working
authorHeng Li <lh3@me.com>
Fri, 16 Dec 2016 22:44:51 +0000 (17:44 -0500)
committerHeng Li <lh3@me.com>
Fri, 16 Dec 2016 22:44:51 +0000 (17:44 -0500)
kthread.c

index c0d909330587c2e10d5271646e3b8909e9a1c018..d83df88c76a27bb9c0ee44cce11cd92cddfb1391 100644 (file)
--- a/kthread.c
+++ b/kthread.c
@@ -77,13 +77,13 @@ typedef struct {
 } kto_worker_t;
 
 typedef struct kt_forpool_t {
-       int n_threads, n_done;
+       int n_threads, n_pending;
        long n;
        pthread_t *tid;
        kto_worker_t *w;
        void (*func)(void*,long,int);
        void *data;
-       pthread_mutex_t mu_m, mu_s;
+       pthread_mutex_t mutex;
        pthread_cond_t cv_m, cv_s;
 } kt_forpool_t;
 
@@ -104,23 +104,21 @@ static void *kt_fp_worker(void *data)
        for (;;) {
                long i;
                int action;
-               pthread_mutex_lock(&fp->mu_s);
-               while (w->action == 0) pthread_cond_wait(&fp->cv_s, &fp->mu_s);
-               action = w->action;
-               pthread_mutex_unlock(&fp->mu_s);
-               if (action > 0) {
-                       for (;;) { // process jobs allocated to this worker
-                               i = __sync_fetch_and_add(&w->i, fp->n_threads);
-                               if (i >= fp->n) break;
-                               fp->func(fp->data, i, w - fp->w);
-                       }
-                       while ((i = kt_fp_steal_work(fp)) >= 0) // steal jobs allocated to other workers
-                               fp->func(fp->data, i, w - fp->w);
-               }
-               w->action = 0;
-               if (__sync_add_and_fetch(&fp->n_done, 1) == fp->n_threads)
+               pthread_mutex_lock(&fp->mutex);
+               if (--fp->n_pending == 0)
                        pthread_cond_signal(&fp->cv_m);
+               w->action = 0;
+               while (w->action == 0) pthread_cond_wait(&fp->cv_s, &fp->mutex);
+               action = w->action;
+               pthread_mutex_unlock(&fp->mutex);
                if (action < 0) break;
+               for (;;) { // process jobs allocated to this worker
+                       i = __sync_fetch_and_add(&w->i, fp->n_threads);
+                       if (i >= fp->n) break;
+                       fp->func(fp->data, i, w - fp->w);
+               }
+               while ((i = kt_fp_steal_work(fp)) >= 0) // steal jobs allocated to other workers
+                       fp->func(fp->data, i, w - fp->w);
        }
        pthread_exit(0);
 }
@@ -130,13 +128,17 @@ void *kt_forpool_init(int n_threads)
        kt_forpool_t *fp;
        int i;
        fp = (kt_forpool_t*)calloc(1, sizeof(kt_forpool_t));
-       fp->n_threads = n_threads;
+       fp->n_threads = fp->n_pending = n_threads;
        fp->tid = (pthread_t*)calloc(fp->n_threads, sizeof(pthread_t));
        fp->w = (kto_worker_t*)calloc(fp->n_threads, sizeof(kto_worker_t));
        for (i = 0; i < fp->n_threads; ++i) fp->w[i].t = fp;
-       pthread_mutex_init(&fp->mu_m, 0); pthread_cond_init(&fp->cv_m, 0);
-       pthread_mutex_init(&fp->mu_s, 0); pthread_cond_init(&fp->cv_s, 0);
+       pthread_mutex_init(&fp->mutex, 0);
+       pthread_cond_init(&fp->cv_m, 0);
+       pthread_cond_init(&fp->cv_s, 0);
        for (i = 0; i < fp->n_threads; ++i) pthread_create(&fp->tid[i], 0, kt_fp_worker, &fp->w[i]);
+       pthread_mutex_lock(&fp->mutex);
+       while (fp->n_pending) pthread_cond_wait(&fp->cv_m, &fp->mutex);
+       pthread_mutex_unlock(&fp->mutex);
        return fp;
 }
 
@@ -144,15 +146,12 @@ void kt_forpool_destroy(void *_fp)
 {
        kt_forpool_t *fp = (kt_forpool_t*)_fp;
        int i;
-       fp->n_done = 0;
        for (i = 0; i < fp->n_threads; ++i) fp->w[i].action = -1;
        pthread_cond_broadcast(&fp->cv_s);
-       pthread_mutex_lock(&fp->mu_m);
-       pthread_cond_wait(&fp->cv_m, &fp->mu_m);
-       pthread_mutex_unlock(&fp->mu_m);
        for (i = 0; i < fp->n_threads; ++i) pthread_join(fp->tid[i], 0);
-       pthread_cond_destroy(&fp->cv_s); pthread_mutex_destroy(&fp->mu_s);
-       pthread_cond_destroy(&fp->cv_m); pthread_mutex_destroy(&fp->mu_m);
+       pthread_cond_destroy(&fp->cv_s);
+       pthread_cond_destroy(&fp->cv_m);
+       pthread_mutex_destroy(&fp->mutex);
        free(fp->w); free(fp->tid); free(fp);
 }
 
@@ -160,12 +159,12 @@ void kt_forpool(void *_fp, void (*func)(void*,long,int), void *data, long n)
 {
        kt_forpool_t *fp = (kt_forpool_t*)_fp;
        int i;
-       fp->n = n, fp->func = func, fp->data = data, fp->n_done = 0;
+       fp->n = n, fp->func = func, fp->data = data, fp->n_pending = fp->n_threads;
        for (i = 0; i < fp->n_threads; ++i) fp->w[i].i = i, fp->w[i].action = 1;
+       pthread_mutex_lock(&fp->mutex);
        pthread_cond_broadcast(&fp->cv_s);
-       pthread_mutex_lock(&fp->mu_m);
-       pthread_cond_wait(&fp->cv_m, &fp->mu_m);
-       pthread_mutex_unlock(&fp->mu_m);
+       while (fp->n_pending) pthread_cond_wait(&fp->cv_m, &fp->mutex);
+       pthread_mutex_unlock(&fp->mutex);
 }
 
 /*****************