]> git.kaiwu.me - klib.git/commitdiff
added kt_pipeline example
authorHeng Li <lh3@me.com>
Sun, 6 Sep 2015 21:48:26 +0000 (17:48 -0400)
committerHeng Li <lh3@me.com>
Sun, 6 Sep 2015 21:48:26 +0000 (17:48 -0400)
index.html

index 606880b70ba985d8e54aeb4220385fe7d2aa8e9b..cc08f6816ce844b1fc3e1a63e45e69ba05406815 100644 (file)
@@ -6328,7 +6328,7 @@ int main()
 }
 ```</pre>
 </div>
-<div created="20141130160556556" creator="lh3" modified="20141201012131686" modifier="lh3" tags="[[Library Documentations]]" title="Kthread: simple threading models">
+<div created="20141130160556556" creator="lh3" modified="20150906214636521" modifier="lh3" tags="[[Library Documentations]]" title="Kthread: simple threading models">
 <pre>!!Synopsis
 
 * Functionality: simple multi-threading models.
@@ -6382,6 +6382,90 @@ int main(int argc, char *argv[])
     free(global.k);
     return 0;
 }
+```
+
+!!!Example 2: reverse lines with kt_pipeline() and kt_for(). This example is only for demonstration purpose. In practice, we don't need this complication.
+```c
+#include &lt;stdio.h&gt;
+#include &lt;stdlib.h&gt;
+#include &lt;string.h&gt;
+#include &lt;assert.h&gt;
+
+void kt_for(int n_threads, void (*func)(void*,long,int), void *data, long n);
+void kt_pipeline(int n_threads, void *(*func)(void*, int, void*), void *shared_data, int n_steps);
+
+typedef struct {
+       FILE *fp;
+       int max_lines, buf_size, n_threads;
+       char *buf;
+} pipeline_t;
+
+typedef struct {
+       int n_lines;
+       char **lines;
+} step_t;
+
+static void worker_for(void *_data, long i, int tid) // kt_for() callback
+{
+       step_t *step = (step_t*)_data;
+       char *s = step-&gt;lines[i];
+       int t, l, j;
+       l = strlen(s) - 1;
+       assert(s[l] == '\n'); // not supporting long lines
+       for (j = 0; j &lt; l&gt;&gt;1; ++j)
+               t = s[j], s[j] = s[l - 1 - j], s[l - 1 - j] = t;
+}
+
+static void *worker_pipeline(void *shared, int step, void *in) // kt_pipeline() callback
+{
+       pipeline_t *p = (pipeline_t*)shared;
+       if (step == 0) { // step 0: read lines into the buffer
+               step_t *s;
+               s = calloc(1, sizeof(step_t));
+               s-&gt;lines = calloc(p-&gt;max_lines, sizeof(char*));
+               while (fgets(p-&gt;buf, p-&gt;buf_size, p-&gt;fp) != 0) {
+                       s-&gt;lines[s-&gt;n_lines] = strdup(p-&gt;buf);
+                       if (++s-&gt;n_lines &gt;= p-&gt;max_lines)
+                               break;
+               }
+               if (s-&gt;n_lines) return s;
+       } else if (step == 1) { // step 1: reverse lines
+               kt_for(p-&gt;n_threads, worker_for, in, ((step_t*)in)-&gt;n_lines);
+               return in;
+       } else if (step == 2) { // step 3: write the buffer to output
+               step_t *s = (step_t*)in;
+               while (s-&gt;n_lines &gt; 0) {
+                       fputs(s-&gt;lines[--s-&gt;n_lines], stdout);
+                       free(s-&gt;lines[s-&gt;n_lines]);
+               }
+               free(s-&gt;lines); free(s);
+       }
+       return 0;
+}
+
+int main(int argc, char *argv[])
+{
+       pipeline_t pl;
+       int pl_threads;
+       if (argc == 1) {
+               fprintf(stderr, &quot;Usage: reverse &lt;in.txt&gt; [pipeline_threads [for_threads]]\n&quot;);
+               return 1;
+       }
+       pl.fp = strcmp(argv[1], &quot;-&quot;)? fopen(argv[1], &quot;r&quot;) : stdin;
+       if (pl.fp == 0) {
+               fprintf(stderr, &quot;ERROR: failed to open the input file.\n&quot;);
+               return 1;
+       }
+       pl_threads = argc &gt; 2? atoi(argv[2]) : 3;
+       pl.max_lines = 4096;
+       pl.buf_size = 0x10000;
+       pl.n_threads = argc &gt; 3? atoi(argv[3]) : 1;
+       pl.buf = calloc(pl.buf_size, 1);
+       kt_pipeline(pl_threads, worker_pipeline, &amp;pl, 3);
+       free(pl.buf);
+       if (pl.fp != stdin) fclose(pl.fp);
+       return 0;
+}
 ```</pre>
 </div>
 <div created="20141130044228188" creator="lh3" modified="20141130044958286" modifier="lh3" tags="TableOfContents" title="Library Documentations">