aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/logical/proto.c2
-rw-r--r--src/backend/replication/logical/reorderbuffer.c50
-rw-r--r--src/backend/replication/logical/snapbuild.c2
-rw-r--r--src/include/replication/reorderbuffer.h8
4 files changed, 38 insertions, 24 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index dc72b7c8f77..1a352b542dc 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -164,7 +164,7 @@ logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
* which case we expect to have a valid GID.
*/
Assert(txn->gid != NULL);
- Assert(rbtxn_prepared(txn));
+ Assert(rbtxn_is_prepared(txn));
Assert(TransactionIdIsValid(txn->xid));
/* send the flags field */
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index ed5a2946dc1..b42f4002ba8 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1793,7 +1793,7 @@ ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn
* and the toast reconstruction data. The full cleanup will happen as part
* of decoding ABORT record of this transaction.
*/
- ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn));
ReorderBufferToastReset(rb, txn);
/* All changes should be discarded */
@@ -1968,7 +1968,7 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferStreamTXN(rb, txn);
- if (rbtxn_prepared(txn))
+ if (rbtxn_is_prepared(txn))
{
/*
* Note, we send stream prepare even if a concurrent abort is
@@ -2150,7 +2150,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferChange *specinsert)
{
/* Discard the changes that we just streamed */
- ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn));
/* Free all resources allocated for toast reconstruction */
ReorderBufferToastReset(rb, txn);
@@ -2238,7 +2238,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
*/
if (!streaming)
{
- if (rbtxn_prepared(txn))
+ if (rbtxn_is_prepared(txn))
rb->begin_prepare(rb, txn);
else
rb->begin(rb, txn);
@@ -2280,7 +2280,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
* required for the cases when we decode the changes before the
* COMMIT record is processed.
*/
- if (streaming || rbtxn_prepared(change->txn))
+ if (streaming || rbtxn_is_prepared(change->txn))
{
curtxn = change->txn;
SetupCheckXidLive(curtxn->xid);
@@ -2625,7 +2625,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
* Call either PREPARE (for two-phase transactions) or COMMIT (for
* regular ones).
*/
- if (rbtxn_prepared(txn))
+ if (rbtxn_is_prepared(txn))
{
Assert(!rbtxn_sent_prepare(txn));
rb->prepare(rb, txn, commit_lsn);
@@ -2680,12 +2680,12 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
* For 4, as the entire txn has been decoded, we can fully clean up
* the TXN reorder buffer.
*/
- if (streaming || rbtxn_prepared(txn))
+ if (streaming || rbtxn_is_prepared(txn))
{
if (streaming)
ReorderBufferMaybeMarkTXNStreamed(rb, txn);
- ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn));
/* Reset the CheckXidAlive */
CheckXidAlive = InvalidTransactionId;
}
@@ -2729,7 +2729,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
* during a two-phase commit.
*/
if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
- (stream_started || rbtxn_prepared(txn)))
+ (stream_started || rbtxn_is_prepared(txn)))
{
/* curtxn must be set for streaming or prepared transactions */
Assert(curtxn);
@@ -2816,7 +2816,7 @@ ReorderBufferReplay(ReorderBufferTXN *txn,
* Removing this txn before a commit might result in the computation
* of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
*/
- if (!rbtxn_prepared(txn))
+ if (!rbtxn_is_prepared(txn))
ReorderBufferCleanupTXN(rb, txn);
return;
}
@@ -2853,7 +2853,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
}
/*
- * Record the prepare information for a transaction.
+ * Record the prepare information for a transaction. Also, mark the transaction
+ * as a prepared transaction.
*/
bool
ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
@@ -2879,6 +2880,10 @@ ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
txn->origin_id = origin_id;
txn->origin_lsn = origin_lsn;
+ /* Mark this transaction as a prepared transaction */
+ Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == 0);
+ txn->txn_flags |= RBTXN_IS_PREPARED;
+
return true;
}
@@ -2894,6 +2899,8 @@ ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
if (txn == NULL)
return;
+ /* txn must have been marked as a prepared transaction */
+ Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == RBTXN_IS_PREPARED);
txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
}
@@ -2915,12 +2922,16 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
if (txn == NULL)
return;
- txn->txn_flags |= RBTXN_PREPARE;
- txn->gid = pstrdup(gid);
-
- /* The prepare info must have been updated in txn by now. */
+ /*
+ * txn must have been marked as a prepared transaction and must have
+ * neither been skipped nor sent a prepare. Also, the prepare info must
+ * have been updated in it by now.
+ */
+ Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == RBTXN_IS_PREPARED);
Assert(txn->final_lsn != InvalidXLogRecPtr);
+ txn->gid = pstrdup(gid);
+
ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
@@ -2976,12 +2987,13 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
*/
if ((txn->final_lsn < two_phase_at) && is_commit)
{
- txn->txn_flags |= RBTXN_PREPARE;
-
/*
- * The prepare info must have been updated in txn even if we skip
- * prepare.
+ * txn must have been marked as a prepared transaction and skipped but
+ * not sent a prepare. Also, the prepare info must have been updated
+ * in txn even if we skip prepare.
*/
+ Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) ==
+ (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE));
Assert(txn->final_lsn != InvalidXLogRecPtr);
/*
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index bbedd3de318..05687fd75e5 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -761,7 +761,7 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
* We don't need to add snapshot to prepared transactions as they
* should not see the new catalog contents.
*/
- if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn))
+ if (rbtxn_is_prepared(txn))
continue;
elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 9d9ac2f0830..517a8e3634f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -170,13 +170,15 @@ typedef struct ReorderBufferChange
#define RBTXN_IS_SERIALIZED_CLEAR 0x0008
#define RBTXN_IS_STREAMED 0x0010
#define RBTXN_HAS_PARTIAL_CHANGE 0x0020
-#define RBTXN_PREPARE 0x0040
+#define RBTXN_IS_PREPARED 0x0040
#define RBTXN_SKIPPED_PREPARE 0x0080
#define RBTXN_HAS_STREAMABLE_CHANGE 0x0100
#define RBTXN_SENT_PREPARE 0x0200
#define RBTXN_IS_COMMITTED 0x0400
#define RBTXN_IS_ABORTED 0x0800
+#define RBTXN_PREPARE_STATUS_MASK (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE | RBTXN_SENT_PREPARE)
+
/* Does the transaction have catalog changes? */
#define rbtxn_has_catalog_changes(txn) \
( \
@@ -234,9 +236,9 @@ typedef struct ReorderBufferChange
* committed. To check whether a prepare or a stream_prepare has already
* been sent for this transaction, we need to use rbtxn_sent_prepare().
*/
-#define rbtxn_prepared(txn) \
+#define rbtxn_is_prepared(txn) \
( \
- ((txn)->txn_flags & RBTXN_PREPARE) != 0 \
+ ((txn)->txn_flags & RBTXN_IS_PREPARED) != 0 \
)
/* Has a prepare or stream_prepare already been sent? */