aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/decode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r--src/backend/replication/logical/decode.c131
1 files changed, 0 insertions, 131 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c6ea7c98e15..6303647fe0f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -42,7 +42,6 @@
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "storage/standby.h"
-#include "commands/sequence.h"
/* individual record(group)'s handlers */
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -64,7 +63,6 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
/* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
-static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
/* helper functions for decoding transactions */
static inline bool FilterPrepare(LogicalDecodingContext *ctx,
@@ -1252,132 +1250,3 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
ctx->fast_forward || FilterByOrigin(ctx, origin_id));
}
-
-/*
- * DecodeSeqTuple
- * decode tuple describing the sequence increment
- *
- * Sequences are represented as a table with a single row, which gets updated
- * by nextval(). The tuple is stored in WAL right after the xl_seq_rec, so we
- * simply copy it into the tuplebuf (similar to seq_redo).
- */
-static void
-DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
-{
- int datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader;
-
- Assert(datalen >= 0);
-
- tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
-
- ItemPointerSetInvalid(&tuple->tuple.t_self);
-
- tuple->tuple.t_tableOid = InvalidOid;
-
- memcpy(((char *) tuple->tuple.t_data),
- data + sizeof(xl_seq_rec),
- SizeofHeapTupleHeader);
-
- memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
- data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader,
- datalen);
-}
-
-/*
- * Handle sequence decode
- *
- * Decoding sequences is a bit tricky, because while most sequence actions
- * are non-transactional (not subject to rollback), some need to be handled
- * as transactional.
- *
- * By default, a sequence increment is non-transactional - we must not queue
- * it in a transaction as other changes, because the transaction might get
- * rolled back and we'd discard the increment. The downstream would not be
- * notified about the increment, which is wrong.
- *
- * On the other hand, the sequence may be created in a transaction. In this
- * case we *should* queue the change as other changes in the transaction,
- * because we don't want to send the increments for unknown sequence to the
- * plugin - it might get confused about which sequence it's related to etc.
- */
-void
-sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
-{
- SnapBuild *builder = ctx->snapshot_builder;
- ReorderBufferTupleBuf *tuplebuf;
- RelFileNode target_node;
- XLogReaderState *r = buf->record;
- char *tupledata = NULL;
- Size tuplelen;
- Size datalen = 0;
- TransactionId xid = XLogRecGetXid(r);
- uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
- xl_seq_rec *xlrec;
- Snapshot snapshot;
- RepOriginId origin_id = XLogRecGetOrigin(r);
- bool transactional;
-
- /* only decode changes flagged with XLOG_SEQ_LOG */
- if (info != XLOG_SEQ_LOG)
- elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info);
-
- ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
-
- /*
- * If we don't have snapshot or we are just fast-forwarding, there is no
- * point in decoding messages.
- */
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
- ctx->fast_forward)
- return;
-
- /* only interested in our database */
- XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
- if (target_node.dbNode != ctx->slot->data.database)
- return;
-
- /* output plugin doesn't look for this origin, no need to queue */
- if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
- return;
-
- tupledata = XLogRecGetData(r);
- datalen = XLogRecGetDataLen(r);
- tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec);
-
- /* extract the WAL record, with "created" flag */
- xlrec = (xl_seq_rec *) XLogRecGetData(r);
-
- /* XXX how could we have sequence change without data? */
- if(!datalen || !tupledata)
- return;
-
- tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
- DecodeSeqTuple(tupledata, datalen, tuplebuf);
-
- /*
- * Should we handle the sequence increment as transactional or not?
- *
- * If the sequence was created in a still-running transaction, treat
- * it as transactional and queue the increments. Otherwise it needs
- * to be treated as non-transactional, in which case we send it to
- * the plugin right away.
- */
- transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
- target_node,
- xlrec->created);
-
- /* Skip the change if already processed (per the snapshot). */
- if (transactional &&
- !SnapBuildProcessChange(builder, xid, buf->origptr))
- return;
- else if (!transactional &&
- (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
- SnapBuildXactNeedsSkip(builder, buf->origptr)))
- return;
-
- /* Queue the increment (or send immediately if not transactional). */
- snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
- ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr,
- origin_id, target_node, transactional,
- xlrec->created, tuplebuf);
-}