aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/logical.c7
-rw-r--r--src/backend/replication/logical/snapbuild.c29
-rw-r--r--src/include/replication/snapbuild.h1
3 files changed, 28 insertions, 9 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 99f31849bb1..f8ef5d56d26 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -152,6 +152,7 @@ StartupDecodingContext(List *output_plugin_options,
TransactionId xmin_horizon,
bool need_full_snapshot,
bool fast_forward,
+ bool in_create,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
@@ -212,7 +213,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder = ReorderBufferAllocate();
ctx->snapshot_builder =
AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
- need_full_snapshot, slot->data.two_phase_at);
+ need_full_snapshot, in_create, slot->data.two_phase_at);
ctx->reorder->private_data = ctx;
@@ -438,7 +439,7 @@ CreateInitDecodingContext(const char *plugin,
ReplicationSlotSave();
ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
- need_full_snapshot, false,
+ need_full_snapshot, false, true,
xl_routine, prepare_write, do_write,
update_progress);
@@ -592,7 +593,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ctx = StartupDecodingContext(output_plugin_options,
start_lsn, InvalidTransactionId, false,
- fast_forward, xl_routine, prepare_write,
+ fast_forward, false, xl_routine, prepare_write,
do_write, update_progress);
/* call output plugin initialization callback */
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index e37e22f4417..ae676145e60 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -190,6 +190,14 @@ struct SnapBuild
bool building_full_snapshot;
/*
+ * Indicates if we are using the snapshot builder for the creation of a
+ * logical replication slot. If it's true, the start point for decoding
+ * changes is not determined yet. So we skip snapshot restores to properly
+ * find the start point. See SnapBuildFindSnapshot() for details.
+ */
+ bool in_slot_creation;
+
+ /*
* Snapshot that's valid to see the catalog state seen at this moment.
*/
Snapshot snapshot;
@@ -317,6 +325,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
TransactionId xmin_horizon,
XLogRecPtr start_lsn,
bool need_full_snapshot,
+ bool in_slot_creation,
XLogRecPtr two_phase_at)
{
MemoryContext context;
@@ -347,6 +356,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
builder->initial_xmin_horizon = xmin_horizon;
builder->start_decoding_at = start_lsn;
+ builder->in_slot_creation = in_slot_creation;
builder->building_full_snapshot = need_full_snapshot;
builder->two_phase_at = two_phase_at;
@@ -1327,10 +1337,12 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
* state while waiting on c)'s sub-states.
*
* b) This (in a previous run) or another decoding slot serialized a
- * snapshot to disk that we can use. Can't use this method for the
- * initial snapshot when slot is being created and needs full snapshot
- * for export or direct use, as that snapshot will only contain catalog
- * modifying transactions.
+ * snapshot to disk that we can use. Can't use this method while finding
+ * the start point for decoding changes as the restart LSN would be an
+ * arbitrary LSN but we need to find the start point to extract changes
+ * where we won't see the data for partial transactions. Also, we cannot
+ * use this method when a slot needs a full snapshot for export or direct
+ * use, as that snapshot will only contain catalog modifying transactions.
*
* c) First incrementally build a snapshot for catalog tuples
* (BUILDING_SNAPSHOT), that requires all, already in-progress,
@@ -1395,8 +1407,13 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
return false;
}
- /* b) valid on disk state and not building full snapshot */
+
+ /*
+ * b) valid on disk state and while neither building full snapshot nor
+ * creating a slot.
+ */
else if (!builder->building_full_snapshot &&
+ !builder->in_slot_creation &&
SnapBuildRestore(builder, lsn))
{
/* there won't be any state to cleanup */
@@ -1580,7 +1597,7 @@ typedef struct SnapBuildOnDisk
offsetof(SnapBuildOnDisk, version)
#define SNAPBUILD_MAGIC 0x51A1E001
-#define SNAPBUILD_VERSION 5
+#define SNAPBUILD_VERSION 6
/*
* Store/Load a snapshot from disk, depending on the snapshot builder's state.
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index a3360a1c5ea..caa5113ff81 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -62,6 +62,7 @@ extern void CheckPointSnapBuild(void);
extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *reorder,
TransactionId xmin_horizon, XLogRecPtr start_lsn,
bool need_full_snapshot,
+ bool in_slot_creation,
XLogRecPtr two_phase_at);
extern void FreeSnapshotBuilder(SnapBuild *builder);