aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2020-07-20 08:48:26 +0530
committerAmit Kapila <akapila@postgresql.org>2020-07-20 08:48:26 +0530
commit0bead9af484c1d0a67e690fda47011addaa5bc9d (patch)
tree246db2163552a439ac1b7903fd36cb02e73cea3c
parentd05b172a760e0ccb3008a2144f96053720000b12 (diff)
downloadpostgresql-0bead9af484c1d0a67e690fda47011addaa5bc9d.tar.gz
postgresql-0bead9af484c1d0a67e690fda47011addaa5bc9d.zip
Immediately WAL-log subtransaction and top-level XID association.
The logical decoding infrastructure needs to know which top-level transaction the subxact belongs to, in order to decode all the changes. Until now that might be delayed until commit, due to the caching (GPROC_MAX_CACHED_SUBXIDS), preventing features requiring incremental decoding. So we also write the assignment info into WAL immediately, as part of the next WAL record (to minimize overhead) only when wal_level=logical. We can not remove the existing XLOG_XACT_ASSIGNMENT WAL as that is required for avoiding overflow in the hot standby snapshot. Bump XLOG_PAGE_MAGIC, since this introduces XLR_BLOCK_ID_TOPLEVEL_XID. Author: Tomas Vondra, Dilip Kumar, Amit Kapila Reviewed-by: Amit Kapila Tested-by: Neha Sharma and Mahendra Singh Thalor Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
-rw-r--r--src/backend/access/transam/xact.c50
-rw-r--r--src/backend/access/transam/xloginsert.c23
-rw-r--r--src/backend/access/transam/xlogreader.c5
-rw-r--r--src/backend/replication/logical/decode.c44
-rw-r--r--src/include/access/xact.h3
-rw-r--r--src/include/access/xlog.h1
-rw-r--r--src/include/access/xlog_internal.h2
-rw-r--r--src/include/access/xlogreader.h3
-rw-r--r--src/include/access/xlogrecord.h1
9 files changed, 108 insertions, 24 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b3ee7fa7ea0..bd4c3cf3258 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -191,6 +191,7 @@ typedef struct TransactionStateData
bool didLogXid; /* has xid been included in WAL record? */
int parallelModeLevel; /* Enter/ExitParallelMode counter */
bool chain; /* start a new block after this one */
+ bool assigned; /* assigned to top-level XID */
struct TransactionStateData *parent; /* back link to parent */
} TransactionStateData;
@@ -223,6 +224,7 @@ typedef struct SerializedTransactionState
static TransactionStateData TopTransactionStateData = {
.state = TRANS_DEFAULT,
.blockState = TBLOCK_DEFAULT,
+ .assigned = false,
};
/*
@@ -5120,6 +5122,7 @@ PushTransaction(void)
GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);
s->prevXactReadOnly = XactReadOnly;
s->parallelModeLevel = 0;
+ s->assigned = false;
CurrentTransactionState = s;
@@ -6022,3 +6025,50 @@ xact_redo(XLogReaderState *record)
else
elog(PANIC, "xact_redo: unknown op code %u", info);
}
+
+/*
+ * IsSubTransactionAssignmentPending
+ *
+ * This is used to decide whether we need to WAL log the top-level XID for
+ * operation in a subtransaction. We require that for logical decoding, see
+ * LogicalDecodingProcessRecord.
+ *
+ * This returns true if wal_level >= logical and we are inside a valid
+ * subtransaction, for which the assignment was not yet written to any WAL
+ * record.
+ */
+bool
+IsSubTransactionAssignmentPending(void)
+{
+ /* wal_level has to be logical */
+ if (!XLogLogicalInfoActive())
+ return false;
+
+ /* we need to be in a transaction state */
+ if (!IsTransactionState())
+ return false;
+
+ /* it has to be a subtransaction */
+ if (!IsSubTransaction())
+ return false;
+
+ /* the subtransaction has to have a XID assigned */
+ if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny()))
+ return false;
+
+ /* and it should not be already 'assigned' */
+ return !CurrentTransactionState->assigned;
+}
+
+/*
+ * MarkSubTransactionAssigned
+ *
+ * Mark the subtransaction assignment as completed.
+ */
+void
+MarkSubTransactionAssigned(void)
+{
+ Assert(IsSubTransactionAssignmentPending());
+
+ CurrentTransactionState->assigned = true;
+}
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index b21679f09eb..c526bb19281 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -89,11 +89,13 @@ static XLogRecData hdr_rdt;
static char *hdr_scratch = NULL;
#define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char))
+#define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char))
#define HEADER_SCRATCH_SIZE \
(SizeOfXLogRecord + \
MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
- SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
+ SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
+ SizeOfXLogTransactionId)
/*
* An array of XLogRecData structs, to hold registered data.
@@ -195,6 +197,10 @@ XLogResetInsertion(void)
{
int i;
+ /* reset the subxact assignment flag (if needed) */
+ if (curinsert_flags & XLOG_INCLUDE_XID)
+ MarkSubTransactionAssigned();
+
for (i = 0; i < max_registered_block_id; i++)
registered_buffers[i].in_use = false;
@@ -398,7 +404,7 @@ void
XLogSetRecordFlags(uint8 flags)
{
Assert(begininsert_called);
- curinsert_flags = flags;
+ curinsert_flags |= flags;
}
/*
@@ -748,6 +754,19 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
scratch += sizeof(replorigin_session_origin);
}
+ /* followed by toplevel XID, if not already included in previous record */
+ if (IsSubTransactionAssignmentPending())
+ {
+ TransactionId xid = GetTopTransactionIdIfAny();
+
+ /* update the flag (later used by XLogResetInsertion) */
+ XLogSetRecordFlags(XLOG_INCLUDE_XID);
+
+ *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
+ memcpy(scratch, &xid, sizeof(TransactionId));
+ scratch += sizeof(TransactionId);
+ }
+
/* followed by main data, if any */
if (mainrdata_len > 0)
{
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index cb76be4f469..a757baccfc5 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1197,6 +1197,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
state->decoded_record = record;
state->record_origin = InvalidRepOriginId;
+ state->toplevel_xid = InvalidTransactionId;
ptr = (char *) record;
ptr += SizeOfXLogRecord;
@@ -1235,6 +1236,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
{
COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
}
+ else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
+ {
+ COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId));
+ }
else if (block_id <= XLR_MAX_BLOCK_ID)
{
/* XLogRecordBlockHeader */
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c2e5e3abf82..0c0c3717391 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -94,11 +94,27 @@ void
LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
{
XLogRecordBuffer buf;
+ TransactionId txid;
buf.origptr = ctx->reader->ReadRecPtr;
buf.endptr = ctx->reader->EndRecPtr;
buf.record = record;
+ txid = XLogRecGetTopXid(record);
+
+ /*
+ * If the top-level xid is valid, we need to assign the subxact to the
+ * top-level xact. We need to do this for all records, hence we do it
+ * before the switch.
+ */
+ if (TransactionIdIsValid(txid))
+ {
+ ReorderBufferAssignChild(ctx->reorder,
+ txid,
+ record->decoded_record->xl_xid,
+ buf.origptr);
+ }
+
/* cast so we get a warning when new rmgrs are added */
switch ((RmgrId) XLogRecGetRmid(record))
{
@@ -216,13 +232,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* If the snapshot isn't yet fully built, we cannot decode anything, so
* bail out.
- *
- * However, it's critical to process XLOG_XACT_ASSIGNMENT records even
- * when the snapshot is being built: it is possible to get later records
- * that require subxids to be properly assigned.
*/
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT &&
- info != XLOG_XACT_ASSIGNMENT)
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
switch (info)
@@ -264,22 +275,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
break;
}
case XLOG_XACT_ASSIGNMENT:
- {
- xl_xact_assignment *xlrec;
- int i;
- TransactionId *sub_xid;
- xlrec = (xl_xact_assignment *) XLogRecGetData(r);
-
- sub_xid = &xlrec->xsub[0];
-
- for (i = 0; i < xlrec->nsubxacts; i++)
- {
- ReorderBufferAssignChild(reorder, xlrec->xtop,
- *(sub_xid++), buf->origptr);
- }
- break;
- }
+ /*
+ * We assign subxact to the toplevel xact while processing each
+ * record if required. So, we don't need to do anything here.
+ * See LogicalDecodingProcessRecord.
+ */
+ break;
case XLOG_XACT_PREPARE:
/*
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index db191879b9d..aef85553674 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -428,6 +428,9 @@ extern void UnregisterXactCallback(XactCallback callback, void *arg);
extern void RegisterSubXactCallback(SubXactCallback callback, void *arg);
extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
+extern bool IsSubTransactionAssignmentPending(void);
+extern void MarkSubTransactionAssigned(void);
+
extern int xactGetCommittedChildren(TransactionId **ptr);
extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 5b143348879..d8391aa3783 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -237,6 +237,7 @@ extern bool XLOG_DEBUG;
*/
#define XLOG_INCLUDE_ORIGIN 0x01 /* include the replication origin */
#define XLOG_MARK_UNIMPORTANT 0x02 /* record not important for durability */
+#define XLOG_INCLUDE_XID 0x04 /* include XID of top-level xact */
/* Checkpoint statistics */
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 88f3d767007..b9490a3afef 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -31,7 +31,7 @@
/*
* Each page of XLOG file has a header like this:
*/
-#define XLOG_PAGE_MAGIC 0xD106 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD107 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index b0f2a6ed43a..b9768822291 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -191,6 +191,8 @@ struct XLogReaderState
RepOriginId record_origin;
+ TransactionId toplevel_xid; /* XID of top-level transaction */
+
/* information about blocks referenced by the record. */
DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1];
@@ -304,6 +306,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
#define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid)
#define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid)
#define XLogRecGetOrigin(decoder) ((decoder)->record_origin)
+#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid)
#define XLogRecGetData(decoder) ((decoder)->main_data)
#define XLogRecGetDataLen(decoder) ((decoder)->main_data_len)
#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0)
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index acd9af0194d..2f0c8bf5896 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -223,5 +223,6 @@ typedef struct XLogRecordDataHeaderLong
#define XLR_BLOCK_ID_DATA_SHORT 255
#define XLR_BLOCK_ID_DATA_LONG 254
#define XLR_BLOCK_ID_ORIGIN 253
+#define XLR_BLOCK_ID_TOPLEVEL_XID 252
#endif /* XLOGRECORD_H */