diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/catalog/system_functions.sql | 20 | ||||
-rw-r--r-- | src/backend/replication/logical/logicalfuncs.c | 3 | ||||
-rw-r--r-- | src/backend/replication/logical/message.c | 13 | ||||
-rw-r--r-- | src/include/catalog/catversion.h | 2 | ||||
-rw-r--r-- | src/include/catalog/pg_proc.dat | 4 | ||||
-rw-r--r-- | src/include/replication/message.h | 3 |
6 files changed, 38 insertions, 7 deletions
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index 07c0d89c4f8..35d738d5763 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -446,6 +446,26 @@ LANGUAGE INTERNAL VOLATILE ROWS 1000 COST 1000 AS 'pg_logical_slot_peek_binary_changes'; +CREATE OR REPLACE FUNCTION pg_logical_emit_message( + transactional boolean, + prefix text, + message text, + flush boolean DEFAULT false) +RETURNS pg_lsn +LANGUAGE INTERNAL +STRICT VOLATILE +AS 'pg_logical_emit_message_text'; + +CREATE OR REPLACE FUNCTION pg_logical_emit_message( + transactional boolean, + prefix text, + message bytea, + flush boolean DEFAULT false) +RETURNS pg_lsn +LANGUAGE INTERNAL +STRICT VOLATILE +AS 'pg_logical_emit_message_bytea'; + CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot( IN slot_name name, IN immediately_reserve boolean DEFAULT false, IN temporary boolean DEFAULT false, diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 197169d6b0d..1067aca08fc 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -362,10 +362,11 @@ pg_logical_emit_message_bytea(PG_FUNCTION_ARGS) bool transactional = PG_GETARG_BOOL(0); char *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1)); bytea *data = PG_GETARG_BYTEA_PP(2); + bool flush = PG_GETARG_BOOL(3); XLogRecPtr lsn; lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data), - transactional); + transactional, flush); PG_RETURN_LSN(lsn); } diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c index c5de14afc65..b5d29382f54 100644 --- a/src/backend/replication/logical/message.c +++ b/src/backend/replication/logical/message.c @@ -44,9 +44,10 @@ */ XLogRecPtr LogLogicalMessage(const char *prefix, const char *message, size_t size, - bool transactional) + bool transactional, bool flush) { xl_logical_message xlrec; + XLogRecPtr lsn; /* * Force xid to be allocated if we're emitting a transactional message. @@ -71,7 +72,15 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size, /* allow origin filtering */ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); - return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE); + lsn = XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE); + + /* + * Make sure that the message hits disk before leaving if emitting a + * non-transactional message when flush is requested. + */ + if (!transactional && flush) + XLogFlush(lsn); + return lsn; } /* diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index c5f4af24dc1..2f46fdc7391 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -57,6 +57,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202310161 +#define CATALOG_VERSION_NO 202310181 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 72ea4aa8b8c..c92d0631a01 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11167,11 +11167,11 @@ prosrc => 'pg_replication_slot_advance' }, { oid => '3577', descr => 'emit a textual logical decoding message', proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u', - prorettype => 'pg_lsn', proargtypes => 'bool text text', + prorettype => 'pg_lsn', proargtypes => 'bool text text bool', prosrc => 'pg_logical_emit_message_text' }, { oid => '3578', descr => 'emit a binary logical decoding message', proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u', - prorettype => 'pg_lsn', proargtypes => 'bool text bytea', + prorettype => 'pg_lsn', proargtypes => 'bool text bytea bool', prosrc => 'pg_logical_emit_message_bytea' }, # event triggers diff --git a/src/include/replication/message.h b/src/include/replication/message.h index 6ce7f2038b2..0f168d572c1 100644 --- a/src/include/replication/message.h +++ b/src/include/replication/message.h @@ -30,7 +30,8 @@ typedef struct xl_logical_message #define SizeOfLogicalMessage (offsetof(xl_logical_message, message)) extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message, - size_t size, bool transactional); + size_t size, bool transactional, + bool flush); /* RMGR API */ #define XLOG_LOGICAL_MESSAGE 0x00 |