aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/test_decoding/expected/messages.out5
-rw-r--r--contrib/test_decoding/sql/messages.sql5
-rw-r--r--doc/src/sgml/func.sgml9
-rw-r--r--src/backend/catalog/system_functions.sql20
-rw-r--r--src/backend/replication/logical/logicalfuncs.c3
-rw-r--r--src/backend/replication/logical/message.c13
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_proc.dat4
-rw-r--r--src/include/replication/message.h3
9 files changed, 51 insertions, 13 deletions
diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
index 0fd70036bd5..84baf8af3ee 100644
--- a/contrib/test_decoding/expected/messages.out
+++ b/contrib/test_decoding/expected/messages.out
@@ -6,13 +6,14 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
init
(1 row)
-SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
+-- These two cover the path for the flush variant.
+SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1', true);
?column?
----------
msg1
(1 row)
-SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
+SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2', true);
?column?
----------
msg2
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
index 3d8500f99cb..1f3dcb63ee7 100644
--- a/contrib/test_decoding/sql/messages.sql
+++ b/contrib/test_decoding/sql/messages.sql
@@ -3,8 +3,9 @@ SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
-SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
-SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
+-- These two cover the path for the flush variant.
+SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1', true);
+SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2', true);
BEGIN;
SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3');
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index affd1254bb7..7c3e940afef 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -27740,11 +27740,11 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
<indexterm>
<primary>pg_logical_emit_message</primary>
</indexterm>
- <function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type> )
+ <function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type> [, <parameter>flush</parameter> <type>boolean</type> <literal>DEFAULT</literal> <literal>false</literal>] )
<returnvalue>pg_lsn</returnvalue>
</para>
<para role="func_signature">
- <function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>bytea</type> )
+ <function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>bytea</type> [, <parameter>flush</parameter> <type>boolean</type> <literal>DEFAULT</literal> <literal>false</literal>] )
<returnvalue>pg_lsn</returnvalue>
</para>
<para>
@@ -27758,6 +27758,11 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
recognize messages that are interesting for them.
The <parameter>content</parameter> parameter is the content of the
message, given either in text or binary form.
+ The <parameter>flush</parameter> parameter (default set to
+ <literal>false</literal>) controls if the message is immediately
+ flushed to WAL or not. <parameter>flush</parameter> has no effect
+ with <parameter>transactional</parameter>, as the message's WAL
+ record is flushed along with its transaction.
</para></entry>
</row>
</tbody>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 07c0d89c4f8..35d738d5763 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -446,6 +446,26 @@ LANGUAGE INTERNAL
VOLATILE ROWS 1000 COST 1000
AS 'pg_logical_slot_peek_binary_changes';
+CREATE OR REPLACE FUNCTION pg_logical_emit_message(
+ transactional boolean,
+ prefix text,
+ message text,
+ flush boolean DEFAULT false)
+RETURNS pg_lsn
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_logical_emit_message_text';
+
+CREATE OR REPLACE FUNCTION pg_logical_emit_message(
+ transactional boolean,
+ prefix text,
+ message bytea,
+ flush boolean DEFAULT false)
+RETURNS pg_lsn
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_logical_emit_message_bytea';
+
CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
IN slot_name name, IN immediately_reserve boolean DEFAULT false,
IN temporary boolean DEFAULT false,
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 197169d6b0d..1067aca08fc 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -362,10 +362,11 @@ pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
bool transactional = PG_GETARG_BOOL(0);
char *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
bytea *data = PG_GETARG_BYTEA_PP(2);
+ bool flush = PG_GETARG_BOOL(3);
XLogRecPtr lsn;
lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
- transactional);
+ transactional, flush);
PG_RETURN_LSN(lsn);
}
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c
index c5de14afc65..b5d29382f54 100644
--- a/src/backend/replication/logical/message.c
+++ b/src/backend/replication/logical/message.c
@@ -44,9 +44,10 @@
*/
XLogRecPtr
LogLogicalMessage(const char *prefix, const char *message, size_t size,
- bool transactional)
+ bool transactional, bool flush)
{
xl_logical_message xlrec;
+ XLogRecPtr lsn;
/*
* Force xid to be allocated if we're emitting a transactional message.
@@ -71,7 +72,15 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size,
/* allow origin filtering */
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
- return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
+ lsn = XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
+
+ /*
+ * Make sure that the message hits disk before leaving if emitting a
+ * non-transactional message when flush is requested.
+ */
+ if (!transactional && flush)
+ XLogFlush(lsn);
+ return lsn;
}
/*
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index c5f4af24dc1..2f46fdc7391 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -57,6 +57,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202310161
+#define CATALOG_VERSION_NO 202310181
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 72ea4aa8b8c..c92d0631a01 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11167,11 +11167,11 @@
prosrc => 'pg_replication_slot_advance' },
{ oid => '3577', descr => 'emit a textual logical decoding message',
proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u',
- prorettype => 'pg_lsn', proargtypes => 'bool text text',
+ prorettype => 'pg_lsn', proargtypes => 'bool text text bool',
prosrc => 'pg_logical_emit_message_text' },
{ oid => '3578', descr => 'emit a binary logical decoding message',
proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u',
- prorettype => 'pg_lsn', proargtypes => 'bool text bytea',
+ prorettype => 'pg_lsn', proargtypes => 'bool text bytea bool',
prosrc => 'pg_logical_emit_message_bytea' },
# event triggers
diff --git a/src/include/replication/message.h b/src/include/replication/message.h
index 6ce7f2038b2..0f168d572c1 100644
--- a/src/include/replication/message.h
+++ b/src/include/replication/message.h
@@ -30,7 +30,8 @@ typedef struct xl_logical_message
#define SizeOfLogicalMessage (offsetof(xl_logical_message, message))
extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
- size_t size, bool transactional);
+ size_t size, bool transactional,
+ bool flush);
/* RMGR API */
#define XLOG_LOGICAL_MESSAGE 0x00