diff options
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r-- | src/backend/replication/logical/decode.c | 131 |
1 files changed, 0 insertions, 131 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index c6ea7c98e15..6303647fe0f 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -42,7 +42,6 @@ #include "replication/reorderbuffer.h" #include "replication/snapbuild.h" #include "storage/standby.h" -#include "commands/sequence.h" /* individual record(group)'s handlers */ static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); @@ -64,7 +63,6 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); -static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple); /* helper functions for decoding transactions */ static inline bool FilterPrepare(LogicalDecodingContext *ctx, @@ -1252,132 +1250,3 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) || ctx->fast_forward || FilterByOrigin(ctx, origin_id)); } - -/* - * DecodeSeqTuple - * decode tuple describing the sequence increment - * - * Sequences are represented as a table with a single row, which gets updated - * by nextval(). The tuple is stored in WAL right after the xl_seq_rec, so we - * simply copy it into the tuplebuf (similar to seq_redo). - */ -static void -DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) -{ - int datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader; - - Assert(datalen >= 0); - - tuple->tuple.t_len = datalen + SizeofHeapTupleHeader; - - ItemPointerSetInvalid(&tuple->tuple.t_self); - - tuple->tuple.t_tableOid = InvalidOid; - - memcpy(((char *) tuple->tuple.t_data), - data + sizeof(xl_seq_rec), - SizeofHeapTupleHeader); - - memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader, - data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader, - datalen); -} - -/* - * Handle sequence decode - * - * Decoding sequences is a bit tricky, because while most sequence actions - * are non-transactional (not subject to rollback), some need to be handled - * as transactional. - * - * By default, a sequence increment is non-transactional - we must not queue - * it in a transaction as other changes, because the transaction might get - * rolled back and we'd discard the increment. The downstream would not be - * notified about the increment, which is wrong. - * - * On the other hand, the sequence may be created in a transaction. In this - * case we *should* queue the change as other changes in the transaction, - * because we don't want to send the increments for unknown sequence to the - * plugin - it might get confused about which sequence it's related to etc. - */ -void -sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) -{ - SnapBuild *builder = ctx->snapshot_builder; - ReorderBufferTupleBuf *tuplebuf; - RelFileNode target_node; - XLogReaderState *r = buf->record; - char *tupledata = NULL; - Size tuplelen; - Size datalen = 0; - TransactionId xid = XLogRecGetXid(r); - uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; - xl_seq_rec *xlrec; - Snapshot snapshot; - RepOriginId origin_id = XLogRecGetOrigin(r); - bool transactional; - - /* only decode changes flagged with XLOG_SEQ_LOG */ - if (info != XLOG_SEQ_LOG) - elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info); - - ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); - - /* - * If we don't have snapshot or we are just fast-forwarding, there is no - * point in decoding messages. - */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || - ctx->fast_forward) - return; - - /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); - if (target_node.dbNode != ctx->slot->data.database) - return; - - /* output plugin doesn't look for this origin, no need to queue */ - if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) - return; - - tupledata = XLogRecGetData(r); - datalen = XLogRecGetDataLen(r); - tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec); - - /* extract the WAL record, with "created" flag */ - xlrec = (xl_seq_rec *) XLogRecGetData(r); - - /* XXX how could we have sequence change without data? */ - if(!datalen || !tupledata) - return; - - tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); - DecodeSeqTuple(tupledata, datalen, tuplebuf); - - /* - * Should we handle the sequence increment as transactional or not? - * - * If the sequence was created in a still-running transaction, treat - * it as transactional and queue the increments. Otherwise it needs - * to be treated as non-transactional, in which case we send it to - * the plugin right away. - */ - transactional = ReorderBufferSequenceIsTransactional(ctx->reorder, - target_node, - xlrec->created); - - /* Skip the change if already processed (per the snapshot). */ - if (transactional && - !SnapBuildProcessChange(builder, xid, buf->origptr)) - return; - else if (!transactional && - (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT || - SnapBuildXactNeedsSkip(builder, buf->origptr))) - return; - - /* Queue the increment (or send immediately if not transactional). */ - snapshot = SnapBuildGetOrBuildSnapshot(builder, xid); - ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr, - origin_id, target_node, transactional, - xlrec->created, tuplebuf); -} |