aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/logical/logical.c2
-rw-r--r--src/backend/replication/slotfuncs.c46
2 files changed, 41 insertions, 7 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index e3da7d36250..5adf253583b 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -208,6 +208,8 @@ StartupDecodingContext(List *output_plugin_options,
*
* plugin -- contains the name of the output plugin
* output_plugin_options -- contains options passed to the output plugin
+ * need_full_snapshot -- if true, must obtain a snapshot able to read all
+ * tables; if false, one that can read only catalogs is acceptable.
* restart_lsn -- if given as invalid, it's this routine's responsibility to
* mark WAL as reserved by setting a convenient restart_lsn for the slot.
* Otherwise, we set for decoding to start from the given LSN without
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 2c9d5de6d90..beb735d87b6 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -118,10 +118,14 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
* Helper function for creating a new logical replication slot with
* given arguments. Note that this function doesn't release the created
* slot.
+ *
+ * When find_startpoint is false, the slot's confirmed_flush is not set; it's
+ * caller's responsibility to ensure it's set to something sensible.
*/
static void
create_logical_replication_slot(char *name, char *plugin,
- bool temporary, XLogRecPtr restart_lsn)
+ bool temporary, XLogRecPtr restart_lsn,
+ bool find_startpoint)
{
LogicalDecodingContext *ctx = NULL;
@@ -139,16 +143,24 @@ create_logical_replication_slot(char *name, char *plugin,
temporary ? RS_TEMPORARY : RS_EPHEMERAL);
/*
- * Create logical decoding context, to build the initial snapshot.
+ * Create logical decoding context to find start point or, if we don't
+ * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
+ *
+ * Note: when !find_startpoint this is still important, because it's at
+ * this point that the output plugin is validated.
*/
ctx = CreateInitDecodingContext(plugin, NIL,
- false, /* do not build snapshot */
+ false, /* just catalogs is OK */
restart_lsn,
logical_read_local_xlog_page, NULL, NULL,
NULL);
- /* build initial snapshot, might take a while */
- DecodingContextFindStartpoint(ctx);
+ /*
+ * If caller needs us to determine the decoding start point, do so now.
+ * This might take a while.
+ */
+ if (find_startpoint)
+ DecodingContextFindStartpoint(ctx);
/* don't need the decoding context anymore */
FreeDecodingContext(ctx);
@@ -179,7 +191,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
create_logical_replication_slot(NameStr(*name),
NameStr(*plugin),
temporary,
- InvalidXLogRecPtr);
+ InvalidXLogRecPtr,
+ true);
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
@@ -683,10 +696,18 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
/* Create new slot and acquire it */
if (logical_slot)
+ {
+ /*
+ * We must not try to read WAL, since we haven't reserved it yet --
+ * hence pass find_startpoint false. confirmed_flush will be set
+ * below, by copying from the source slot.
+ */
create_logical_replication_slot(NameStr(*dst_name),
plugin,
temporary,
- src_restart_lsn);
+ src_restart_lsn,
+ false);
+ }
else
create_physical_replication_slot(NameStr(*dst_name),
true,
@@ -703,6 +724,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
TransactionId copy_xmin;
TransactionId copy_catalog_xmin;
XLogRecPtr copy_restart_lsn;
+ XLogRecPtr copy_confirmed_flush;
bool copy_islogical;
char *copy_name;
@@ -714,6 +736,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
copy_xmin = src->data.xmin;
copy_catalog_xmin = src->data.catalog_xmin;
copy_restart_lsn = src->data.restart_lsn;
+ copy_confirmed_flush = src->data.confirmed_flush;
/* for existence check */
copy_name = pstrdup(NameStr(src->data.name));
@@ -738,6 +761,14 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
NameStr(*src_name)),
errdetail("The source replication slot was modified incompatibly during the copy operation.")));
+ /* The source slot must have a consistent snapshot */
+ if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot copy unfinished logical replication slot \"%s\"",
+ NameStr(*src_name)),
+ errhint("Retry when the source replication slot's confirmed_flush_lsn is valid.")));
+
/* Install copied values again */
SpinLockAcquire(&MyReplicationSlot->mutex);
MyReplicationSlot->effective_xmin = copy_effective_xmin;
@@ -746,6 +777,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
MyReplicationSlot->data.xmin = copy_xmin;
MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
+ MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
SpinLockRelease(&MyReplicationSlot->mutex);
ReplicationSlotMarkDirty();