aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2021-03-01 09:11:18 +0530
committerAmit Kapila <akapila@postgresql.org>2021-03-01 09:11:18 +0530
commit8bdb1332eb51837c15a10a972c179b84f654279e (patch)
treebadfa4657b5a985fb4d13564e9fc810f0d425f88 /src
parent6230912f23904aa6cb2a1f948ca9b08235b4f54a (diff)
downloadpostgresql-8bdb1332eb51837c15a10a972c179b84f654279e.tar.gz
postgresql-8bdb1332eb51837c15a10a972c179b84f654279e.zip
Avoid repeated decoding of prepared transactions after a restart.
In commit a271a1b50e, we allowed decoding at prepare time and the prepare was decoded again if there is a restart after decoding it. It was done that way because we can't distinguish between the cases where we have not decoded the prepare because it was prior to consistent snapshot or we have decoded it earlier but restarted. To distinguish between these two cases, we have introduced an initial_consistent_point at the slot level which is an LSN at which we found a consistent point at the time of slot creation. This is also the point where we have exported a snapshot for the initial copy. So, prepare transaction prior to this point are sent along with commit prepared. This commit bumps SNAPBUILD_VERSION because of change in SnapBuild. It will break existing slots which is fine in a major release. Author: Ajin Cherian, based on idea by Andres Freund Reviewed-by: Amit Kapila and Vignesh C Discussion: https://postgr.es/m/d0f60d60-133d-bf8d-bd70-47784d8fabf3@enterprisedb.com
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/decode.c2
-rw-r--r--src/backend/replication/logical/logical.c3
-rw-r--r--src/backend/replication/logical/reorderbuffer.c10
-rw-r--r--src/backend/replication/logical/snapbuild.c26
-rw-r--r--src/include/replication/reorderbuffer.h1
-rw-r--r--src/include/replication/slot.h7
-rw-r--r--src/include/replication/snapbuild.h4
7 files changed, 44 insertions, 9 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 657cb4af1e3..5f596135b15 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -730,6 +730,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
if (two_phase)
{
ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+ SnapBuildInitialConsistentPoint(ctx->snapshot_builder),
commit_time, origin_id, origin_lsn,
parsed->twophase_gid, true);
}
@@ -868,6 +869,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
{
ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
abort_time, origin_id, origin_lsn,
+ InvalidXLogRecPtr,
parsed->twophase_gid, false);
}
else
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index baeb45ff43c..3f6d723d096 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder = ReorderBufferAllocate();
ctx->snapshot_builder =
AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
- need_full_snapshot);
+ need_full_snapshot, slot->data.initial_consistent_point);
ctx->reorder->private_data = ctx;
@@ -590,6 +590,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
SpinLockAcquire(&slot->mutex);
slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+ slot->data.initial_consistent_point = ctx->reader->EndRecPtr;
SpinLockRelease(&slot->mutex);
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c3b963211e8..91600ac5667 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2672,6 +2672,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
void
ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ XLogRecPtr initial_consistent_point,
TimestampTz commit_time, RepOriginId origin_id,
XLogRecPtr origin_lsn, char *gid, bool is_commit)
{
@@ -2698,12 +2699,11 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
/*
* It is possible that this transaction is not decoded at prepare time
* either because by that time we didn't have a consistent snapshot or it
- * was decoded earlier but we have restarted. We can't distinguish between
- * those two cases so we send the prepare in both the cases and let
- * downstream decide whether to process or skip it. We don't need to
- * decode the xact for aborts if it is not done already.
+ * was decoded earlier but we have restarted. We only need to send the
+ * prepare if it was not decoded earlier. We don't need to decode the xact
+ * for aborts if it is not done already.
*/
- if (!rbtxn_prepared(txn) && is_commit)
+ if ((txn->final_lsn < initial_consistent_point) && is_commit)
{
txn->txn_flags |= RBTXN_PREPARE;
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index e11788795f1..ed3acadab7b 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -165,6 +165,17 @@ struct SnapBuild
XLogRecPtr start_decoding_at;
/*
+ * LSN at which we found a consistent point at the time of slot creation.
+ * This is also the point where we have exported a snapshot for the
+ * initial copy.
+ *
+ * The prepared transactions that are not covered by initial snapshot
+ * needs to be sent later along with commit prepared and they must be
+ * before this point.
+ */
+ XLogRecPtr initial_consistent_point;
+
+ /*
* Don't start decoding WAL until the "xl_running_xacts" information
* indicates there are no running xids with an xid smaller than this.
*/
@@ -269,7 +280,8 @@ SnapBuild *
AllocateSnapshotBuilder(ReorderBuffer *reorder,
TransactionId xmin_horizon,
XLogRecPtr start_lsn,
- bool need_full_snapshot)
+ bool need_full_snapshot,
+ XLogRecPtr initial_consistent_point)
{
MemoryContext context;
MemoryContext oldcontext;
@@ -297,6 +309,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
builder->initial_xmin_horizon = xmin_horizon;
builder->start_decoding_at = start_lsn;
builder->building_full_snapshot = need_full_snapshot;
+ builder->initial_consistent_point = initial_consistent_point;
MemoryContextSwitchTo(oldcontext);
@@ -357,6 +370,15 @@ SnapBuildCurrentState(SnapBuild *builder)
}
/*
+ * Return the LSN at which the snapshot was exported
+ */
+XLogRecPtr
+SnapBuildInitialConsistentPoint(SnapBuild *builder)
+{
+ return builder->initial_consistent_point;
+}
+
+/*
* Should the contents of transaction ending at 'ptr' be decoded?
*/
bool
@@ -1422,7 +1444,7 @@ typedef struct SnapBuildOnDisk
offsetof(SnapBuildOnDisk, version)
#define SNAPBUILD_MAGIC 0x51A1E001
-#define SNAPBUILD_VERSION 3
+#define SNAPBUILD_VERSION 4
/*
* Store/Load a snapshot from disk, depending on the snapshot builder's state.
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index bab31bf7af7..565a961d6ab 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -643,6 +643,7 @@ void ReorderBufferCommit(ReorderBuffer *, TransactionId,
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ XLogRecPtr initial_consistent_point,
TimestampTz commit_time,
RepOriginId origin_id, XLogRecPtr origin_lsn,
char *gid, bool is_commit);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 38a9a0b3fc4..5c3fde20c69 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -91,6 +91,13 @@ typedef struct ReplicationSlotPersistentData
*/
XLogRecPtr confirmed_flush;
+ /*
+ * LSN at which we found a consistent point at the time of slot creation.
+ * This is also the point where we have exported a snapshot for the
+ * initial copy.
+ */
+ XLogRecPtr initial_consistent_point;
+
/* plugin name */
NameData plugin;
} ReplicationSlotPersistentData;
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index d9f187a58ec..fbabce6764d 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -61,7 +61,8 @@ extern void CheckPointSnapBuild(void);
extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
TransactionId xmin_horizon, XLogRecPtr start_lsn,
- bool need_full_snapshot);
+ bool need_full_snapshot,
+ XLogRecPtr initial_consistent_point);
extern void FreeSnapshotBuilder(SnapBuild *cache);
extern void SnapBuildSnapDecRefcount(Snapshot snap);
@@ -75,6 +76,7 @@ extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
TransactionId xid);
extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
+extern XLogRecPtr SnapBuildInitialConsistentPoint(SnapBuild *builder);
extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
TransactionId xid, int nsubxacts,