diff options
-rw-r--r-- | contrib/test_decoding/expected/twophase.out | 38 | ||||
-rw-r--r-- | contrib/test_decoding/expected/twophase_stream.out | 28 | ||||
-rw-r--r-- | doc/src/sgml/logicaldecoding.sgml | 9 | ||||
-rw-r--r-- | src/backend/replication/logical/decode.c | 2 | ||||
-rw-r--r-- | src/backend/replication/logical/logical.c | 3 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 10 | ||||
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 26 | ||||
-rw-r--r-- | src/include/replication/reorderbuffer.h | 1 | ||||
-rw-r--r-- | src/include/replication/slot.h | 7 | ||||
-rw-r--r-- | src/include/replication/snapbuild.h | 4 |
10 files changed, 61 insertions, 67 deletions
diff --git a/contrib/test_decoding/expected/twophase.out b/contrib/test_decoding/expected/twophase.out index afa35669795..8a1d06d706d 100644 --- a/contrib/test_decoding/expected/twophase.out +++ b/contrib/test_decoding/expected/twophase.out @@ -33,14 +33,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two COMMIT PREPARED 'test_prepared#1'; SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); - data ----------------------------------------------------- - BEGIN - table public.test_prepared1: INSERT: id[integer]:1 - table public.test_prepared1: INSERT: id[integer]:2 - PREPARE TRANSACTION 'test_prepared#1' + data +----------------------------------- COMMIT PREPARED 'test_prepared#1' -(5 rows) +(1 row) -- Test that rollback of a prepared xact is decoded. BEGIN; @@ -103,13 +99,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two COMMIT PREPARED 'test_prepared#3'; SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); - data -------------------------------------------------------------------------- - BEGIN - table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar' - PREPARE TRANSACTION 'test_prepared#3' + data +----------------------------------- COMMIT PREPARED 'test_prepared#3' -(4 rows) +(1 row) -- make sure stuff still works INSERT INTO test_prepared1 VALUES (6); @@ -158,14 +151,10 @@ RESET statement_timeout; COMMIT PREPARED 'test_prepared_lock'; -- consume the commit SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); - data ---------------------------------------------------------------------------- - BEGIN - table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol' - table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2' - PREPARE TRANSACTION 'test_prepared_lock' + data +-------------------------------------- COMMIT PREPARED 'test_prepared_lock' -(5 rows) +(1 row) -- Test savepoints and sub-xacts. Creating savepoints will create -- sub-xacts implicitly. @@ -188,13 +177,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two COMMIT PREPARED 'test_prepared_savepoint'; -- consume the commit SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); - data ------------------------------------------------------------- - BEGIN - table public.test_prepared_savepoint: INSERT: a[integer]:1 - PREPARE TRANSACTION 'test_prepared_savepoint' + data +------------------------------------------- COMMIT PREPARED 'test_prepared_savepoint' -(4 rows) +(1 row) -- Test that a GID containing "_nodecode" gets decoded at commit prepared time. BEGIN; diff --git a/contrib/test_decoding/expected/twophase_stream.out b/contrib/test_decoding/expected/twophase_stream.out index 3acc4acd365..d54e640b409 100644 --- a/contrib/test_decoding/expected/twophase_stream.out +++ b/contrib/test_decoding/expected/twophase_stream.out @@ -60,32 +60,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two- COMMIT PREPARED 'test1'; --should show the COMMIT PREPARED and the other changes in the transaction SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); - data -------------------------------------------------------------- - BEGIN - table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1' - table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2' - table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3' - table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4' - table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5' - table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6' - table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7' - table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8' - table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9' - table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10' - table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11' - table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12' - table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13' - table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14' - table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15' - table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16' - table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17' - table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18' - table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19' - table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20' - PREPARE TRANSACTION 'test1' + data +------------------------- COMMIT PREPARED 'test1' -(23 rows) +(1 row) -- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with -- filtered gid. gids with '_nodecode' will not be decoded at prepare time. diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 85c55d64125..f1f13d81d56 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -191,9 +191,6 @@ postgres=# COMMIT PREPARED 'test_prepared1'; postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1'); lsn | xid | data -----------+-----+-------------------------------------------- - 0/1689DC0 | 529 | BEGIN 529 - 0/1689DC0 | 529 | table public.data: INSERT: id[integer]:3 data[text]:'5' - 0/1689FC0 | 529 | PREPARE TRANSACTION 'test_prepared1', txid 529 0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529 (4 row) @@ -822,10 +819,8 @@ typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx <parameter>gid</parameter> field, which is part of the <parameter>txn</parameter> parameter, can be used in this callback to check if the plugin has already received this <command>PREPARE</command> - in which case it can skip the remaining changes of the transaction. - This can only happen if the user restarts the decoding after receiving - the <command>PREPARE</command> for a transaction but before receiving - the <command>COMMIT PREPARED</command>, say because of some error. + in which case it can either error out or skip the remaining changes of + the transaction. <programlisting> typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 657cb4af1e3..5f596135b15 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -730,6 +730,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, if (two_phase) { ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, + SnapBuildInitialConsistentPoint(ctx->snapshot_builder), commit_time, origin_id, origin_lsn, parsed->twophase_gid, true); } @@ -868,6 +869,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, { ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr, abort_time, origin_id, origin_lsn, + InvalidXLogRecPtr, parsed->twophase_gid, false); } else diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index baeb45ff43c..3f6d723d096 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder = ReorderBufferAllocate(); ctx->snapshot_builder = AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn, - need_full_snapshot); + need_full_snapshot, slot->data.initial_consistent_point); ctx->reorder->private_data = ctx; @@ -590,6 +590,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) SpinLockAcquire(&slot->mutex); slot->data.confirmed_flush = ctx->reader->EndRecPtr; + slot->data.initial_consistent_point = ctx->reader->EndRecPtr; SpinLockRelease(&slot->mutex); } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index c3b963211e8..91600ac5667 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2672,6 +2672,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + XLogRecPtr initial_consistent_point, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit) { @@ -2698,12 +2699,11 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, /* * It is possible that this transaction is not decoded at prepare time * either because by that time we didn't have a consistent snapshot or it - * was decoded earlier but we have restarted. We can't distinguish between - * those two cases so we send the prepare in both the cases and let - * downstream decide whether to process or skip it. We don't need to - * decode the xact for aborts if it is not done already. + * was decoded earlier but we have restarted. We only need to send the + * prepare if it was not decoded earlier. We don't need to decode the xact + * for aborts if it is not done already. */ - if (!rbtxn_prepared(txn) && is_commit) + if ((txn->final_lsn < initial_consistent_point) && is_commit) { txn->txn_flags |= RBTXN_PREPARE; diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index e11788795f1..ed3acadab7b 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -165,6 +165,17 @@ struct SnapBuild XLogRecPtr start_decoding_at; /* + * LSN at which we found a consistent point at the time of slot creation. + * This is also the point where we have exported a snapshot for the + * initial copy. + * + * The prepared transactions that are not covered by initial snapshot + * needs to be sent later along with commit prepared and they must be + * before this point. + */ + XLogRecPtr initial_consistent_point; + + /* * Don't start decoding WAL until the "xl_running_xacts" information * indicates there are no running xids with an xid smaller than this. */ @@ -269,7 +280,8 @@ SnapBuild * AllocateSnapshotBuilder(ReorderBuffer *reorder, TransactionId xmin_horizon, XLogRecPtr start_lsn, - bool need_full_snapshot) + bool need_full_snapshot, + XLogRecPtr initial_consistent_point) { MemoryContext context; MemoryContext oldcontext; @@ -297,6 +309,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, builder->initial_xmin_horizon = xmin_horizon; builder->start_decoding_at = start_lsn; builder->building_full_snapshot = need_full_snapshot; + builder->initial_consistent_point = initial_consistent_point; MemoryContextSwitchTo(oldcontext); @@ -357,6 +370,15 @@ SnapBuildCurrentState(SnapBuild *builder) } /* + * Return the LSN at which the snapshot was exported + */ +XLogRecPtr +SnapBuildInitialConsistentPoint(SnapBuild *builder) +{ + return builder->initial_consistent_point; +} + +/* * Should the contents of transaction ending at 'ptr' be decoded? */ bool @@ -1422,7 +1444,7 @@ typedef struct SnapBuildOnDisk offsetof(SnapBuildOnDisk, version) #define SNAPBUILD_MAGIC 0x51A1E001 -#define SNAPBUILD_VERSION 3 +#define SNAPBUILD_VERSION 4 /* * Store/Load a snapshot from disk, depending on the snapshot builder's state. diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index bab31bf7af7..565a961d6ab 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -643,6 +643,7 @@ void ReorderBufferCommit(ReorderBuffer *, TransactionId, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + XLogRecPtr initial_consistent_point, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn, char *gid, bool is_commit); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 38a9a0b3fc4..5c3fde20c69 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -91,6 +91,13 @@ typedef struct ReplicationSlotPersistentData */ XLogRecPtr confirmed_flush; + /* + * LSN at which we found a consistent point at the time of slot creation. + * This is also the point where we have exported a snapshot for the + * initial copy. + */ + XLogRecPtr initial_consistent_point; + /* plugin name */ NameData plugin; } ReplicationSlotPersistentData; diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index d9f187a58ec..fbabce6764d 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -61,7 +61,8 @@ extern void CheckPointSnapBuild(void); extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache, TransactionId xmin_horizon, XLogRecPtr start_lsn, - bool need_full_snapshot); + bool need_full_snapshot, + XLogRecPtr initial_consistent_point); extern void FreeSnapshotBuilder(SnapBuild *cache); extern void SnapBuildSnapDecRefcount(Snapshot snap); @@ -75,6 +76,7 @@ extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid); extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr); +extern XLogRecPtr SnapBuildInitialConsistentPoint(SnapBuild *builder); extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, int nsubxacts, |