aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon Riggs <simon@2ndQuadrant.com>2016-04-06 10:05:41 +0100
committerSimon Riggs <simon@2ndQuadrant.com>2016-04-06 10:05:41 +0100
commit3fe3511d05127cc024b221040db2eeb352e7d716 (patch)
treeb17a084bec318a70a1c0fcd755596b771871bce7 /src
parent989be0810dffd08b54e1caecec0677608211c339 (diff)
downloadpostgresql-3fe3511d05127cc024b221040db2eeb352e7d716.tar.gz
postgresql-3fe3511d05127cc024b221040db2eeb352e7d716.zip
Generic Messages for Logical Decoding
API and mechanism to allow generic messages to be inserted into WAL that are intended to be read by logical decoding plugins. This commit adds an optional new callback to the logical decoding API. Messages are either text or bytea. Messages can be transactional, or not, and are identified by a prefix to allow multiple concurrent decoding plugins. (Not to be confused with Generic WAL records, which are intended to allow crash recovery of extensible objects.) Author: Petr Jelinek and Andres Freund Reviewers: Artur Zakirov, Tomas Vondra, Simon Riggs Discussion: 5685F999.6010202@2ndquadrant.com
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/rmgrdesc/Makefile6
-rw-r--r--src/backend/access/rmgrdesc/logicalmsgdesc.c41
-rw-r--r--src/backend/access/transam/rmgr.c1
-rw-r--r--src/backend/replication/logical/Makefile2
-rw-r--r--src/backend/replication/logical/decode.c46
-rw-r--r--src/backend/replication/logical/logical.c38
-rw-r--r--src/backend/replication/logical/logicalfuncs.c27
-rw-r--r--src/backend/replication/logical/message.c87
-rw-r--r--src/backend/replication/logical/reorderbuffer.c121
-rw-r--r--src/backend/replication/logical/snapbuild.c19
-rw-r--r--src/bin/pg_xlogdump/.gitignore21
-rw-r--r--src/bin/pg_xlogdump/rmgrdesc.c1
-rw-r--r--src/include/access/rmgrlist.h1
-rw-r--r--src/include/catalog/pg_proc.h4
-rw-r--r--src/include/replication/logicalfuncs.h2
-rw-r--r--src/include/replication/message.h41
-rw-r--r--src/include/replication/output_plugin.h13
-rw-r--r--src/include/replication/reorderbuffer.h22
-rw-r--r--src/include/replication/snapbuild.h2
19 files changed, 471 insertions, 24 deletions
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index c0e38fdf17d..5514db1dda6 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -9,8 +9,8 @@ top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o genericdesc.o \
- gindesc.o gistdesc.o hashdesc.o heapdesc.o mxactdesc.o nbtdesc.o \
- relmapdesc.o replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o \
- standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
+ gindesc.o gistdesc.o hashdesc.o heapdesc.o logicalmsgdesc.o \
+ mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o seqdesc.o \
+ smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/rmgrdesc/logicalmsgdesc.c b/src/backend/access/rmgrdesc/logicalmsgdesc.c
new file mode 100644
index 00000000000..b194e1424d8
--- /dev/null
+++ b/src/backend/access/rmgrdesc/logicalmsgdesc.c
@@ -0,0 +1,41 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicalmsgdesc.c
+ * rmgr descriptor routines for replication/logical/message.c
+ *
+ * Portions Copyright (c) 2015-2016, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/rmgrdesc/logicalmsgdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "replication/message.h"
+
+void
+logicalmsg_desc(StringInfo buf, XLogReaderState *record)
+{
+ char *rec = XLogRecGetData(record);
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ if (info == XLOG_LOGICAL_MESSAGE)
+ {
+ xl_logical_message *xlrec = (xl_logical_message *) rec;
+
+ appendStringInfo(buf, "%s message size %zu bytes",
+ xlrec->transactional ? "transactional" : "nontransactional",
+ xlrec->message_size);
+ }
+}
+
+const char *
+logicalmsg_identify(uint8 info)
+{
+ if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE)
+ return "MESSAGE";
+
+ return NULL;
+}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 7b38c16f521..31c5fd165c0 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,6 +24,7 @@
#include "commands/dbcommands_xlog.h"
#include "commands/sequence.h"
#include "commands/tablespace.h"
+#include "replication/message.h"
#include "replication/origin.h"
#include "storage/standby.h"
#include "utils/relmapper.h"
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 8adea13bf4e..1d7ca062d11 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,7 +14,7 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
-OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o origin.o \
+OBJS = decode.o logical.o logicalfuncs.o message.o origin.o reorderbuffer.o \
snapbuild.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 7781ebcae0b..3e80c4a0d86 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -39,6 +39,7 @@
#include "replication/decode.h"
#include "replication/logical.h"
+#include "replication/message.h"
#include "replication/reorderbuffer.h"
#include "replication/origin.h"
#include "replication/snapbuild.h"
@@ -58,6 +59,7 @@ static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
/* individual record(group)'s handlers */
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -123,6 +125,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
DecodeHeapOp(ctx, &buf);
break;
+ case RM_LOGICALMSG_ID:
+ DecodeLogicalMsgOp(ctx, &buf);
+ break;
+
/*
* Rmgrs irrelevant for logical decoding; they describe stuff not
* represented in logical decoding. Add new rmgrs in rmgrlist.h's
@@ -458,6 +464,46 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
}
}
+/*
+ * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ SnapBuild *builder = ctx->snapshot_builder;
+ XLogReaderState *r = buf->record;
+ TransactionId xid = XLogRecGetXid(r);
+ uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+ Snapshot snapshot;
+ xl_logical_message *message;
+
+ if (info != XLOG_LOGICAL_MESSAGE)
+ elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
+
+ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+
+ /* No point in doing anything yet. */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+ return;
+
+ message = (xl_logical_message *) XLogRecGetData(r);
+
+ if (message->transactional &&
+ !SnapBuildProcessChange(builder, xid, buf->origptr))
+ return;
+ else if (!message->transactional &&
+ (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
+ SnapBuildXactNeedsSkip(builder, buf->origptr)))
+ return;
+
+ snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
+ ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
+ message->transactional,
+ message->message, /* first part of message is prefix */
+ message->message_size,
+ message->message + message->prefix_size);
+}
+
static inline bool
FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2e6d3f9203c..c06b2fa2859 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -62,6 +62,9 @@ static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
+static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn, bool transactional,
+ const char *prefix, Size message_size, const char *message);
static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
@@ -178,6 +181,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->begin = begin_cb_wrapper;
ctx->reorder->apply_change = change_cb_wrapper;
ctx->reorder->commit = commit_cb_wrapper;
+ ctx->reorder->message = message_cb_wrapper;
ctx->out = makeStringInfo();
ctx->prepare_write = prepare_write;
@@ -702,6 +706,40 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
return ret;
}
+static void
+message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn, bool transactional,
+ const char *prefix, Size message_size, const char *message)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ if (ctx->callbacks.message_cb == NULL)
+ return;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "message";
+ state.report_location = message_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+ ctx->write_location = message_lsn;
+
+ /* do the actual work: call callback */
+ ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
+ message_size, message);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
/*
* Set the required catalog xmin horizon for historic snapshots in the current
* replication slot.
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index dd6cd62ccd1..69d20003a66 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -24,6 +24,8 @@
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
+#include "access/xact.h"
+
#include "catalog/pg_type.h"
#include "nodes/makefuncs.h"
@@ -41,6 +43,7 @@
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/logicalfuncs.h"
+#include "replication/message.h"
#include "storage/fd.h"
@@ -380,3 +383,27 @@ pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
{
return pg_logical_slot_get_changes_guts(fcinfo, false, true);
}
+
+
+/*
+ * SQL function for writing logical decding message into WAL.
+ */
+Datum
+pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
+{
+ bool transactional = PG_GETARG_BOOL(0);
+ char *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
+ bytea *data = PG_GETARG_BYTEA_PP(2);
+ XLogRecPtr lsn;
+
+ lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
+ transactional);
+ PG_RETURN_LSN(lsn);
+}
+
+Datum
+pg_logical_emit_message_text(PG_FUNCTION_ARGS)
+{
+ /* bytea and text are compatible */
+ return pg_logical_emit_message_bytea(fcinfo);
+}
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c
new file mode 100644
index 00000000000..684f7998263
--- /dev/null
+++ b/src/backend/replication/logical/message.c
@@ -0,0 +1,87 @@
+/*-------------------------------------------------------------------------
+ *
+ * message.c
+ * Generic logical messages.
+ *
+ * Copyright (c) 2013-2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/message.c
+ *
+ * NOTES
+ *
+ * Generic logical messages allow XLOG logging of arbitrary binary blobs that
+ * get passed to the logical decoding plugin. In normal XLOG processing they
+ * are same as NOOP.
+ *
+ * These messages can be either transactional or non-transactional.
+ * Transactional messages are part of current transaction and will be sent to
+ * decoding plugin using in a same way as DML operations.
+ * Non-transactional messages are sent to the plugin at the time when the
+ * logical decoding reads them from XLOG. This also means that transactional
+ * messages won't be delivered if the transaction was rolled back but the
+ * non-transactional one will be delivered always.
+ *
+ * Every message carries prefix to avoid conflicts between different decoding
+ * plugins. The plugin authors must take extra care to use unique prefix,
+ * good options seems to be for example to use the name of the extension.
+ *
+ * ---------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+
+#include "catalog/indexing.h"
+
+#include "nodes/execnodes.h"
+
+#include "replication/message.h"
+#include "replication/logical.h"
+
+#include "utils/memutils.h"
+
+/*
+ * Write logical decoding message into XLog.
+ */
+XLogRecPtr
+LogLogicalMessage(const char *prefix, const char *message, size_t size,
+ bool transactional)
+{
+ xl_logical_message xlrec;
+
+ /*
+ * Force xid to be allocated if we're emitting a transactional message.
+ */
+ if (transactional)
+ {
+ Assert(IsTransactionState());
+ GetCurrentTransactionId();
+ }
+
+ xlrec.transactional = transactional;
+ xlrec.prefix_size = strlen(prefix) + 1;
+ xlrec.message_size = size;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
+ XLogRegisterData((char *) prefix, xlrec.prefix_size);
+ XLogRegisterData((char *) message, size);
+
+ return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
+}
+
+/*
+ * Redo is basically just noop for logical decoding messages.
+ */
+void
+logicalmsg_redo(XLogReaderState *record)
+{
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ if (info != XLOG_LOGICAL_MESSAGE)
+ elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
+
+ /* This is only interesting for logical decoding, see decode.c. */
+}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 9d78c8c134e..52c6986dc0c 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -414,6 +414,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
change->data.tp.oldtuple = NULL;
}
break;
+ case REORDER_BUFFER_CHANGE_MESSAGE:
+ if (change->data.msg.prefix != NULL)
+ pfree(change->data.msg.prefix);
+ change->data.msg.prefix = NULL;
+ if (change->data.msg.message != NULL)
+ pfree(change->data.msg.message);
+ change->data.msg.message = NULL;
+ break;
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
if (change->data.snapshot)
{
@@ -627,6 +635,61 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
ReorderBufferCheckSerializeTXN(rb, txn);
}
+/*
+ * Queue message into a transaction so it can be processed upon commit.
+ */
+void
+ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
+ Snapshot snapshot, XLogRecPtr lsn,
+ bool transactional, const char *prefix,
+ Size message_size, const char *message)
+{
+ if (transactional)
+ {
+ MemoryContext oldcontext;
+ ReorderBufferChange *change;
+
+ Assert(xid != InvalidTransactionId);
+
+ oldcontext = MemoryContextSwitchTo(rb->context);
+
+ change = ReorderBufferGetChange(rb);
+ change->action = REORDER_BUFFER_CHANGE_MESSAGE;
+ change->data.msg.prefix = pstrdup(prefix);
+ change->data.msg.message_size = message_size;
+ change->data.msg.message = palloc(message_size);
+ memcpy(change->data.msg.message, message, message_size);
+
+ ReorderBufferQueueChange(rb, xid, lsn, change);
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+ else
+ {
+ ReorderBufferTXN *txn = NULL;
+ volatile Snapshot snapshot_now = snapshot;
+
+ if (xid != InvalidTransactionId)
+ txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+ /* setup snapshot to allow catalog access */
+ SetupHistoricSnapshot(snapshot_now, NULL);
+ PG_TRY();
+ {
+ rb->message(rb, txn, lsn, false, prefix, message_size, message);
+
+ TeardownHistoricSnapshot(false);
+ }
+ PG_CATCH();
+ {
+ TeardownHistoricSnapshot(true);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ }
+}
+
+
static void
AssertTXNLsnOrder(ReorderBuffer *rb)
{
@@ -1493,6 +1556,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
specinsert = change;
break;
+ case REORDER_BUFFER_CHANGE_MESSAGE:
+ rb->message(rb, txn, change->lsn, true,
+ change->data.msg.prefix,
+ change->data.msg.message_size,
+ change->data.msg.message);
+ break;
+
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
/* get rid of the old */
TeardownHistoricSnapshot(false);
@@ -2159,6 +2229,33 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
}
break;
}
+ case REORDER_BUFFER_CHANGE_MESSAGE:
+ {
+ char *data;
+ Size prefix_size = strlen(change->data.msg.prefix) + 1;
+
+ sz += prefix_size + change->data.msg.message_size +
+ sizeof(Size) + sizeof(Size);
+ ReorderBufferSerializeReserve(rb, sz);
+
+ data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+
+ /* write the prefix including the size */
+ memcpy(data, &prefix_size, sizeof(Size));
+ data += sizeof(Size);
+ memcpy(data, change->data.msg.prefix,
+ prefix_size);
+ data += prefix_size;
+
+ /* write the message including the size */
+ memcpy(data, &change->data.msg.message_size, sizeof(Size));
+ data += sizeof(Size);
+ memcpy(data, change->data.msg.message,
+ change->data.msg.message_size);
+ data += change->data.msg.message_size;
+
+ break;
+ }
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
{
Snapshot snap;
@@ -2415,6 +2512,30 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
}
break;
+ case REORDER_BUFFER_CHANGE_MESSAGE:
+ {
+ Size prefix_size;
+
+ /* read prefix */
+ memcpy(&prefix_size, data, sizeof(Size));
+ data += sizeof(Size);
+ change->data.msg.prefix = MemoryContextAlloc(rb->context,
+ prefix_size);
+ memcpy(change->data.msg.prefix, data, prefix_size);
+ Assert(change->data.msg.prefix[prefix_size-1] == '\0');
+ data += prefix_size;
+
+ /* read the messsage */
+ memcpy(&change->data.msg.message_size, data, sizeof(Size));
+ data += sizeof(Size);
+ change->data.msg.message = MemoryContextAlloc(rb->context,
+ change->data.msg.message_size);
+ memcpy(change->data.msg.message, data,
+ change->data.msg.message_size);
+ data += change->data.msg.message_size;
+
+ break;
+ }
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
{
Snapshot oldsnap;
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 179b85a4161..b4dc617a8c9 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -605,6 +605,25 @@ SnapBuildExportSnapshot(SnapBuild *builder)
}
/*
+ * Ensure there is a snapshot and if not build one for current transaction.
+ */
+Snapshot
+SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid)
+{
+ Assert(builder->state == SNAPBUILD_CONSISTENT);
+
+ /* only build a new snapshot if we don't have a prebuilt one */
+ if (builder->snapshot == NULL)
+ {
+ builder->snapshot = SnapBuildBuildSnapshot(builder, xid);
+ /* inrease refcount for the snapshot builder */
+ SnapBuildSnapIncRefcount(builder->snapshot);
+ }
+
+ return builder->snapshot;
+}
+
+/*
* Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
* any. Aborts the previously started transaction and resets the resource
* owner back to its original value.
diff --git a/src/bin/pg_xlogdump/.gitignore b/src/bin/pg_xlogdump/.gitignore
index 33a1acfd2cb..c4783f12bb5 100644
--- a/src/bin/pg_xlogdump/.gitignore
+++ b/src/bin/pg_xlogdump/.gitignore
@@ -1,23 +1,4 @@
/pg_xlogdump
# Source files copied from src/backend/access/rmgrdesc/
-/brindesc.c
-/clogdesc.c
-/committsdesc.c
-/dbasedesc.c
-/genericdesc.c
-/gindesc.c
-/gistdesc.c
-/hashdesc.c
-/heapdesc.c
-/mxactdesc.c
-/nbtdesc.c
-/relmapdesc.c
-/replorigindesc.c
-/seqdesc.c
-/smgrdesc.c
-/spgdesc.c
-/standbydesc.c
-/tblspcdesc.c
-/xactdesc.c
-/xlogdesc.c
+/*desc.c
/xlogreader.c
diff --git a/src/bin/pg_xlogdump/rmgrdesc.c b/src/bin/pg_xlogdump/rmgrdesc.c
index cff7e59f34a..017b9c5b345 100644
--- a/src/bin/pg_xlogdump/rmgrdesc.c
+++ b/src/bin/pg_xlogdump/rmgrdesc.c
@@ -26,6 +26,7 @@
#include "commands/dbcommands_xlog.h"
#include "commands/sequence.h"
#include "commands/tablespace.h"
+#include "replication/message.h"
#include "replication/origin.h"
#include "rmgrdesc.h"
#include "storage/standbydefs.h"
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 3cfe6f7b546..a7a0ae224fd 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -46,3 +46,4 @@ PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL)
+PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL)
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index a0edf939b3a..6eab6c7c334 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5155,6 +5155,10 @@ DATA(insert OID = 3784 ( pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000
DESCR("peek at changes from replication slot");
DATA(insert OID = 3785 ( pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ ));
DESCR("peek at binary changes from replication slot");
+DATA(insert OID = 3577 ( pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 25" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_text _null_ _null_ _null_ ));
+DESCR("emit a textual logical decoding message");
+DATA(insert OID = 3578 ( pg_logical_emit_message PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 3220 "16 25 17" _null_ _null_ _null_ _null_ _null_ pg_logical_emit_message_bytea _null_ _null_ _null_ ));
+DESCR("emit a binary logical decoding message");
/* event triggers */
DATA(insert OID = 3566 ( pg_event_trigger_dropped_objects PGNSP PGUID 12 10 100 0 0 f f f f t t s s 0 0 2249 "" "{26,26,23,16,16,16,25,25,25,25,1009,1009}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{classid, objid, objsubid, original, normal, is_temporary, object_type, schema_name, object_name, object_identity, address_names, address_args}" _null_ _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ ));
diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h
index c87a1dfebde..554041405c4 100644
--- a/src/include/replication/logicalfuncs.h
+++ b/src/include/replication/logicalfuncs.h
@@ -21,4 +21,6 @@ extern Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS);
extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS);
extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS);
+extern Datum pg_logical_emit_message_bytea(PG_FUNCTION_ARGS);
+extern Datum pg_logical_emit_message_text(PG_FUNCTION_ARGS);
#endif
diff --git a/src/include/replication/message.h b/src/include/replication/message.h
new file mode 100644
index 00000000000..8b968d5288e
--- /dev/null
+++ b/src/include/replication/message.h
@@ -0,0 +1,41 @@
+/*-------------------------------------------------------------------------
+ * message.h
+ * Exports from replication/logical/message.c
+ *
+ * Copyright (c) 2013-2016, PostgreSQL Global Development Group
+ *
+ * src/include/replication/message.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_MESSAGE_H
+#define PG_LOGICAL_MESSAGE_H
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+
+/*
+ * Generic logical decoding message wal record.
+ */
+typedef struct xl_logical_message
+{
+ bool transactional; /* is message transactional? */
+ Size prefix_size; /* length of prefix */
+ Size message_size; /* size of the message */
+ char message[FLEXIBLE_ARRAY_MEMBER]; /* message including the null
+ * terminated prefix of length
+ * prefix_size */
+} xl_logical_message;
+
+#define SizeOfLogicalMessage (offsetof(xl_logical_message, message))
+
+extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
+ size_t size, bool transactional);
+
+/* RMGR API*/
+#define XLOG_LOGICAL_MESSAGE 0x00
+void logicalmsg_redo(XLogReaderState *record);
+void logicalmsg_desc(StringInfo buf, XLogReaderState *record);
+const char *logicalmsg_identify(uint8 info);
+
+#endif /* PG_LOGICAL_MESSAGE_H */
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 577b12ea2ef..3a2ca985fbe 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -74,6 +74,18 @@ typedef void (*LogicalDecodeCommitCB) (
XLogRecPtr commit_lsn);
/*
+ * Called for the generic logical decoding messages.
+ */
+typedef void (*LogicalDecodeMessageCB) (
+ struct LogicalDecodingContext *,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix,
+ Size message_size,
+ const char *message);
+
+/*
* Filter changes by origin.
*/
typedef bool (*LogicalDecodeFilterByOriginCB) (
@@ -96,6 +108,7 @@ typedef struct OutputPluginCallbacks
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
LogicalDecodeCommitCB commit_cb;
+ LogicalDecodeMessageCB message_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
} OutputPluginCallbacks;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index b52d06af928..4c54953a512 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -54,6 +54,7 @@ enum ReorderBufferChangeType
REORDER_BUFFER_CHANGE_INSERT,
REORDER_BUFFER_CHANGE_UPDATE,
REORDER_BUFFER_CHANGE_DELETE,
+ REORDER_BUFFER_CHANGE_MESSAGE,
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
@@ -98,6 +99,14 @@ typedef struct ReorderBufferChange
ReorderBufferTupleBuf *newtuple;
} tp;
+ /* Message with arbitrary data. */
+ struct
+ {
+ char *prefix;
+ Size message_size;
+ char *message;
+ } msg;
+
/* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
Snapshot snapshot;
@@ -274,6 +283,15 @@ typedef void (*ReorderBufferCommitCB) (
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
+/* message callback signature */
+typedef void (*ReorderBufferMessageCB) (
+ ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr message_lsn,
+ bool transactional,
+ const char *prefix, Size sz,
+ const char *message);
+
struct ReorderBuffer
{
/*
@@ -300,6 +318,7 @@ struct ReorderBuffer
ReorderBufferBeginCB begin;
ReorderBufferApplyChangeCB apply_change;
ReorderBufferCommitCB commit;
+ ReorderBufferMessageCB message;
/*
* Pointer that will be passed untouched to the callbacks.
@@ -350,6 +369,9 @@ ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
+void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
+ bool transactional, const char *prefix,
+ Size message_size, const char *message);
void ReorderBufferCommit(ReorderBuffer *, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index 75955afd391..c4127a1cf75 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -63,6 +63,8 @@ extern const char *SnapBuildExportSnapshot(SnapBuild *snapstate);
extern void SnapBuildClearExportedSnapshot(void);
extern SnapBuildState SnapBuildCurrentState(SnapBuild *snapstate);
+extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
+ TransactionId xid);
extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);