diff options
author | Simon Riggs <simon@2ndQuadrant.com> | 2016-04-06 10:05:41 +0100 |
---|---|---|
committer | Simon Riggs <simon@2ndQuadrant.com> | 2016-04-06 10:05:41 +0100 |
commit | 3fe3511d05127cc024b221040db2eeb352e7d716 (patch) | |
tree | b17a084bec318a70a1c0fcd755596b771871bce7 /src/backend/replication/logical/message.c | |
parent | 989be0810dffd08b54e1caecec0677608211c339 (diff) | |
download | postgresql-3fe3511d05127cc024b221040db2eeb352e7d716.tar.gz postgresql-3fe3511d05127cc024b221040db2eeb352e7d716.zip |
Generic Messages for Logical Decoding
API and mechanism to allow generic messages to be inserted into WAL that are
intended to be read by logical decoding plugins. This commit adds an optional
new callback to the logical decoding API.
Messages are either text or bytea. Messages can be transactional, or not, and
are identified by a prefix to allow multiple concurrent decoding plugins.
(Not to be confused with Generic WAL records, which are intended to allow crash
recovery of extensible objects.)
Author: Petr Jelinek and Andres Freund
Reviewers: Artur Zakirov, Tomas Vondra, Simon Riggs
Discussion: 5685F999.6010202@2ndquadrant.com
Diffstat (limited to 'src/backend/replication/logical/message.c')
-rw-r--r-- | src/backend/replication/logical/message.c | 87 |
1 files changed, 87 insertions, 0 deletions
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c new file mode 100644 index 00000000000..684f7998263 --- /dev/null +++ b/src/backend/replication/logical/message.c @@ -0,0 +1,87 @@ +/*------------------------------------------------------------------------- + * + * message.c + * Generic logical messages. + * + * Copyright (c) 2013-2016, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/message.c + * + * NOTES + * + * Generic logical messages allow XLOG logging of arbitrary binary blobs that + * get passed to the logical decoding plugin. In normal XLOG processing they + * are same as NOOP. + * + * These messages can be either transactional or non-transactional. + * Transactional messages are part of current transaction and will be sent to + * decoding plugin using in a same way as DML operations. + * Non-transactional messages are sent to the plugin at the time when the + * logical decoding reads them from XLOG. This also means that transactional + * messages won't be delivered if the transaction was rolled back but the + * non-transactional one will be delivered always. + * + * Every message carries prefix to avoid conflicts between different decoding + * plugins. The plugin authors must take extra care to use unique prefix, + * good options seems to be for example to use the name of the extension. + * + * --------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xact.h" + +#include "catalog/indexing.h" + +#include "nodes/execnodes.h" + +#include "replication/message.h" +#include "replication/logical.h" + +#include "utils/memutils.h" + +/* + * Write logical decoding message into XLog. + */ +XLogRecPtr +LogLogicalMessage(const char *prefix, const char *message, size_t size, + bool transactional) +{ + xl_logical_message xlrec; + + /* + * Force xid to be allocated if we're emitting a transactional message. + */ + if (transactional) + { + Assert(IsTransactionState()); + GetCurrentTransactionId(); + } + + xlrec.transactional = transactional; + xlrec.prefix_size = strlen(prefix) + 1; + xlrec.message_size = size; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage); + XLogRegisterData((char *) prefix, xlrec.prefix_size); + XLogRegisterData((char *) message, size); + + return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE); +} + +/* + * Redo is basically just noop for logical decoding messages. + */ +void +logicalmsg_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info != XLOG_LOGICAL_MESSAGE) + elog(PANIC, "logicalmsg_redo: unknown op code %u", info); + + /* This is only interesting for logical decoding, see decode.c. */ +} |