diff options
Diffstat (limited to 'src')
30 files changed, 1905 insertions, 8 deletions
diff --git a/src/test/Makefile b/src/test/Makefile index 0fd7eabf08f..9238860febd 100644 --- a/src/test/Makefile +++ b/src/test/Makefile @@ -12,6 +12,14 @@ subdir = src/test top_builddir = ../.. include $(top_builddir)/src/Makefile.global -SUBDIRS = regress isolation +SUBDIRS = regress isolation modules -$(recurse) +# We want to recurse to all subdirs for all standard targets, except that +# installcheck and install should not recurse into the subdirectory "modules". + +recurse_alldirs_targets := $(filter-out installcheck install, $(standard_targets)) +installable_dirs := $(filter-out modules, $(SUBDIRS)) + +$(call recurse,$(recurse_alldirs_targets)) +$(call recurse,installcheck, $(installable_dirs)) +$(call recurse,install, $(installable_dirs)) diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile new file mode 100644 index 00000000000..9d5aa97b4ba --- /dev/null +++ b/src/test/modules/Makefile @@ -0,0 +1,13 @@ +# src/test/modules/Makefile + +subdir = src/test/modules +top_builddir = ../../.. +include $(top_builddir)/src/Makefile.global + +SUBDIRS = \ + worker_spi \ + dummy_seclabel \ + test_shm_mq \ + test_parser + +$(recurse) diff --git a/src/test/modules/dummy_seclabel/Makefile b/src/test/modules/dummy_seclabel/Makefile new file mode 100644 index 00000000000..909ac9ace72 --- /dev/null +++ b/src/test/modules/dummy_seclabel/Makefile @@ -0,0 +1,15 @@ +# src/test/modules/dummy_seclabel/Makefile + +MODULES = dummy_seclabel +PGFILEDESC = "dummy_seclabel - regression testing of the SECURITY LABEL statement" + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/dummy_seclabel +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/dummy_seclabel/README b/src/test/modules/dummy_seclabel/README new file mode 100644 index 00000000000..75320aaeaca --- /dev/null +++ b/src/test/modules/dummy_seclabel/README @@ -0,0 +1,43 @@ +The dummy_seclabel module exists only to support regression +testing of the SECURITY LABEL statement. It is not intended +to be used in production. + +Rationale +========= + +The SECURITY LABEL statement allows the user to assign security +labels to database objects; however, security labels can only be assigned +when specifically allowed by a loadable module, so this module is provided +to allow proper regression testing. + +Security label providers intended to be used in production will typically be +dependent on a platform-specific feature such as +SE-Linux. This module is platform-independent, +and therefore better-suited to regression testing. + +Usage +===== + +Here's a simple example of usage: + +# postgresql.conf +shared_preload_libraries = 'dummy_seclabel' + +postgres=# CREATE TABLE t (a int, b text); +CREATE TABLE +postgres=# SECURITY LABEL ON TABLE t IS 'classified'; +SECURITY LABEL + +The dummy_seclabel module provides only four hardcoded +labels: unclassified, classified, +secret, and top secret. +It does not allow any other strings as security labels. + +These labels are not used to enforce access controls. They are only used +to check whether the SECURITY LABEL statement works as expected, +or not. + +Author +====== + +KaiGai Kohei <kaigai@ak.jp.nec.com> diff --git a/src/test/modules/dummy_seclabel/dummy_seclabel.c b/src/test/modules/dummy_seclabel/dummy_seclabel.c new file mode 100644 index 00000000000..b5753cc9084 --- /dev/null +++ b/src/test/modules/dummy_seclabel/dummy_seclabel.c @@ -0,0 +1,50 @@ +/* + * dummy_seclabel.c + * + * Dummy security label provider. + * + * This module does not provide anything worthwhile from a security + * perspective, but allows regression testing independent of platform-specific + * features like SELinux. + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + */ +#include "postgres.h" + +#include "commands/seclabel.h" +#include "miscadmin.h" +#include "utils/rel.h" + +PG_MODULE_MAGIC; + +/* Entrypoint of the module */ +void _PG_init(void); + +static void +dummy_object_relabel(const ObjectAddress *object, const char *seclabel) +{ + if (seclabel == NULL || + strcmp(seclabel, "unclassified") == 0 || + strcmp(seclabel, "classified") == 0) + return; + + if (strcmp(seclabel, "secret") == 0 || + strcmp(seclabel, "top secret") == 0) + { + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("only superuser can set '%s' label", seclabel))); + return; + } + ereport(ERROR, + (errcode(ERRCODE_INVALID_NAME), + errmsg("'%s' is not a valid security label", seclabel))); +} + +void +_PG_init(void) +{ + register_label_provider("dummy", dummy_object_relabel); +} diff --git a/src/test/modules/test_parser/.gitignore b/src/test/modules/test_parser/.gitignore new file mode 100644 index 00000000000..5dcb3ff9723 --- /dev/null +++ b/src/test/modules/test_parser/.gitignore @@ -0,0 +1,4 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ diff --git a/src/test/modules/test_parser/Makefile b/src/test/modules/test_parser/Makefile new file mode 100644 index 00000000000..0c755aac73b --- /dev/null +++ b/src/test/modules/test_parser/Makefile @@ -0,0 +1,21 @@ +# src/test/modules/test_parser/Makefile + +MODULE_big = test_parser +OBJS = test_parser.o $(WIN32RES) +PGFILEDESC = "test_parser - example of a custom parser for full-text search" + +EXTENSION = test_parser +DATA = test_parser--1.0.sql test_parser--unpackaged--1.0.sql + +REGRESS = test_parser + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_parser +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_parser/README b/src/test/modules/test_parser/README new file mode 100644 index 00000000000..0a11ec85fbc --- /dev/null +++ b/src/test/modules/test_parser/README @@ -0,0 +1,61 @@ +test_parser is an example of a custom parser for full-text +search. It doesn't do anything especially useful, but can serve as +a starting point for developing your own parser. + +test_parser recognizes words separated by white space, +and returns just two token types: + +mydb=# SELECT * FROM ts_token_type('testparser'); + tokid | alias | description +-------+-------+--------------- + 3 | word | Word + 12 | blank | Space symbols +(2 rows) + +These token numbers have been chosen to be compatible with the default +parser's numbering. This allows us to use its headline() +function, thus keeping the example simple. + +Usage +===== + +Installing the test_parser extension creates a text search +parser testparser. It has no user-configurable parameters. + +You can test the parser with, for example, + +mydb=# SELECT * FROM ts_parse('testparser', 'That''s my first own parser'); + tokid | token +-------+-------- + 3 | That's + 12 | + 3 | my + 12 | + 3 | first + 12 | + 3 | own + 12 | + 3 | parser + +Real-world use requires setting up a text search configuration +that uses the parser. For example, + +mydb=# CREATE TEXT SEARCH CONFIGURATION testcfg ( PARSER = testparser ); +CREATE TEXT SEARCH CONFIGURATION + +mydb=# ALTER TEXT SEARCH CONFIGURATION testcfg +mydb-# ADD MAPPING FOR word WITH english_stem; +ALTER TEXT SEARCH CONFIGURATION + +mydb=# SELECT to_tsvector('testcfg', 'That''s my first own parser'); + to_tsvector +------------------------------- + 'that':1 'first':3 'parser':5 +(1 row) + +mydb=# SELECT ts_headline('testcfg', 'Supernovae stars are the brightest phenomena in galaxies', +mydb(# to_tsquery('testcfg', 'star')); + ts_headline +----------------------------------------------------------------- + Supernovae <b>stars</b> are the brightest phenomena in galaxies +(1 row) diff --git a/src/test/modules/test_parser/expected/test_parser.out b/src/test/modules/test_parser/expected/test_parser.out new file mode 100644 index 00000000000..8a49bc01e32 --- /dev/null +++ b/src/test/modules/test_parser/expected/test_parser.out @@ -0,0 +1,44 @@ +CREATE EXTENSION test_parser; +-- make test configuration using parser +CREATE TEXT SEARCH CONFIGURATION testcfg (PARSER = testparser); +ALTER TEXT SEARCH CONFIGURATION testcfg ADD MAPPING FOR word WITH simple; +-- ts_parse +SELECT * FROM ts_parse('testparser', 'That''s simple parser can''t parse urls like http://some.url/here/'); + tokid | token +-------+----------------------- + 3 | That's + 12 | + 3 | simple + 12 | + 3 | parser + 12 | + 3 | can't + 12 | + 3 | parse + 12 | + 3 | urls + 12 | + 3 | like + 12 | + 3 | http://some.url/here/ +(15 rows) + +SELECT to_tsvector('testcfg','That''s my first own parser'); + to_tsvector +------------------------------------------------- + 'first':3 'my':2 'own':4 'parser':5 'that''s':1 +(1 row) + +SELECT to_tsquery('testcfg', 'star'); + to_tsquery +------------ + 'star' +(1 row) + +SELECT ts_headline('testcfg','Supernovae stars are the brightest phenomena in galaxies', + to_tsquery('testcfg', 'stars')); + ts_headline +----------------------------------------------------------------- + Supernovae <b>stars</b> are the brightest phenomena in galaxies +(1 row) + diff --git a/src/test/modules/test_parser/sql/test_parser.sql b/src/test/modules/test_parser/sql/test_parser.sql new file mode 100644 index 00000000000..1f21504602b --- /dev/null +++ b/src/test/modules/test_parser/sql/test_parser.sql @@ -0,0 +1,18 @@ +CREATE EXTENSION test_parser; + +-- make test configuration using parser + +CREATE TEXT SEARCH CONFIGURATION testcfg (PARSER = testparser); + +ALTER TEXT SEARCH CONFIGURATION testcfg ADD MAPPING FOR word WITH simple; + +-- ts_parse + +SELECT * FROM ts_parse('testparser', 'That''s simple parser can''t parse urls like http://some.url/here/'); + +SELECT to_tsvector('testcfg','That''s my first own parser'); + +SELECT to_tsquery('testcfg', 'star'); + +SELECT ts_headline('testcfg','Supernovae stars are the brightest phenomena in galaxies', + to_tsquery('testcfg', 'stars')); diff --git a/src/test/modules/test_parser/test_parser--1.0.sql b/src/test/modules/test_parser/test_parser--1.0.sql new file mode 100644 index 00000000000..56bb2442c3d --- /dev/null +++ b/src/test/modules/test_parser/test_parser--1.0.sql @@ -0,0 +1,32 @@ +/* src/test/modules/test_parser/test_parser--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION test_parser" to load this file. \quit + +CREATE FUNCTION testprs_start(internal, int4) +RETURNS internal +AS 'MODULE_PATHNAME' +LANGUAGE C STRICT; + +CREATE FUNCTION testprs_getlexeme(internal, internal, internal) +RETURNS internal +AS 'MODULE_PATHNAME' +LANGUAGE C STRICT; + +CREATE FUNCTION testprs_end(internal) +RETURNS void +AS 'MODULE_PATHNAME' +LANGUAGE C STRICT; + +CREATE FUNCTION testprs_lextype(internal) +RETURNS internal +AS 'MODULE_PATHNAME' +LANGUAGE C STRICT; + +CREATE TEXT SEARCH PARSER testparser ( + START = testprs_start, + GETTOKEN = testprs_getlexeme, + END = testprs_end, + HEADLINE = pg_catalog.prsd_headline, + LEXTYPES = testprs_lextype +); diff --git a/src/test/modules/test_parser/test_parser--unpackaged--1.0.sql b/src/test/modules/test_parser/test_parser--unpackaged--1.0.sql new file mode 100644 index 00000000000..9ebc4b3fbee --- /dev/null +++ b/src/test/modules/test_parser/test_parser--unpackaged--1.0.sql @@ -0,0 +1,10 @@ +/* src/test/modules/test_parser/test_parser--unpackaged--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION test_parser FROM unpackaged" to load this file. \quit + +ALTER EXTENSION test_parser ADD function testprs_start(internal,integer); +ALTER EXTENSION test_parser ADD function testprs_getlexeme(internal,internal,internal); +ALTER EXTENSION test_parser ADD function testprs_end(internal); +ALTER EXTENSION test_parser ADD function testprs_lextype(internal); +ALTER EXTENSION test_parser ADD text search parser testparser; diff --git a/src/test/modules/test_parser/test_parser.c b/src/test/modules/test_parser/test_parser.c new file mode 100644 index 00000000000..68c0a93e3cd --- /dev/null +++ b/src/test/modules/test_parser/test_parser.c @@ -0,0 +1,128 @@ +/*------------------------------------------------------------------------- + * + * test_parser.c + * Simple example of a text search parser + * + * Copyright (c) 2007-2014, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/test/modules/test_parser/test_parser.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "fmgr.h" + +PG_MODULE_MAGIC; + +/* + * types + */ + +/* self-defined type */ +typedef struct +{ + char *buffer; /* text to parse */ + int len; /* length of the text in buffer */ + int pos; /* position of the parser */ +} ParserState; + +/* copy-paste from wparser.h of tsearch2 */ +typedef struct +{ + int lexid; + char *alias; + char *descr; +} LexDescr; + +/* + * functions + */ +PG_FUNCTION_INFO_V1(testprs_start); +PG_FUNCTION_INFO_V1(testprs_getlexeme); +PG_FUNCTION_INFO_V1(testprs_end); +PG_FUNCTION_INFO_V1(testprs_lextype); + +Datum +testprs_start(PG_FUNCTION_ARGS) +{ + ParserState *pst = (ParserState *) palloc0(sizeof(ParserState)); + + pst->buffer = (char *) PG_GETARG_POINTER(0); + pst->len = PG_GETARG_INT32(1); + pst->pos = 0; + + PG_RETURN_POINTER(pst); +} + +Datum +testprs_getlexeme(PG_FUNCTION_ARGS) +{ + ParserState *pst = (ParserState *) PG_GETARG_POINTER(0); + char **t = (char **) PG_GETARG_POINTER(1); + int *tlen = (int *) PG_GETARG_POINTER(2); + int startpos = pst->pos; + int type; + + *t = pst->buffer + pst->pos; + + if (pst->pos < pst->len && + (pst->buffer)[pst->pos] == ' ') + { + /* blank type */ + type = 12; + /* go to the next non-space character */ + while (pst->pos < pst->len && + (pst->buffer)[pst->pos] == ' ') + (pst->pos)++; + } + else + { + /* word type */ + type = 3; + /* go to the next space character */ + while (pst->pos < pst->len && + (pst->buffer)[pst->pos] != ' ') + (pst->pos)++; + } + + *tlen = pst->pos - startpos; + + /* we are finished if (*tlen == 0) */ + if (*tlen == 0) + type = 0; + + PG_RETURN_INT32(type); +} + +Datum +testprs_end(PG_FUNCTION_ARGS) +{ + ParserState *pst = (ParserState *) PG_GETARG_POINTER(0); + + pfree(pst); + PG_RETURN_VOID(); +} + +Datum +testprs_lextype(PG_FUNCTION_ARGS) +{ + /* + * Remarks: - we have to return the blanks for headline reason - we use + * the same lexids like Teodor in the default word parser; in this way we + * can reuse the headline function of the default word parser. + */ + LexDescr *descr = (LexDescr *) palloc(sizeof(LexDescr) * (2 + 1)); + + /* there are only two types in this parser */ + descr[0].lexid = 3; + descr[0].alias = pstrdup("word"); + descr[0].descr = pstrdup("Word"); + descr[1].lexid = 12; + descr[1].alias = pstrdup("blank"); + descr[1].descr = pstrdup("Space symbols"); + descr[2].lexid = 0; + + PG_RETURN_POINTER(descr); +} diff --git a/src/test/modules/test_parser/test_parser.control b/src/test/modules/test_parser/test_parser.control new file mode 100644 index 00000000000..36b26b2087c --- /dev/null +++ b/src/test/modules/test_parser/test_parser.control @@ -0,0 +1,5 @@ +# test_parser extension +comment = 'example of a custom parser for full-text search' +default_version = '1.0' +module_pathname = '$libdir/test_parser' +relocatable = true diff --git a/src/test/modules/test_shm_mq/.gitignore b/src/test/modules/test_shm_mq/.gitignore new file mode 100644 index 00000000000..5dcb3ff9723 --- /dev/null +++ b/src/test/modules/test_shm_mq/.gitignore @@ -0,0 +1,4 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ diff --git a/src/test/modules/test_shm_mq/Makefile b/src/test/modules/test_shm_mq/Makefile new file mode 100644 index 00000000000..11c4e357202 --- /dev/null +++ b/src/test/modules/test_shm_mq/Makefile @@ -0,0 +1,21 @@ +# src/test/modules/test_shm_mq/Makefile + +MODULE_big = test_shm_mq +OBJS = test.o setup.o worker.o $(WIN32RES) +PGFILEDESC = "test_shm_mq - example use of shared memory message queue" + +EXTENSION = test_shm_mq +DATA = test_shm_mq--1.0.sql + +REGRESS = test_shm_mq + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_shm_mq +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_shm_mq/README b/src/test/modules/test_shm_mq/README new file mode 100644 index 00000000000..641407bee03 --- /dev/null +++ b/src/test/modules/test_shm_mq/README @@ -0,0 +1,49 @@ +test_shm_mq is an example of how to use dynamic shared memory +and the shared memory message queue facilities to coordinate a user backend +with the efforts of one or more background workers. It is not intended to +do anything useful on its own; rather, it is a demonstration of how these +facilities can be used, and a unit test of those facilities. + +The function is this extension send the same message repeatedly through +a loop of processes. The message payload, the size of the message queue +through which it is sent, and the number of processes in the loop are +configurable. At the end, the message may be verified to ensure that it +has not been corrupted in transmission. + +Functions +========= + + +test_shm_mq(queue_size int8, message text, + repeat_count int4 default 1, num_workers int4 default 1) + RETURNS void + +This function sends and receives messages synchronously. The user +backend sends the provided message to the first background worker using +a message queue of the given size. The first background worker sends +the message to the second background worker, if the number of workers +is greater than one, and so forth. Eventually, the last background +worker sends the message back to the user backend. If the repeat count +is greater than one, the user backend then sends the message back to +the first worker. Once the message has been sent and received by all +the coordinating processes a number of times equal to the repeat count, +the user backend verifies that the message finally received matches the +one originally sent and throws an error if not. + + +test_shm_mq_pipelined(queue_size int8, message text, + repeat_count int4 default 1, num_workers int4 default 1, + verify bool default true) + RETURNS void + +This function sends the same message multiple times, as specified by the +repeat count, to the first background worker using a queue of the given +size. These messages are then forwarded to each background worker in +turn, in each case using a queue of the given size. Finally, the last +background worker sends the messages back to the user backend. The user +backend uses non-blocking sends and receives, so that it may begin receiving +copies of the message before it has finished sending all copies of the +message. The 'verify' argument controls whether or not the +received copies are checked against the message that was sent. (This +takes nontrivial time so it may be useful to disable it for benchmarking +purposes.) diff --git a/src/test/modules/test_shm_mq/expected/test_shm_mq.out b/src/test/modules/test_shm_mq/expected/test_shm_mq.out new file mode 100644 index 00000000000..c4858b0c205 --- /dev/null +++ b/src/test/modules/test_shm_mq/expected/test_shm_mq.out @@ -0,0 +1,36 @@ +CREATE EXTENSION test_shm_mq; +-- +-- These tests don't produce any interesting output. We're checking that +-- the operations complete without crashing or hanging and that none of their +-- internal sanity tests fail. +-- +SELECT test_shm_mq(1024, '', 2000, 1); + test_shm_mq +------------- + +(1 row) + +SELECT test_shm_mq(1024, 'a', 2001, 1); + test_shm_mq +------------- + +(1 row) + +SELECT test_shm_mq(32768, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,(100+900*random())::int)), 10000, 1); + test_shm_mq +------------- + +(1 row) + +SELECT test_shm_mq(100, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,(100+200*random())::int)), 10000, 1); + test_shm_mq +------------- + +(1 row) + +SELECT test_shm_mq_pipelined(16384, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,270000)), 200, 3); + test_shm_mq_pipelined +----------------------- + +(1 row) + diff --git a/src/test/modules/test_shm_mq/setup.c b/src/test/modules/test_shm_mq/setup.c new file mode 100644 index 00000000000..b049cff2d05 --- /dev/null +++ b/src/test/modules/test_shm_mq/setup.c @@ -0,0 +1,328 @@ +/*-------------------------------------------------------------------------- + * + * setup.c + * Code to set up a dynamic shared memory segments and a specified + * number of background workers for shared memory message queue + * testing. + * + * Copyright (C) 2013-2014, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/test/modules/test_shm_mq/setup.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "miscadmin.h" +#include "postmaster/bgworker.h" +#include "storage/procsignal.h" +#include "storage/shm_toc.h" +#include "utils/memutils.h" + +#include "test_shm_mq.h" + +typedef struct +{ + int nworkers; + BackgroundWorkerHandle *handle[FLEXIBLE_ARRAY_MEMBER]; +} worker_state; + +static void setup_dynamic_shared_memory(int64 queue_size, int nworkers, + dsm_segment **segp, + test_shm_mq_header **hdrp, + shm_mq **outp, shm_mq **inp); +static worker_state *setup_background_workers(int nworkers, + dsm_segment *seg); +static void cleanup_background_workers(dsm_segment *seg, Datum arg); +static void wait_for_workers_to_become_ready(worker_state *wstate, + volatile test_shm_mq_header *hdr); +static bool check_worker_status(worker_state *wstate); + +/* + * Set up a dynamic shared memory segment and zero or more background workers + * for a test run. + */ +void +test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, + shm_mq_handle **output, shm_mq_handle **input) +{ + dsm_segment *seg; + test_shm_mq_header *hdr; + shm_mq *outq = NULL; /* placate compiler */ + shm_mq *inq = NULL; /* placate compiler */ + worker_state *wstate; + + /* Set up a dynamic shared memory segment. */ + setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq); + *segp = seg; + + /* Register background workers. */ + wstate = setup_background_workers(nworkers, seg); + + /* Attach the queues. */ + *output = shm_mq_attach(outq, seg, wstate->handle[0]); + *input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]); + + /* Wait for workers to become ready. */ + wait_for_workers_to_become_ready(wstate, hdr); + + /* + * Once we reach this point, all workers are ready. We no longer need to + * kill them if we die; they'll die on their own as the message queues + * shut down. + */ + cancel_on_dsm_detach(seg, cleanup_background_workers, + PointerGetDatum(wstate)); + pfree(wstate); +} + +/* + * Set up a dynamic shared memory segment. + * + * We set up a small control region that contains only a test_shm_mq_header, + * plus one region per message queue. There are as many message queues as + * the number of workers, plus one. + */ +static void +setup_dynamic_shared_memory(int64 queue_size, int nworkers, + dsm_segment **segp, test_shm_mq_header **hdrp, + shm_mq **outp, shm_mq **inp) +{ + shm_toc_estimator e; + int i; + Size segsize; + dsm_segment *seg; + shm_toc *toc; + test_shm_mq_header *hdr; + + /* Ensure a valid queue size. */ + if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("queue size must be at least %zu bytes", + shm_mq_minimum_size))); + if (queue_size != ((Size) queue_size)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("queue size overflows size_t"))); + + /* + * Estimate how much shared memory we need. + * + * Because the TOC machinery may choose to insert padding of oddly-sized + * requests, we must estimate each chunk separately. + * + * We need one key to register the location of the header, and we need + * nworkers + 1 keys to track the locations of the message queues. + */ + shm_toc_initialize_estimator(&e); + shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header)); + for (i = 0; i <= nworkers; ++i) + shm_toc_estimate_chunk(&e, (Size) queue_size); + shm_toc_estimate_keys(&e, 2 + nworkers); + segsize = shm_toc_estimate(&e); + + /* Create the shared memory segment and establish a table of contents. */ + seg = dsm_create(shm_toc_estimate(&e)); + toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg), + segsize); + + /* Set up the header region. */ + hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header)); + SpinLockInit(&hdr->mutex); + hdr->workers_total = nworkers; + hdr->workers_attached = 0; + hdr->workers_ready = 0; + shm_toc_insert(toc, 0, hdr); + + /* Set up one message queue per worker, plus one. */ + for (i = 0; i <= nworkers; ++i) + { + shm_mq *mq; + + mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size), + (Size) queue_size); + shm_toc_insert(toc, i + 1, mq); + + if (i == 0) + { + /* We send messages to the first queue. */ + shm_mq_set_sender(mq, MyProc); + *outp = mq; + } + if (i == nworkers) + { + /* We receive messages from the last queue. */ + shm_mq_set_receiver(mq, MyProc); + *inp = mq; + } + } + + /* Return results to caller. */ + *segp = seg; + *hdrp = hdr; +} + +/* + * Register background workers. + */ +static worker_state * +setup_background_workers(int nworkers, dsm_segment *seg) +{ + MemoryContext oldcontext; + BackgroundWorker worker; + worker_state *wstate; + int i; + + /* + * We need the worker_state object and the background worker handles to + * which it points to be allocated in CurTransactionContext rather than + * ExprContext; otherwise, they'll be destroyed before the on_dsm_detach + * hooks run. + */ + oldcontext = MemoryContextSwitchTo(CurTransactionContext); + + /* Create worker state object. */ + wstate = MemoryContextAlloc(TopTransactionContext, + offsetof(worker_state, handle) + + sizeof(BackgroundWorkerHandle *) * nworkers); + wstate->nworkers = 0; + + /* + * Arrange to kill all the workers if we abort before all workers are + * finished hooking themselves up to the dynamic shared memory segment. + * + * If we die after all the workers have finished hooking themselves up to + * the dynamic shared memory segment, we'll mark the two queues to which + * we're directly connected as detached, and the worker(s) connected to + * those queues will exit, marking any other queues to which they are + * connected as detached. This will cause any as-yet-unaware workers + * connected to those queues to exit in their turn, and so on, until + * everybody exits. + * + * But suppose the workers which are supposed to connect to the queues to + * which we're directly attached exit due to some error before they + * actually attach the queues. The remaining workers will have no way of + * knowing this. From their perspective, they're still waiting for those + * workers to start, when in fact they've already died. + */ + on_dsm_detach(seg, cleanup_background_workers, + PointerGetDatum(wstate)); + + /* Configure a worker. */ + worker.bgw_flags = BGWORKER_SHMEM_ACCESS; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + worker.bgw_restart_time = BGW_NEVER_RESTART; + worker.bgw_main = NULL; /* new worker might not have library loaded */ + sprintf(worker.bgw_library_name, "test_shm_mq"); + sprintf(worker.bgw_function_name, "test_shm_mq_main"); + snprintf(worker.bgw_name, BGW_MAXLEN, "test_shm_mq"); + worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg)); + /* set bgw_notify_pid, so we can detect if the worker stops */ + worker.bgw_notify_pid = MyProcPid; + + /* Register the workers. */ + for (i = 0; i < nworkers; ++i) + { + if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i])) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not register background process"), + errhint("You may need to increase max_worker_processes."))); + ++wstate->nworkers; + } + + /* All done. */ + MemoryContextSwitchTo(oldcontext); + return wstate; +} + +static void +cleanup_background_workers(dsm_segment *seg, Datum arg) +{ + worker_state *wstate = (worker_state *) DatumGetPointer(arg); + + while (wstate->nworkers > 0) + { + --wstate->nworkers; + TerminateBackgroundWorker(wstate->handle[wstate->nworkers]); + } +} + +static void +wait_for_workers_to_become_ready(worker_state *wstate, + volatile test_shm_mq_header *hdr) +{ + bool save_set_latch_on_sigusr1; + bool result = false; + + save_set_latch_on_sigusr1 = set_latch_on_sigusr1; + set_latch_on_sigusr1 = true; + + PG_TRY(); + { + for (;;) + { + int workers_ready; + + /* If all the workers are ready, we have succeeded. */ + SpinLockAcquire(&hdr->mutex); + workers_ready = hdr->workers_ready; + SpinLockRelease(&hdr->mutex); + if (workers_ready >= wstate->nworkers) + { + result = true; + break; + } + + /* If any workers (or the postmaster) have died, we have failed. */ + if (!check_worker_status(wstate)) + { + result = false; + break; + } + + /* Wait to be signalled. */ + WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0); + + /* An interrupt may have occurred while we were waiting. */ + CHECK_FOR_INTERRUPTS(); + + /* Reset the latch so we don't spin. */ + ResetLatch(&MyProc->procLatch); + } + } + PG_CATCH(); + { + set_latch_on_sigusr1 = save_set_latch_on_sigusr1; + PG_RE_THROW(); + } + PG_END_TRY(); + + if (!result) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("one or more background workers failed to start"))); +} + +static bool +check_worker_status(worker_state *wstate) +{ + int n; + + /* If any workers (or the postmaster) have died, we have failed. */ + for (n = 0; n < wstate->nworkers; ++n) + { + BgwHandleStatus status; + pid_t pid; + + status = GetBackgroundWorkerPid(wstate->handle[n], &pid); + if (status == BGWH_STOPPED || status == BGWH_POSTMASTER_DIED) + return false; + } + + /* Otherwise, things still look OK. */ + return true; +} diff --git a/src/test/modules/test_shm_mq/sql/test_shm_mq.sql b/src/test/modules/test_shm_mq/sql/test_shm_mq.sql new file mode 100644 index 00000000000..9de19d304a2 --- /dev/null +++ b/src/test/modules/test_shm_mq/sql/test_shm_mq.sql @@ -0,0 +1,12 @@ +CREATE EXTENSION test_shm_mq; + +-- +-- These tests don't produce any interesting output. We're checking that +-- the operations complete without crashing or hanging and that none of their +-- internal sanity tests fail. +-- +SELECT test_shm_mq(1024, '', 2000, 1); +SELECT test_shm_mq(1024, 'a', 2001, 1); +SELECT test_shm_mq(32768, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,(100+900*random())::int)), 10000, 1); +SELECT test_shm_mq(100, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,(100+200*random())::int)), 10000, 1); +SELECT test_shm_mq_pipelined(16384, (select string_agg(chr(32+(random()*95)::int), '') from generate_series(1,270000)), 200, 3); diff --git a/src/test/modules/test_shm_mq/test.c b/src/test/modules/test_shm_mq/test.c new file mode 100644 index 00000000000..6428fcf51c4 --- /dev/null +++ b/src/test/modules/test_shm_mq/test.c @@ -0,0 +1,264 @@ +/*-------------------------------------------------------------------------- + * + * test.c + * Test harness code for shared memory message queues. + * + * Copyright (C) 2013-2014, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/test/modules/test_shm_mq/test.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "fmgr.h" +#include "miscadmin.h" + +#include "test_shm_mq.h" + +PG_MODULE_MAGIC; + +PG_FUNCTION_INFO_V1(test_shm_mq); +PG_FUNCTION_INFO_V1(test_shm_mq_pipelined); + +void _PG_init(void); + +static void verify_message(Size origlen, char *origdata, Size newlen, + char *newdata); + +/* + * Simple test of the shared memory message queue infrastructure. + * + * We set up a ring of message queues passing through 1 or more background + * processes and eventually looping back to ourselves. We then send a message + * through the ring a number of times indicated by the loop count. At the end, + * we check whether the final message matches the one we started with. + */ +Datum +test_shm_mq(PG_FUNCTION_ARGS) +{ + int64 queue_size = PG_GETARG_INT64(0); + text *message = PG_GETARG_TEXT_PP(1); + char *message_contents = VARDATA_ANY(message); + int message_size = VARSIZE_ANY_EXHDR(message); + int32 loop_count = PG_GETARG_INT32(2); + int32 nworkers = PG_GETARG_INT32(3); + dsm_segment *seg; + shm_mq_handle *outqh; + shm_mq_handle *inqh; + shm_mq_result res; + Size len; + void *data; + + /* A negative loopcount is nonsensical. */ + if (loop_count < 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("repeat count size must be a non-negative integer"))); + + /* + * Since this test sends data using the blocking interfaces, it cannot + * send data to itself. Therefore, a minimum of 1 worker is required. Of + * course, a negative worker count is nonsensical. + */ + if (nworkers < 1) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("number of workers must be a positive integer"))); + + /* Set up dynamic shared memory segment and background workers. */ + test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh); + + /* Send the initial message. */ + res = shm_mq_send(outqh, message_size, message_contents, false); + if (res != SHM_MQ_SUCCESS) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not send message"))); + + /* + * Receive a message and send it back out again. Do this a number of + * times equal to the loop count. + */ + for (;;) + { + /* Receive a message. */ + res = shm_mq_receive(inqh, &len, &data, false); + if (res != SHM_MQ_SUCCESS) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not receive message"))); + + /* If this is supposed to be the last iteration, stop here. */ + if (--loop_count <= 0) + break; + + /* Send it back out. */ + res = shm_mq_send(outqh, len, data, false); + if (res != SHM_MQ_SUCCESS) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not send message"))); + } + + /* + * Finally, check that we got back the same message from the last + * iteration that we originally sent. + */ + verify_message(message_size, message_contents, len, data); + + /* Clean up. */ + dsm_detach(seg); + + PG_RETURN_VOID(); +} + +/* + * Pipelined test of the shared memory message queue infrastructure. + * + * As in the basic test, we set up a ring of message queues passing through + * 1 or more background processes and eventually looping back to ourselves. + * Then, we send N copies of the user-specified message through the ring and + * receive them all back. Since this might fill up all message queues in the + * ring and then stall, we must be prepared to begin receiving the messages + * back before we've finished sending them. + */ +Datum +test_shm_mq_pipelined(PG_FUNCTION_ARGS) +{ + int64 queue_size = PG_GETARG_INT64(0); + text *message = PG_GETARG_TEXT_PP(1); + char *message_contents = VARDATA_ANY(message); + int message_size = VARSIZE_ANY_EXHDR(message); + int32 loop_count = PG_GETARG_INT32(2); + int32 nworkers = PG_GETARG_INT32(3); + bool verify = PG_GETARG_BOOL(4); + int32 send_count = 0; + int32 receive_count = 0; + dsm_segment *seg; + shm_mq_handle *outqh; + shm_mq_handle *inqh; + shm_mq_result res; + Size len; + void *data; + + /* A negative loopcount is nonsensical. */ + if (loop_count < 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("repeat count size must be a non-negative integer"))); + + /* + * Using the nonblocking interfaces, we can even send data to ourselves, + * so the minimum number of workers for this test is zero. + */ + if (nworkers < 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("number of workers must be a non-negative integer"))); + + /* Set up dynamic shared memory segment and background workers. */ + test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh); + + /* Main loop. */ + for (;;) + { + bool wait = true; + + /* + * If we haven't yet sent the message the requisite number of times, + * try again to send it now. Note that when shm_mq_send() returns + * SHM_MQ_WOULD_BLOCK, the next call to that function must pass the + * same message size and contents; that's not an issue here because + * we're sending the same message every time. + */ + if (send_count < loop_count) + { + res = shm_mq_send(outqh, message_size, message_contents, true); + if (res == SHM_MQ_SUCCESS) + { + ++send_count; + wait = false; + } + else if (res == SHM_MQ_DETACHED) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not send message"))); + } + + /* + * If we haven't yet received the message the requisite number of + * times, try to receive it again now. + */ + if (receive_count < loop_count) + { + res = shm_mq_receive(inqh, &len, &data, true); + if (res == SHM_MQ_SUCCESS) + { + ++receive_count; + /* Verifying every time is slow, so it's optional. */ + if (verify) + verify_message(message_size, message_contents, len, data); + wait = false; + } + else if (res == SHM_MQ_DETACHED) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not receive message"))); + } + else + { + /* + * Otherwise, we've received the message enough times. This + * shouldn't happen unless we've also sent it enough times. + */ + if (send_count != receive_count) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("message sent %d times, but received %d times", + send_count, receive_count))); + break; + } + + if (wait) + { + /* + * If we made no progress, wait for one of the other processes to + * which we are connected to set our latch, indicating that they + * have read or written data and therefore there may now be work + * for us to do. + */ + WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0); + CHECK_FOR_INTERRUPTS(); + ResetLatch(&MyProc->procLatch); + } + } + + /* Clean up. */ + dsm_detach(seg); + + PG_RETURN_VOID(); +} + +/* + * Verify that two messages are the same. + */ +static void +verify_message(Size origlen, char *origdata, Size newlen, char *newdata) +{ + Size i; + + if (origlen != newlen) + ereport(ERROR, + (errmsg("message corrupted"), + errdetail("The original message was %zu bytes but the final message is %zu bytes.", + origlen, newlen))); + + for (i = 0; i < origlen; ++i) + if (origdata[i] != newdata[i]) + ereport(ERROR, + (errmsg("message corrupted"), + errdetail("The new and original messages differ at byte %zu of %zu.", i, origlen))); +} diff --git a/src/test/modules/test_shm_mq/test_shm_mq--1.0.sql b/src/test/modules/test_shm_mq/test_shm_mq--1.0.sql new file mode 100644 index 00000000000..56db05d93df --- /dev/null +++ b/src/test/modules/test_shm_mq/test_shm_mq--1.0.sql @@ -0,0 +1,19 @@ +/* src/test/modules/test_shm_mq/test_shm_mq--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION test_shm_mq" to load this file. \quit + +CREATE FUNCTION test_shm_mq(queue_size pg_catalog.int8, + message pg_catalog.text, + repeat_count pg_catalog.int4 default 1, + num_workers pg_catalog.int4 default 1) + RETURNS pg_catalog.void STRICT + AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION test_shm_mq_pipelined(queue_size pg_catalog.int8, + message pg_catalog.text, + repeat_count pg_catalog.int4 default 1, + num_workers pg_catalog.int4 default 1, + verify pg_catalog.bool default true) + RETURNS pg_catalog.void STRICT + AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/src/test/modules/test_shm_mq/test_shm_mq.control b/src/test/modules/test_shm_mq/test_shm_mq.control new file mode 100644 index 00000000000..d9a74c7a323 --- /dev/null +++ b/src/test/modules/test_shm_mq/test_shm_mq.control @@ -0,0 +1,4 @@ +comment = 'Test code for shared memory message queues' +default_version = '1.0' +module_pathname = '$libdir/test_shm_mq' +relocatable = true diff --git a/src/test/modules/test_shm_mq/test_shm_mq.h b/src/test/modules/test_shm_mq/test_shm_mq.h new file mode 100644 index 00000000000..144345b611b --- /dev/null +++ b/src/test/modules/test_shm_mq/test_shm_mq.h @@ -0,0 +1,45 @@ +/*-------------------------------------------------------------------------- + * + * test_shm_mq.h + * Definitions for shared memory message queues + * + * Copyright (C) 2013, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/test/modules/test_shm_mq/test_shm_mq.h + * + * ------------------------------------------------------------------------- + */ + +#ifndef TEST_SHM_MQ_H +#define TEST_SHM_MQ_H + +#include "storage/dsm.h" +#include "storage/shm_mq.h" +#include "storage/spin.h" + +/* Identifier for shared memory segments used by this extension. */ +#define PG_TEST_SHM_MQ_MAGIC 0x79fb2447 + +/* + * This structure is stored in the dynamic shared memory segment. We use + * it to determine whether all workers started up OK and successfully + * attached to their respective shared message queues. + */ +typedef struct +{ + slock_t mutex; + int workers_total; + int workers_attached; + int workers_ready; +} test_shm_mq_header; + +/* Set up dynamic shared memory and background workers for test run. */ +extern void test_shm_mq_setup(int64 queue_size, int32 nworkers, + dsm_segment **seg, shm_mq_handle **output, + shm_mq_handle **input); + +/* Main entrypoint for a worker. */ +extern void test_shm_mq_main(Datum) __attribute__((noreturn)); + +#endif diff --git a/src/test/modules/test_shm_mq/worker.c b/src/test/modules/test_shm_mq/worker.c new file mode 100644 index 00000000000..dec058b55ea --- /dev/null +++ b/src/test/modules/test_shm_mq/worker.c @@ -0,0 +1,224 @@ +/*-------------------------------------------------------------------------- + * + * worker.c + * Code for sample worker making use of shared memory message queues. + * Our test worker simply reads messages from one message queue and + * writes them back out to another message queue. In a real + * application, you'd presumably want the worker to do some more + * complex calculation rather than simply returning the input, + * but it should be possible to use much of the control logic just + * as presented here. + * + * Copyright (C) 2013-2014, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/test/modules/test_shm_mq/worker.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "miscadmin.h" +#include "storage/ipc.h" +#include "storage/procarray.h" +#include "storage/shm_mq.h" +#include "storage/shm_toc.h" +#include "utils/resowner.h" + +#include "test_shm_mq.h" + +static void handle_sigterm(SIGNAL_ARGS); +static void attach_to_queues(dsm_segment *seg, shm_toc *toc, + int myworkernumber, shm_mq_handle **inqhp, + shm_mq_handle **outqhp); +static void copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh); + +/* + * Background worker entrypoint. + * + * This is intended to demonstrate how a background worker can be used to + * facilitate a parallel computation. Most of the logic here is fairly + * boilerplate stuff, designed to attach to the shared memory segment, + * notify the user backend that we're alive, and so on. The + * application-specific bits of logic that you'd replace for your own worker + * are attach_to_queues() and copy_messages(). + */ +void +test_shm_mq_main(Datum main_arg) +{ + dsm_segment *seg; + shm_toc *toc; + shm_mq_handle *inqh; + shm_mq_handle *outqh; + volatile test_shm_mq_header *hdr; + int myworkernumber; + PGPROC *registrant; + + /* + * Establish signal handlers. + * + * We want CHECK_FOR_INTERRUPTS() to kill off this worker process just as + * it would a normal user backend. To make that happen, we establish a + * signal handler that is a stripped-down version of die(). We don't have + * any equivalent of the backend's command-read loop, where interrupts can + * be processed immediately, so make sure ImmediateInterruptOK is turned + * off. + */ + pqsignal(SIGTERM, handle_sigterm); + ImmediateInterruptOK = false; + BackgroundWorkerUnblockSignals(); + + /* + * Connect to the dynamic shared memory segment. + * + * The backend that registered this worker passed us the ID of a shared + * memory segment to which we must attach for further instructions. In + * order to attach to dynamic shared memory, we need a resource owner. + * Once we've mapped the segment in our address space, attach to the table + * of contents so we can locate the various data structures we'll need to + * find within the segment. + */ + CurrentResourceOwner = ResourceOwnerCreate(NULL, "test_shm_mq worker"); + seg = dsm_attach(DatumGetInt32(main_arg)); + if (seg == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("unable to map dynamic shared memory segment"))); + toc = shm_toc_attach(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg)); + if (toc == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("bad magic number in dynamic shared memory segment"))); + + /* + * Acquire a worker number. + * + * By convention, the process registering this background worker should + * have stored the control structure at key 0. We look up that key to + * find it. Our worker number gives our identity: there may be just one + * worker involved in this parallel operation, or there may be many. + */ + hdr = shm_toc_lookup(toc, 0); + SpinLockAcquire(&hdr->mutex); + myworkernumber = ++hdr->workers_attached; + SpinLockRelease(&hdr->mutex); + if (myworkernumber > hdr->workers_total) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("too many message queue testing workers already"))); + + /* + * Attach to the appropriate message queues. + */ + attach_to_queues(seg, toc, myworkernumber, &inqh, &outqh); + + /* + * Indicate that we're fully initialized and ready to begin the main part + * of the parallel operation. + * + * Once we signal that we're ready, the user backend is entitled to assume + * that our on_dsm_detach callbacks will fire before we disconnect from + * the shared memory segment and exit. Generally, that means we must have + * attached to all relevant dynamic shared memory data structures by now. + */ + SpinLockAcquire(&hdr->mutex); + ++hdr->workers_ready; + SpinLockRelease(&hdr->mutex); + registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid); + if (registrant == NULL) + { + elog(DEBUG1, "registrant backend has exited prematurely"); + proc_exit(1); + } + SetLatch(®istrant->procLatch); + + /* Do the work. */ + copy_messages(inqh, outqh); + + /* + * We're done. Explicitly detach the shared memory segment so that we + * don't get a resource leak warning at commit time. This will fire any + * on_dsm_detach callbacks we've registered, as well. Once that's done, + * we can go ahead and exit. + */ + dsm_detach(seg); + proc_exit(1); +} + +/* + * Attach to shared memory message queues. + * + * We use our worker number to determine to which queue we should attach. + * The queues are registered at keys 1..<number-of-workers>. The user backend + * writes to queue #1 and reads from queue #<number-of-workers>; each worker + * reads from the queue whose number is equal to its worker number and writes + * to the next higher-numbered queue. + */ +static void +attach_to_queues(dsm_segment *seg, shm_toc *toc, int myworkernumber, + shm_mq_handle **inqhp, shm_mq_handle **outqhp) +{ + shm_mq *inq; + shm_mq *outq; + + inq = shm_toc_lookup(toc, myworkernumber); + shm_mq_set_receiver(inq, MyProc); + *inqhp = shm_mq_attach(inq, seg, NULL); + outq = shm_toc_lookup(toc, myworkernumber + 1); + shm_mq_set_sender(outq, MyProc); + *outqhp = shm_mq_attach(outq, seg, NULL); +} + +/* + * Loop, receiving and sending messages, until the connection is broken. + * + * This is the "real work" performed by this worker process. Everything that + * happens before this is initialization of one form or another, and everything + * after this point is cleanup. + */ +static void +copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh) +{ + Size len; + void *data; + shm_mq_result res; + + for (;;) + { + /* Notice any interrupts that have occurred. */ + CHECK_FOR_INTERRUPTS(); + + /* Receive a message. */ + res = shm_mq_receive(inqh, &len, &data, false); + if (res != SHM_MQ_SUCCESS) + break; + + /* Send it back out. */ + res = shm_mq_send(outqh, len, data, false); + if (res != SHM_MQ_SUCCESS) + break; + } +} + +/* + * When we receive a SIGTERM, we set InterruptPending and ProcDiePending just + * like a normal backend. The next CHECK_FOR_INTERRUPTS() will do the right + * thing. + */ +static void +handle_sigterm(SIGNAL_ARGS) +{ + int save_errno = errno; + + if (MyProc) + SetLatch(&MyProc->procLatch); + + if (!proc_exit_inprogress) + { + InterruptPending = true; + ProcDiePending = true; + } + + errno = save_errno; +} diff --git a/src/test/modules/worker_spi/Makefile b/src/test/modules/worker_spi/Makefile new file mode 100644 index 00000000000..7cdb33c9dfc --- /dev/null +++ b/src/test/modules/worker_spi/Makefile @@ -0,0 +1,18 @@ +# src/test/modules/worker_spi/Makefile + +MODULES = worker_spi + +EXTENSION = worker_spi +DATA = worker_spi--1.0.sql +PGFILEDESC = "worker_spi - background worker example" + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/worker_spi +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/worker_spi/worker_spi--1.0.sql b/src/test/modules/worker_spi/worker_spi--1.0.sql new file mode 100644 index 00000000000..e9d5b07373a --- /dev/null +++ b/src/test/modules/worker_spi/worker_spi--1.0.sql @@ -0,0 +1,9 @@ +/* src/test/modules/worker_spi/worker_spi--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION worker_spi" to load this file. \quit + +CREATE FUNCTION worker_spi_launch(pg_catalog.int4) +RETURNS pg_catalog.int4 STRICT +AS 'MODULE_PATHNAME' +LANGUAGE C; diff --git a/src/test/modules/worker_spi/worker_spi.c b/src/test/modules/worker_spi/worker_spi.c new file mode 100644 index 00000000000..ac0f59c9886 --- /dev/null +++ b/src/test/modules/worker_spi/worker_spi.c @@ -0,0 +1,407 @@ +/* ------------------------------------------------------------------------- + * + * worker_spi.c + * Sample background worker code that demonstrates various coding + * patterns: establishing a database connection; starting and committing + * transactions; using GUC variables, and heeding SIGHUP to reread + * the configuration file; reporting to pg_stat_activity; using the + * process latch to sleep and exit in case of postmaster death. + * + * This code connects to a database, creates a schema and table, and summarizes + * the numbers contained therein. To see it working, insert an initial value + * with "total" type and some initial value; then insert some other rows with + * "delta" type. Delta rows will be deleted by this worker and their values + * aggregated into the total. + * + * Copyright (C) 2013-2014, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/test/modules/worker_spi/worker_spi.c + * + * ------------------------------------------------------------------------- + */ +#include "postgres.h" + +/* These are always necessary for a bgworker */ +#include "miscadmin.h" +#include "postmaster/bgworker.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/lwlock.h" +#include "storage/proc.h" +#include "storage/shmem.h" + +/* these headers are used by this particular worker's code */ +#include "access/xact.h" +#include "executor/spi.h" +#include "fmgr.h" +#include "lib/stringinfo.h" +#include "pgstat.h" +#include "utils/builtins.h" +#include "utils/snapmgr.h" +#include "tcop/utility.h" + +PG_MODULE_MAGIC; + +PG_FUNCTION_INFO_V1(worker_spi_launch); + +void _PG_init(void); +void worker_spi_main(Datum) __attribute__((noreturn)); + +/* flags set by signal handlers */ +static volatile sig_atomic_t got_sighup = false; +static volatile sig_atomic_t got_sigterm = false; + +/* GUC variables */ +static int worker_spi_naptime = 10; +static int worker_spi_total_workers = 2; + + +typedef struct worktable +{ + const char *schema; + const char *name; +} worktable; + +/* + * Signal handler for SIGTERM + * Set a flag to let the main loop to terminate, and set our latch to wake + * it up. + */ +static void +worker_spi_sigterm(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_sigterm = true; + if (MyProc) + SetLatch(&MyProc->procLatch); + + errno = save_errno; +} + +/* + * Signal handler for SIGHUP + * Set a flag to tell the main loop to reread the config file, and set + * our latch to wake it up. + */ +static void +worker_spi_sighup(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_sighup = true; + if (MyProc) + SetLatch(&MyProc->procLatch); + + errno = save_errno; +} + +/* + * Initialize workspace for a worker process: create the schema if it doesn't + * already exist. + */ +static void +initialize_worker_spi(worktable *table) +{ + int ret; + int ntup; + bool isnull; + StringInfoData buf; + + SetCurrentStatementStartTimestamp(); + StartTransactionCommand(); + SPI_connect(); + PushActiveSnapshot(GetTransactionSnapshot()); + pgstat_report_activity(STATE_RUNNING, "initializing spi_worker schema"); + + /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */ + initStringInfo(&buf); + appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'", + table->schema); + + ret = SPI_execute(buf.data, true, 0); + if (ret != SPI_OK_SELECT) + elog(FATAL, "SPI_execute failed: error code %d", ret); + + if (SPI_processed != 1) + elog(FATAL, "not a singleton result"); + + ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0], + SPI_tuptable->tupdesc, + 1, &isnull)); + if (isnull) + elog(FATAL, "null result"); + + if (ntup == 0) + { + resetStringInfo(&buf); + appendStringInfo(&buf, + "CREATE SCHEMA \"%s\" " + "CREATE TABLE \"%s\" (" + " type text CHECK (type IN ('total', 'delta')), " + " value integer)" + "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) " + "WHERE type = 'total'", + table->schema, table->name, table->name, table->name); + + /* set statement start time */ + SetCurrentStatementStartTimestamp(); + + ret = SPI_execute(buf.data, false, 0); + + if (ret != SPI_OK_UTILITY) + elog(FATAL, "failed to create my schema"); + } + + SPI_finish(); + PopActiveSnapshot(); + CommitTransactionCommand(); + pgstat_report_activity(STATE_IDLE, NULL); +} + +void +worker_spi_main(Datum main_arg) +{ + int index = DatumGetInt32(main_arg); + worktable *table; + StringInfoData buf; + char name[20]; + + table = palloc(sizeof(worktable)); + sprintf(name, "schema%d", index); + table->schema = pstrdup(name); + table->name = pstrdup("counted"); + + /* Establish signal handlers before unblocking signals. */ + pqsignal(SIGHUP, worker_spi_sighup); + pqsignal(SIGTERM, worker_spi_sigterm); + + /* We're now ready to receive signals */ + BackgroundWorkerUnblockSignals(); + + /* Connect to our database */ + BackgroundWorkerInitializeConnection("postgres", NULL); + + elog(LOG, "%s initialized with %s.%s", + MyBgworkerEntry->bgw_name, table->schema, table->name); + initialize_worker_spi(table); + + /* + * Quote identifiers passed to us. Note that this must be done after + * initialize_worker_spi, because that routine assumes the names are not + * quoted. + * + * Note some memory might be leaked here. + */ + table->schema = quote_identifier(table->schema); + table->name = quote_identifier(table->name); + + initStringInfo(&buf); + appendStringInfo(&buf, + "WITH deleted AS (DELETE " + "FROM %s.%s " + "WHERE type = 'delta' RETURNING value), " + "total AS (SELECT coalesce(sum(value), 0) as sum " + "FROM deleted) " + "UPDATE %s.%s " + "SET value = %s.value + total.sum " + "FROM total WHERE type = 'total' " + "RETURNING %s.value", + table->schema, table->name, + table->schema, table->name, + table->name, + table->name); + + /* + * Main loop: do this until the SIGTERM handler tells us to terminate + */ + while (!got_sigterm) + { + int ret; + int rc; + + /* + * Background workers mustn't call usleep() or any direct equivalent: + * instead, they may wait on their process latch, which sleeps as + * necessary, but is awakened if postmaster dies. That way the + * background process goes away immediately in an emergency. + */ + rc = WaitLatch(&MyProc->procLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + worker_spi_naptime * 1000L); + ResetLatch(&MyProc->procLatch); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + /* + * In case of a SIGHUP, just reload the configuration. + */ + if (got_sighup) + { + got_sighup = false; + ProcessConfigFile(PGC_SIGHUP); + } + + /* + * Start a transaction on which we can run queries. Note that each + * StartTransactionCommand() call should be preceded by a + * SetCurrentStatementStartTimestamp() call, which sets both the time + * for the statement we're about the run, and also the transaction + * start time. Also, each other query sent to SPI should probably be + * preceded by SetCurrentStatementStartTimestamp(), so that statement + * start time is always up to date. + * + * The SPI_connect() call lets us run queries through the SPI manager, + * and the PushActiveSnapshot() call creates an "active" snapshot + * which is necessary for queries to have MVCC data to work on. + * + * The pgstat_report_activity() call makes our activity visible + * through the pgstat views. + */ + SetCurrentStatementStartTimestamp(); + StartTransactionCommand(); + SPI_connect(); + PushActiveSnapshot(GetTransactionSnapshot()); + pgstat_report_activity(STATE_RUNNING, buf.data); + + /* We can now execute queries via SPI */ + ret = SPI_execute(buf.data, false, 0); + + if (ret != SPI_OK_UPDATE_RETURNING) + elog(FATAL, "cannot select from table %s.%s: error code %d", + table->schema, table->name, ret); + + if (SPI_processed > 0) + { + bool isnull; + int32 val; + + val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0], + SPI_tuptable->tupdesc, + 1, &isnull)); + if (!isnull) + elog(LOG, "%s: count in %s.%s is now %d", + MyBgworkerEntry->bgw_name, + table->schema, table->name, val); + } + + /* + * And finish our transaction. + */ + SPI_finish(); + PopActiveSnapshot(); + CommitTransactionCommand(); + pgstat_report_activity(STATE_IDLE, NULL); + } + + proc_exit(1); +} + +/* + * Entrypoint of this module. + * + * We register more than one worker process here, to demonstrate how that can + * be done. + */ +void +_PG_init(void) +{ + BackgroundWorker worker; + unsigned int i; + + /* get the configuration */ + DefineCustomIntVariable("worker_spi.naptime", + "Duration between each check (in seconds).", + NULL, + &worker_spi_naptime, + 10, + 1, + INT_MAX, + PGC_SIGHUP, + 0, + NULL, + NULL, + NULL); + + if (!process_shared_preload_libraries_in_progress) + return; + + DefineCustomIntVariable("worker_spi.total_workers", + "Number of workers.", + NULL, + &worker_spi_total_workers, + 2, + 1, + 100, + PGC_POSTMASTER, + 0, + NULL, + NULL, + NULL); + + /* set up common data for all our workers */ + worker.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_RecoveryFinished; + worker.bgw_restart_time = BGW_NEVER_RESTART; + worker.bgw_main = worker_spi_main; + worker.bgw_notify_pid = 0; + + /* + * Now fill in worker-specific data, and do the actual registrations. + */ + for (i = 1; i <= worker_spi_total_workers; i++) + { + snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i); + worker.bgw_main_arg = Int32GetDatum(i); + + RegisterBackgroundWorker(&worker); + } +} + +/* + * Dynamically launch an SPI worker. + */ +Datum +worker_spi_launch(PG_FUNCTION_ARGS) +{ + int32 i = PG_GETARG_INT32(0); + BackgroundWorker worker; + BackgroundWorkerHandle *handle; + BgwHandleStatus status; + pid_t pid; + + worker.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_RecoveryFinished; + worker.bgw_restart_time = BGW_NEVER_RESTART; + worker.bgw_main = NULL; /* new worker might not have library loaded */ + sprintf(worker.bgw_library_name, "worker_spi"); + sprintf(worker.bgw_function_name, "worker_spi_main"); + snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i); + worker.bgw_main_arg = Int32GetDatum(i); + /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */ + worker.bgw_notify_pid = MyProcPid; + + if (!RegisterDynamicBackgroundWorker(&worker, &handle)) + PG_RETURN_NULL(); + + status = WaitForBackgroundWorkerStartup(handle, &pid); + + if (status == BGWH_STOPPED) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not start background process"), + errhint("More details may be available in the server log."))); + if (status == BGWH_POSTMASTER_DIED) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("cannot start background processes without postmaster"), + errhint("Kill all remaining database processes and restart the database."))); + Assert(status == BGWH_STARTED); + + PG_RETURN_INT32(pid); +} diff --git a/src/test/modules/worker_spi/worker_spi.control b/src/test/modules/worker_spi/worker_spi.control new file mode 100644 index 00000000000..84d6294628a --- /dev/null +++ b/src/test/modules/worker_spi/worker_spi.control @@ -0,0 +1,5 @@ +# worker_spi extension +comment = 'Sample background worker' +default_version = '1.0' +module_pathname = '$libdir/worker_spi' +relocatable = true diff --git a/src/test/regress/GNUmakefile b/src/test/regress/GNUmakefile index b40b37c0b74..77fe8b620d4 100644 --- a/src/test/regress/GNUmakefile +++ b/src/test/regress/GNUmakefile @@ -101,7 +101,7 @@ installdirs-tests: installdirs $(MKDIR_P) $(patsubst $(srcdir)/%/,'$(DESTDIR)$(pkglibdir)/regress/%',$(sort $(dir $(regress_data_files)))) -# Get some extra C modules from contrib/spi and contrib/dummy_seclabel... +# Get some extra C modules from contrib/spi and src/test/modules/dummy_seclabel... all: refint$(DLSUFFIX) autoinc$(DLSUFFIX) dummy_seclabel$(DLSUFFIX) @@ -111,22 +111,22 @@ refint$(DLSUFFIX): $(top_builddir)/contrib/spi/refint$(DLSUFFIX) autoinc$(DLSUFFIX): $(top_builddir)/contrib/spi/autoinc$(DLSUFFIX) cp $< $@ -dummy_seclabel$(DLSUFFIX): $(top_builddir)/contrib/dummy_seclabel/dummy_seclabel$(DLSUFFIX) +dummy_seclabel$(DLSUFFIX): $(top_builddir)/src/test/modules/dummy_seclabel/dummy_seclabel$(DLSUFFIX) cp $< $@ $(top_builddir)/contrib/spi/refint$(DLSUFFIX): | submake-contrib-spi ; $(top_builddir)/contrib/spi/autoinc$(DLSUFFIX): | submake-contrib-spi ; -$(top_builddir)/contrib/dummy_seclabel/dummy_seclabel$(DLSUFFIX): | submake-contrib-dummy_seclabel ; +$(top_builddir)/src/test/modules/dummy_seclabel/dummy_seclabel$(DLSUFFIX): | submake-dummy_seclabel ; submake-contrib-spi: $(MAKE) -C $(top_builddir)/contrib/spi -submake-contrib-dummy_seclabel: - $(MAKE) -C $(top_builddir)/contrib/dummy_seclabel +submake-dummy_seclabel: + $(MAKE) -C $(top_builddir)/src/test/modules/dummy_seclabel -.PHONY: submake-contrib-spi submake-contrib-dummy_seclabel +.PHONY: submake-contrib-spi submake-dummy_seclabel # Tablespace setup |