aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/decode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r--src/backend/replication/logical/decode.c77
1 files changed, 43 insertions, 34 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 414cfa95586..7b6114a2097 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -9,12 +9,12 @@
*
* NOTE:
* This basically tries to handle all low level xlog stuff for
- * reorderbuffer.c and snapbuild.c. There's some minor leakage where a
- * specific record's struct is used to pass data along, but those just
- * happen to contain the right amount of data in a convenient
- * format. There isn't and shouldn't be much intelligence about the
- * contents of records in here except turning them into a more usable
- * format.
+ * reorderbuffer.c and snapbuild.c. There's some minor leakage where a
+ * specific record's struct is used to pass data along, but those just
+ * happen to contain the right amount of data in a convenient
+ * format. There isn't and shouldn't be much intelligence about the
+ * contents of records in here except turning them into a more usable
+ * format.
*
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
@@ -44,10 +44,10 @@
typedef struct XLogRecordBuffer
{
- XLogRecPtr origptr;
- XLogRecPtr endptr;
- XLogRecord record;
- char *record_data;
+ XLogRecPtr origptr;
+ XLogRecPtr endptr;
+ XLogRecord record;
+ char *record_data;
} XLogRecordBuffer;
/* RMGR Handlers */
@@ -63,10 +63,10 @@ static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- TransactionId xid, Oid dboid,
- TimestampTz commit_time,
- int nsubxacts, TransactionId *sub_xids,
- int ninval_msgs, SharedInvalidationMessage *msg);
+ TransactionId xid, Oid dboid,
+ TimestampTz commit_time,
+ int nsubxacts, TransactionId *sub_xids,
+ int ninval_msgs, SharedInvalidationMessage *msg);
static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn,
TransactionId xid, TransactionId *sub_xids, int nsubxacts);
@@ -91,10 +91,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record)
/* cast so we get a warning when new rmgrs are added */
switch ((RmgrIds) buf.record.xl_rmid)
{
- /*
- * Rmgrs we care about for logical decoding. Add new rmgrs in
- * rmgrlist.h's order.
- */
+ /*
+ * Rmgrs we care about for logical decoding. Add new rmgrs in
+ * rmgrlist.h's order.
+ */
case RM_XLOG_ID:
DecodeXLogOp(ctx, &buf);
break;
@@ -115,11 +115,11 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record)
DecodeHeapOp(ctx, &buf);
break;
- /*
- * Rmgrs irrelevant for logical decoding; they describe stuff not
- * represented in logical decoding. Add new rmgrs in rmgrlist.h's
- * order.
- */
+ /*
+ * Rmgrs irrelevant for logical decoding; they describe stuff not
+ * represented in logical decoding. Add new rmgrs in rmgrlist.h's
+ * order.
+ */
case RM_SMGR_ID:
case RM_CLOG_ID:
case RM_DBASE_ID:
@@ -149,13 +149,14 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
switch (info)
{
- /* this is also used in END_OF_RECOVERY checkpoints */
+ /* this is also used in END_OF_RECOVERY checkpoints */
case XLOG_CHECKPOINT_SHUTDOWN:
case XLOG_END_OF_RECOVERY:
SnapBuildSerializationPoint(builder, buf->origptr);
break;
case XLOG_CHECKPOINT_ONLINE:
+
/*
* a RUNNING_XACTS record will have been logged near to this, we
* can restart from there.
@@ -181,9 +182,9 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
static void
DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
- SnapBuild *builder = ctx->snapshot_builder;
- ReorderBuffer *reorder = ctx->reorder;
- XLogRecord *r = &buf->record;
+ SnapBuild *builder = ctx->snapshot_builder;
+ ReorderBuffer *reorder = ctx->reorder;
+ XLogRecord *r = &buf->record;
uint8 info = r->xl_info & ~XLR_INFO_MASK;
/* no point in doing anything yet, data could not be decoded anyway */
@@ -280,7 +281,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
int i;
TransactionId *sub_xid;
- xlrec = (xl_xact_assignment *) buf->record_data;
+ xlrec = (xl_xact_assignment *) buf->record_data;
sub_xid = &xlrec->xsub[0];
@@ -292,6 +293,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
break;
}
case XLOG_XACT_PREPARE:
+
/*
* Currently decoding ignores PREPARE TRANSACTION and will just
* decode the transaction when the COMMIT PREPARED is sent or
@@ -321,7 +323,9 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
case XLOG_RUNNING_XACTS:
{
xl_running_xacts *running = (xl_running_xacts *) buf->record_data;
+
SnapBuildProcessRunningXacts(builder, buf->origptr, running);
+
/*
* Abort all transactions that we keep track of, that are
* older than the record's oldestRunningXid. This is the most
@@ -364,22 +368,25 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
case XLOG_HEAP2_NEW_CID:
{
xl_heap_new_cid *xlrec;
+
xlrec = (xl_heap_new_cid *) buf->record_data;
SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
break;
}
case XLOG_HEAP2_REWRITE:
+
/*
* Although these records only exist to serve the needs of logical
* decoding, all the work happens as part of crash or archive
* recovery, so we don't need to do anything here.
*/
break;
- /*
- * Everything else here is just low level physical stuff we're
- * not interested in.
- */
+
+ /*
+ * Everything else here is just low level physical stuff we're not
+ * interested in.
+ */
case XLOG_HEAP2_FREEZE_PAGE:
case XLOG_HEAP2_CLEAN:
case XLOG_HEAP2_CLEANUP_INFO:
@@ -429,6 +436,7 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
break;
case XLOG_HEAP_NEWPAGE:
+
/*
* This is only used in places like indexams and CLUSTER which
* don't contain changes relevant for logical replication.
@@ -436,6 +444,7 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
break;
case XLOG_HEAP_INPLACE:
+
/*
* Inplace updates are only ever performed on catalog tuples and
* can, per definition, not change tuple visibility. Since we
@@ -503,8 +512,8 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* There basically two reasons we might not be interested in this
* transaction:
* 1) We might not be interested in decoding transactions up to this
- * LSN. This can happen because we previously decoded it and now just
- * are restarting or if we haven't assembled a consistent snapshot yet.
+ * LSN. This can happen because we previously decoded it and now just
+ * are restarting or if we haven't assembled a consistent snapshot yet.
* 2) The transaction happened in another database.
*
* We can't just use ReorderBufferAbort() here, because we need to execute