aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/test_decoding/expected/twophase.out38
-rw-r--r--contrib/test_decoding/expected/twophase_stream.out28
-rw-r--r--doc/src/sgml/logicaldecoding.sgml9
-rw-r--r--src/backend/replication/logical/decode.c2
-rw-r--r--src/backend/replication/logical/logical.c3
-rw-r--r--src/backend/replication/logical/reorderbuffer.c10
-rw-r--r--src/backend/replication/logical/snapbuild.c26
-rw-r--r--src/include/replication/reorderbuffer.h1
-rw-r--r--src/include/replication/slot.h7
-rw-r--r--src/include/replication/snapbuild.h4
10 files changed, 61 insertions, 67 deletions
diff --git a/contrib/test_decoding/expected/twophase.out b/contrib/test_decoding/expected/twophase.out
index afa35669795..8a1d06d706d 100644
--- a/contrib/test_decoding/expected/twophase.out
+++ b/contrib/test_decoding/expected/twophase.out
@@ -33,14 +33,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
COMMIT PREPARED 'test_prepared#1';
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
- data
-----------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:1
- table public.test_prepared1: INSERT: id[integer]:2
- PREPARE TRANSACTION 'test_prepared#1'
+ data
+-----------------------------------
COMMIT PREPARED 'test_prepared#1'
-(5 rows)
+(1 row)
-- Test that rollback of a prepared xact is decoded.
BEGIN;
@@ -103,13 +99,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
COMMIT PREPARED 'test_prepared#3';
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
- data
--------------------------------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar'
- PREPARE TRANSACTION 'test_prepared#3'
+ data
+-----------------------------------
COMMIT PREPARED 'test_prepared#3'
-(4 rows)
+(1 row)
-- make sure stuff still works
INSERT INTO test_prepared1 VALUES (6);
@@ -158,14 +151,10 @@ RESET statement_timeout;
COMMIT PREPARED 'test_prepared_lock';
-- consume the commit
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
- data
----------------------------------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol'
- table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2'
- PREPARE TRANSACTION 'test_prepared_lock'
+ data
+--------------------------------------
COMMIT PREPARED 'test_prepared_lock'
-(5 rows)
+(1 row)
-- Test savepoints and sub-xacts. Creating savepoints will create
-- sub-xacts implicitly.
@@ -188,13 +177,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
COMMIT PREPARED 'test_prepared_savepoint';
-- consume the commit
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
- data
-------------------------------------------------------------
- BEGIN
- table public.test_prepared_savepoint: INSERT: a[integer]:1
- PREPARE TRANSACTION 'test_prepared_savepoint'
+ data
+-------------------------------------------
COMMIT PREPARED 'test_prepared_savepoint'
-(4 rows)
+(1 row)
-- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
BEGIN;
diff --git a/contrib/test_decoding/expected/twophase_stream.out b/contrib/test_decoding/expected/twophase_stream.out
index 3acc4acd365..d54e640b409 100644
--- a/contrib/test_decoding/expected/twophase_stream.out
+++ b/contrib/test_decoding/expected/twophase_stream.out
@@ -60,32 +60,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-
COMMIT PREPARED 'test1';
--should show the COMMIT PREPARED and the other changes in the transaction
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
- data
--------------------------------------------------------------
- BEGIN
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
- PREPARE TRANSACTION 'test1'
+ data
+-------------------------
COMMIT PREPARED 'test1'
-(23 rows)
+(1 row)
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
-- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 85c55d64125..f1f13d81d56 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -191,9 +191,6 @@ postgres=# COMMIT PREPARED 'test_prepared1';
postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
lsn | xid | data
-----------+-----+--------------------------------------------
- 0/1689DC0 | 529 | BEGIN 529
- 0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5'
- 0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529
0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529
(4 row)
@@ -822,10 +819,8 @@ typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx
<parameter>gid</parameter> field, which is part of the
<parameter>txn</parameter> parameter, can be used in this callback to
check if the plugin has already received this <command>PREPARE</command>
- in which case it can skip the remaining changes of the transaction.
- This can only happen if the user restarts the decoding after receiving
- the <command>PREPARE</command> for a transaction but before receiving
- the <command>COMMIT PREPARED</command>, say because of some error.
+ in which case it can either error out or skip the remaining changes of
+ the transaction.
<programlisting>
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 657cb4af1e3..5f596135b15 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -730,6 +730,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
if (two_phase)
{
ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+ SnapBuildInitialConsistentPoint(ctx->snapshot_builder),
commit_time, origin_id, origin_lsn,
parsed->twophase_gid, true);
}
@@ -868,6 +869,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
{
ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
abort_time, origin_id, origin_lsn,
+ InvalidXLogRecPtr,
parsed->twophase_gid, false);
}
else
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index baeb45ff43c..3f6d723d096 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder = ReorderBufferAllocate();
ctx->snapshot_builder =
AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
- need_full_snapshot);
+ need_full_snapshot, slot->data.initial_consistent_point);
ctx->reorder->private_data = ctx;
@@ -590,6 +590,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
SpinLockAcquire(&slot->mutex);
slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+ slot->data.initial_consistent_point = ctx->reader->EndRecPtr;
SpinLockRelease(&slot->mutex);
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c3b963211e8..91600ac5667 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2672,6 +2672,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
void
ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ XLogRecPtr initial_consistent_point,
TimestampTz commit_time, RepOriginId origin_id,
XLogRecPtr origin_lsn, char *gid, bool is_commit)
{
@@ -2698,12 +2699,11 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
/*
* It is possible that this transaction is not decoded at prepare time
* either because by that time we didn't have a consistent snapshot or it
- * was decoded earlier but we have restarted. We can't distinguish between
- * those two cases so we send the prepare in both the cases and let
- * downstream decide whether to process or skip it. We don't need to
- * decode the xact for aborts if it is not done already.
+ * was decoded earlier but we have restarted. We only need to send the
+ * prepare if it was not decoded earlier. We don't need to decode the xact
+ * for aborts if it is not done already.
*/
- if (!rbtxn_prepared(txn) && is_commit)
+ if ((txn->final_lsn < initial_consistent_point) && is_commit)
{
txn->txn_flags |= RBTXN_PREPARE;
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index e11788795f1..ed3acadab7b 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -165,6 +165,17 @@ struct SnapBuild
XLogRecPtr start_decoding_at;
/*
+ * LSN at which we found a consistent point at the time of slot creation.
+ * This is also the point where we have exported a snapshot for the
+ * initial copy.
+ *
+ * The prepared transactions that are not covered by initial snapshot
+ * needs to be sent later along with commit prepared and they must be
+ * before this point.
+ */
+ XLogRecPtr initial_consistent_point;
+
+ /*
* Don't start decoding WAL until the "xl_running_xacts" information
* indicates there are no running xids with an xid smaller than this.
*/
@@ -269,7 +280,8 @@ SnapBuild *
AllocateSnapshotBuilder(ReorderBuffer *reorder,
TransactionId xmin_horizon,
XLogRecPtr start_lsn,
- bool need_full_snapshot)
+ bool need_full_snapshot,
+ XLogRecPtr initial_consistent_point)
{
MemoryContext context;
MemoryContext oldcontext;
@@ -297,6 +309,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
builder->initial_xmin_horizon = xmin_horizon;
builder->start_decoding_at = start_lsn;
builder->building_full_snapshot = need_full_snapshot;
+ builder->initial_consistent_point = initial_consistent_point;
MemoryContextSwitchTo(oldcontext);
@@ -357,6 +370,15 @@ SnapBuildCurrentState(SnapBuild *builder)
}
/*
+ * Return the LSN at which the snapshot was exported
+ */
+XLogRecPtr
+SnapBuildInitialConsistentPoint(SnapBuild *builder)
+{
+ return builder->initial_consistent_point;
+}
+
+/*
* Should the contents of transaction ending at 'ptr' be decoded?
*/
bool
@@ -1422,7 +1444,7 @@ typedef struct SnapBuildOnDisk
offsetof(SnapBuildOnDisk, version)
#define SNAPBUILD_MAGIC 0x51A1E001
-#define SNAPBUILD_VERSION 3
+#define SNAPBUILD_VERSION 4
/*
* Store/Load a snapshot from disk, depending on the snapshot builder's state.
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index bab31bf7af7..565a961d6ab 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -643,6 +643,7 @@ void ReorderBufferCommit(ReorderBuffer *, TransactionId,
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+ XLogRecPtr initial_consistent_point,
TimestampTz commit_time,
RepOriginId origin_id, XLogRecPtr origin_lsn,
char *gid, bool is_commit);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 38a9a0b3fc4..5c3fde20c69 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -91,6 +91,13 @@ typedef struct ReplicationSlotPersistentData
*/
XLogRecPtr confirmed_flush;
+ /*
+ * LSN at which we found a consistent point at the time of slot creation.
+ * This is also the point where we have exported a snapshot for the
+ * initial copy.
+ */
+ XLogRecPtr initial_consistent_point;
+
/* plugin name */
NameData plugin;
} ReplicationSlotPersistentData;
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index d9f187a58ec..fbabce6764d 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -61,7 +61,8 @@ extern void CheckPointSnapBuild(void);
extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
TransactionId xmin_horizon, XLogRecPtr start_lsn,
- bool need_full_snapshot);
+ bool need_full_snapshot,
+ XLogRecPtr initial_consistent_point);
extern void FreeSnapshotBuilder(SnapBuild *cache);
extern void SnapBuildSnapDecRefcount(Snapshot snap);
@@ -75,6 +76,7 @@ extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
TransactionId xid);
extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
+extern XLogRecPtr SnapBuildInitialConsistentPoint(SnapBuild *builder);
extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
TransactionId xid, int nsubxacts,