diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/system_views.sql | 7 | ||||
-rw-r--r-- | src/backend/replication/logical/logical.c | 47 | ||||
-rw-r--r-- | src/backend/replication/slot.c | 71 | ||||
-rw-r--r-- | src/backend/replication/slotfuncs.c | 21 |
4 files changed, 98 insertions, 48 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 3190c7f7e01..ccc030fd7fb 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -917,6 +917,13 @@ LANGUAGE INTERNAL VOLATILE ROWS 1000 COST 1000 AS 'pg_logical_slot_peek_binary_changes'; +CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot( + IN slot_name name, IN immediately_reserve boolean DEFAULT false, + OUT slot_name name, OUT xlog_position pg_lsn) +RETURNS RECORD +LANGUAGE INTERNAL +AS 'pg_create_physical_replication_slot'; + CREATE OR REPLACE FUNCTION make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0, days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0, diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 5411e599eb2..5a07e1d9a69 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -250,52 +250,7 @@ CreateInitDecodingContext(char *plugin, StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN); SpinLockRelease(&slot->mutex); - /* - * The replication slot mechanism is used to prevent removal of required - * WAL. As there is no interlock between this and checkpoints required WAL - * could be removed before ReplicationSlotsComputeRequiredLSN() has been - * called to prevent that. In the very unlikely case that this happens - * we'll just retry. - */ - while (true) - { - XLogSegNo segno; - - /* - * Let's start with enough information if we can, so log a standby - * snapshot and start decoding at exactly that position. - */ - if (!RecoveryInProgress()) - { - XLogRecPtr flushptr; - - /* start at current insert position */ - slot->data.restart_lsn = GetXLogInsertRecPtr(); - - /* make sure we have enough information to start */ - flushptr = LogStandbySnapshot(); - - /* and make sure it's fsynced to disk */ - XLogFlush(flushptr); - } - else - slot->data.restart_lsn = GetRedoRecPtr(); - - /* prevent WAL removal as fast as possible */ - ReplicationSlotsComputeRequiredLSN(); - - /* - * If all required WAL is still there, great, otherwise retry. The - * slot should prevent further removal of WAL, unless there's a - * concurrent ReplicationSlotsComputeRequiredLSN() after we've written - * the new restart_lsn above, so normally we should never need to loop - * more than twice. - */ - XLByteToSeg(slot->data.restart_lsn, segno); - if (XLogGetLastRemovedSegno() < segno) - break; - } - + ReplicationSlotReserveWal(); /* ---- * This is a bit tricky: We need to determine a safe xmin horizon to start diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 1f013af8871..c66619cda29 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -40,6 +40,7 @@ #include <sys/stat.h> #include "access/transam.h" +#include "access/xlog_internal.h" #include "common/string.h" #include "miscadmin.h" #include "replication/slot.h" @@ -782,6 +783,76 @@ CheckSlotRequirements(void) } /* + * Reserve WAL for the currently active slot. + * + * Compute and set restart_lsn in a manner that's appropriate for the type of + * the slot and concurrency safe. + */ +void +ReplicationSlotReserveWal(void) +{ + ReplicationSlot *slot = MyReplicationSlot; + + Assert(slot != NULL); + Assert(slot->data.restart_lsn == InvalidXLogRecPtr); + + /* + * The replication slot mechanism is used to prevent removal of required + * WAL. As there is no interlock between this routine and checkpoints, WAL + * segments could concurrently be removed when a now stale return value of + * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that + * this happens we'll just retry. + */ + while (true) + { + XLogSegNo segno; + + /* + * For logical slots log a standby snapshot and start logical decoding + * at exactly that position. That allows the slot to start up more + * quickly. + * + * That's not needed (or indeed helpful) for physical slots as they'll + * start replay at the last logged checkpoint anyway. Instead return + * the location of the last redo LSN. While that slightly increases + * the chance that we have to retry, it's where a base backup has to + * start replay at. + */ + if (!RecoveryInProgress() && SlotIsLogical(slot)) + { + XLogRecPtr flushptr; + + /* start at current insert position */ + slot->data.restart_lsn = GetXLogInsertRecPtr(); + + /* make sure we have enough information to start */ + flushptr = LogStandbySnapshot(); + + /* and make sure it's fsynced to disk */ + XLogFlush(flushptr); + } + else + { + slot->data.restart_lsn = GetRedoRecPtr(); + } + + /* prevent WAL removal as fast as possible */ + ReplicationSlotsComputeRequiredLSN(); + + /* + * If all required WAL is still there, great, otherwise retry. The + * slot should prevent further removal of WAL, unless there's a + * concurrent ReplicationSlotsComputeRequiredLSN() after we've written + * the new restart_lsn above, so normally we should never need to loop + * more than twice. + */ + XLByteToSeg(slot->data.restart_lsn, segno); + if (XLogGetLastRemovedSegno() < segno) + break; + } +} + +/* * Flush all replication slots to disk. * * This needn't actually be part of a checkpoint, but it's a convenient diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index ecfcb0754bd..2dc68279900 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -40,6 +40,7 @@ Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS) { Name name = PG_GETARG_NAME(0); + bool immediately_reserve = PG_GETARG_BOOL(1); Datum values[2]; bool nulls[2]; TupleDesc tupdesc; @@ -59,9 +60,25 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT); values[0] = NameGetDatum(&MyReplicationSlot->data.name); - nulls[0] = false; - nulls[1] = true; + + if (immediately_reserve) + { + /* Reserve WAL as the user asked for it */ + ReplicationSlotReserveWal(); + + /* Write this slot to disk */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + + values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn); + nulls[1] = false; + } + else + { + values[0] = NameGetDatum(&MyReplicationSlot->data.name); + nulls[1] = true; + } tuple = heap_form_tuple(tupdesc, values, nulls); result = HeapTupleGetDatum(tuple); |