From c3fdb86c68debebbef15f0443f6eaaa15cf51f35 Mon Sep 17 00:00:00 2001 From: Heng Li Date: Sun, 11 Jan 2015 16:24:54 -0500 Subject: [PATCH] kt_pipeline() example --- test/Makefile | 7 ++-- test/kthread_test2.c | 78 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 2 deletions(-) create mode 100644 test/kthread_test2.c diff --git a/test/Makefile b/test/Makefile index a392c8e..203308d 100644 --- a/test/Makefile +++ b/test/Makefile @@ -4,12 +4,12 @@ CFLAGS=-g -Wall -O2 -I.. CXXFLAGS=$(CFLAGS) PROGS=kbtree_test khash_keith khash_keith2 khash_test klist_test kseq_test kseq_bench \ kseq_bench2 ksort_test ksort_test-stl kvec_test kmin_test kstring_bench kstring_bench2 kstring_test \ - kthread_test + kthread_test kthread_test2 all:$(PROGS) clean: - rm -fr $(PROGS) *.dSYM a.out + rm -fr $(PROGS) *.dSYM a.out *.o kbtree_test:kbtree_test.c ../kbtree.h $(CC) $(CFLAGS) -o $@ kbtree_test.c @@ -58,3 +58,6 @@ kstring_test:kstring_test.c ../kstring.h ../kstring.c kthread_test:kthread_test.c ../kthread.c $(CC) $(CFLAGS) -fopenmp -o $@ kthread_test.c ../kthread.c + +kthread_test2:kthread_test2.c ../kthread.c + $(CC) $(CFLAGS) -o $@ kthread_test2.c ../kthread.c diff --git a/test/kthread_test2.c b/test/kthread_test2.c new file mode 100644 index 0000000..e9750b8 --- /dev/null +++ b/test/kthread_test2.c @@ -0,0 +1,78 @@ +#include +#include +#include +#include + +void kt_for(int n_threads, void (*func)(void*,long,int), void *data, long n); +void kt_pipeline(int n_threads, void *(*func)(void*, int, void*), void *shared_data, int n_steps); + +typedef struct { + FILE *fp; + int max_lines, buf_size, n_threads; + char *buf; +} pipeline_t; + +typedef struct { + int n_lines; + char **lines; +} step_t; + +static void worker_for(void *_data, long i, int tid) +{ + step_t *step = (step_t*)_data; + char *s = step->lines[i]; + int t, l, j; + l = strlen(s) - 1; + assert(s[l] == '\n'); // not supporting long lines + for (j = 0; j < l>>1; ++j) + t = s[j], s[j] = s[l - 1 - j], s[l - 1 - j] = t; +} + +static void *worker_pipeline(void *shared, int step, void *in) +{ + pipeline_t *p = (pipeline_t*)shared; + if (step == 0) { + step_t *s; + s = calloc(1, sizeof(step_t)); + s->lines = calloc(p->max_lines, sizeof(char*)); + while (fgets(p->buf, p->buf_size, p->fp) != 0) { + s->lines[s->n_lines] = strdup(p->buf); + if (++s->n_lines >= p->max_lines) + break; + } + if (s->n_lines) return s; + } else if (step == 1) { + kt_for(p->n_threads, worker_for, in, ((step_t*)in)->n_lines); + return in; + } else if (step == 2) { + step_t *s = (step_t*)in; + while (s->n_lines > 0) { + fputs(s->lines[--s->n_lines], stdout); + free(s->lines[s->n_lines]); + } + free(s->lines); free(s); + } + return 0; +} + +int main(int argc, char *argv[]) +{ + pipeline_t pl; + if (argc == 1) { + fprintf(stderr, "Usage: reverse [n_threads]\n"); + return 1; + } + pl.fp = strcmp(argv[1], "-")? fopen(argv[1], "r") : stdin; + if (pl.fp == 0) { + fprintf(stderr, "ERROR: failed to open the input file.\n"); + return 1; + } + pl.max_lines = 4096; + pl.buf_size = 0x10000; + pl.n_threads = argc > 2? atoi(argv[2]) : 1; + pl.buf = calloc(pl.buf_size, 1); + kt_pipeline(3, worker_pipeline, &pl, 3); + free(pl.buf); + if (pl.fp != stdin) fclose(pl.fp); + return 0; +} -- 2.47.3