]> git.kaiwu.me - klib.git/commitdiff
kt_pipeline() example
authorHeng Li <lh3@me.com>
Sun, 11 Jan 2015 21:24:54 +0000 (16:24 -0500)
committerHeng Li <lh3@me.com>
Sun, 11 Jan 2015 21:24:54 +0000 (16:24 -0500)
test/Makefile
test/kthread_test2.c [new file with mode: 0644]

index a392c8ed46dbe4f4ff187b2f2f0ceb0828fb561f..203308d08313c396b2888729ffc1357bf29b360b 100644 (file)
@@ -4,12 +4,12 @@ CFLAGS=-g -Wall -O2 -I..
 CXXFLAGS=$(CFLAGS)
 PROGS=kbtree_test khash_keith khash_keith2 khash_test klist_test kseq_test kseq_bench \
                kseq_bench2 ksort_test ksort_test-stl kvec_test kmin_test kstring_bench kstring_bench2 kstring_test \
-               kthread_test
+               kthread_test kthread_test2
 
 all:$(PROGS)
 
 clean:
-               rm -fr $(PROGS) *.dSYM a.out
+               rm -fr $(PROGS) *.dSYM a.out *.o
 
 kbtree_test:kbtree_test.c ../kbtree.h
                $(CC) $(CFLAGS) -o $@ kbtree_test.c
@@ -58,3 +58,6 @@ kstring_test:kstring_test.c ../kstring.h ../kstring.c
 
 kthread_test:kthread_test.c ../kthread.c
                $(CC) $(CFLAGS) -fopenmp -o $@ kthread_test.c ../kthread.c
+
+kthread_test2:kthread_test2.c ../kthread.c
+               $(CC) $(CFLAGS) -o $@ kthread_test2.c ../kthread.c
diff --git a/test/kthread_test2.c b/test/kthread_test2.c
new file mode 100644 (file)
index 0000000..e9750b8
--- /dev/null
@@ -0,0 +1,78 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+
+void kt_for(int n_threads, void (*func)(void*,long,int), void *data, long n);
+void kt_pipeline(int n_threads, void *(*func)(void*, int, void*), void *shared_data, int n_steps);
+
+typedef struct {
+       FILE *fp;
+       int max_lines, buf_size, n_threads;
+       char *buf;
+} pipeline_t;
+
+typedef struct {
+       int n_lines;
+       char **lines;
+} step_t;
+
+static void worker_for(void *_data, long i, int tid)
+{
+       step_t *step = (step_t*)_data;
+       char *s = step->lines[i];
+       int t, l, j;
+       l = strlen(s) - 1;
+       assert(s[l] == '\n'); // not supporting long lines
+       for (j = 0; j < l>>1; ++j)
+               t = s[j], s[j] = s[l - 1 - j], s[l - 1 - j] = t;
+}
+
+static void *worker_pipeline(void *shared, int step, void *in)
+{
+       pipeline_t *p = (pipeline_t*)shared;
+       if (step == 0) {
+               step_t *s;
+               s = calloc(1, sizeof(step_t));
+               s->lines = calloc(p->max_lines, sizeof(char*));
+               while (fgets(p->buf, p->buf_size, p->fp) != 0) {
+                       s->lines[s->n_lines] = strdup(p->buf);
+                       if (++s->n_lines >= p->max_lines)
+                               break;
+               }
+               if (s->n_lines) return s;
+       } else if (step == 1) {
+               kt_for(p->n_threads, worker_for, in, ((step_t*)in)->n_lines);
+               return in;
+       } else if (step == 2) {
+               step_t *s = (step_t*)in;
+               while (s->n_lines > 0) {
+                       fputs(s->lines[--s->n_lines], stdout);
+                       free(s->lines[s->n_lines]);
+               }
+               free(s->lines); free(s);
+       }
+       return 0;
+}
+
+int main(int argc, char *argv[])
+{
+       pipeline_t pl;
+       if (argc == 1) {
+               fprintf(stderr, "Usage: reverse <in.txt> [n_threads]\n");
+               return 1;
+       }
+       pl.fp = strcmp(argv[1], "-")? fopen(argv[1], "r") : stdin;
+       if (pl.fp == 0) {
+               fprintf(stderr, "ERROR: failed to open the input file.\n");
+               return 1;
+       }
+       pl.max_lines = 4096;
+       pl.buf_size = 0x10000;
+       pl.n_threads = argc > 2? atoi(argv[2]) : 1;
+       pl.buf = calloc(pl.buf_size, 1);
+       kt_pipeline(3, worker_pipeline, &pl, 3);
+       free(pl.buf);
+       if (pl.fp != stdin) fclose(pl.fp);
+       return 0;
+}