aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r--src/backend/replication/logical/decode.c131
-rw-r--r--src/backend/replication/logical/logical.c88
-rw-r--r--src/backend/replication/logical/proto.c52
-rw-r--r--src/backend/replication/logical/reorderbuffer.c405
-rw-r--r--src/backend/replication/logical/tablesync.c109
-rw-r--r--src/backend/replication/logical/worker.c56
6 files changed, 4 insertions, 837 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c6ea7c98e15..6303647fe0f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -42,7 +42,6 @@
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "storage/standby.h"
-#include "commands/sequence.h"
/* individual record(group)'s handlers */
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -64,7 +63,6 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
/* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
-static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
/* helper functions for decoding transactions */
static inline bool FilterPrepare(LogicalDecodingContext *ctx,
@@ -1252,132 +1250,3 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
ctx->fast_forward || FilterByOrigin(ctx, origin_id));
}
-
-/*
- * DecodeSeqTuple
- * decode tuple describing the sequence increment
- *
- * Sequences are represented as a table with a single row, which gets updated
- * by nextval(). The tuple is stored in WAL right after the xl_seq_rec, so we
- * simply copy it into the tuplebuf (similar to seq_redo).
- */
-static void
-DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
-{
- int datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader;
-
- Assert(datalen >= 0);
-
- tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
-
- ItemPointerSetInvalid(&tuple->tuple.t_self);
-
- tuple->tuple.t_tableOid = InvalidOid;
-
- memcpy(((char *) tuple->tuple.t_data),
- data + sizeof(xl_seq_rec),
- SizeofHeapTupleHeader);
-
- memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
- data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader,
- datalen);
-}
-
-/*
- * Handle sequence decode
- *
- * Decoding sequences is a bit tricky, because while most sequence actions
- * are non-transactional (not subject to rollback), some need to be handled
- * as transactional.
- *
- * By default, a sequence increment is non-transactional - we must not queue
- * it in a transaction as other changes, because the transaction might get
- * rolled back and we'd discard the increment. The downstream would not be
- * notified about the increment, which is wrong.
- *
- * On the other hand, the sequence may be created in a transaction. In this
- * case we *should* queue the change as other changes in the transaction,
- * because we don't want to send the increments for unknown sequence to the
- * plugin - it might get confused about which sequence it's related to etc.
- */
-void
-sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
-{
- SnapBuild *builder = ctx->snapshot_builder;
- ReorderBufferTupleBuf *tuplebuf;
- RelFileNode target_node;
- XLogReaderState *r = buf->record;
- char *tupledata = NULL;
- Size tuplelen;
- Size datalen = 0;
- TransactionId xid = XLogRecGetXid(r);
- uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
- xl_seq_rec *xlrec;
- Snapshot snapshot;
- RepOriginId origin_id = XLogRecGetOrigin(r);
- bool transactional;
-
- /* only decode changes flagged with XLOG_SEQ_LOG */
- if (info != XLOG_SEQ_LOG)
- elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info);
-
- ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
-
- /*
- * If we don't have snapshot or we are just fast-forwarding, there is no
- * point in decoding messages.
- */
- if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
- ctx->fast_forward)
- return;
-
- /* only interested in our database */
- XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
- if (target_node.dbNode != ctx->slot->data.database)
- return;
-
- /* output plugin doesn't look for this origin, no need to queue */
- if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
- return;
-
- tupledata = XLogRecGetData(r);
- datalen = XLogRecGetDataLen(r);
- tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec);
-
- /* extract the WAL record, with "created" flag */
- xlrec = (xl_seq_rec *) XLogRecGetData(r);
-
- /* XXX how could we have sequence change without data? */
- if(!datalen || !tupledata)
- return;
-
- tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
- DecodeSeqTuple(tupledata, datalen, tuplebuf);
-
- /*
- * Should we handle the sequence increment as transactional or not?
- *
- * If the sequence was created in a still-running transaction, treat
- * it as transactional and queue the increments. Otherwise it needs
- * to be treated as non-transactional, in which case we send it to
- * the plugin right away.
- */
- transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
- target_node,
- xlrec->created);
-
- /* Skip the change if already processed (per the snapshot). */
- if (transactional &&
- !SnapBuildProcessChange(builder, xid, buf->origptr))
- return;
- else if (!transactional &&
- (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
- SnapBuildXactNeedsSkip(builder, buf->origptr)))
- return;
-
- /* Queue the increment (or send immediately if not transactional). */
- snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
- ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr,
- origin_id, target_node, transactional,
- xlrec->created, tuplebuf);
-}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 30e33dace33..788769dd738 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -73,10 +73,6 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
-static void sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
- XLogRecPtr sequence_lsn, Relation rel,
- bool transactional,
- int64 last_value, int64 log_cnt, bool is_called);
/* streaming callbacks */
static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -94,10 +90,6 @@ static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn
static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
-static void stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
- XLogRecPtr sequence_lsn, Relation rel,
- bool transactional,
- int64 last_value, int64 log_cnt, bool is_called);
static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change);
@@ -226,7 +218,6 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->apply_truncate = truncate_cb_wrapper;
ctx->reorder->commit = commit_cb_wrapper;
ctx->reorder->message = message_cb_wrapper;
- ctx->reorder->sequence = sequence_cb_wrapper;
/*
* To support streaming, we require start/stop/abort/commit/change
@@ -243,7 +234,6 @@ StartupDecodingContext(List *output_plugin_options,
(ctx->callbacks.stream_commit_cb != NULL) ||
(ctx->callbacks.stream_change_cb != NULL) ||
(ctx->callbacks.stream_message_cb != NULL) ||
- (ctx->callbacks.stream_sequence_cb != NULL) ||
(ctx->callbacks.stream_truncate_cb != NULL);
/*
@@ -261,7 +251,6 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->stream_commit = stream_commit_cb_wrapper;
ctx->reorder->stream_change = stream_change_cb_wrapper;
ctx->reorder->stream_message = stream_message_cb_wrapper;
- ctx->reorder->stream_sequence = stream_sequence_cb_wrapper;
ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
@@ -1217,42 +1206,6 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
}
static void
-sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
- XLogRecPtr sequence_lsn, Relation rel, bool transactional,
- int64 last_value, int64 log_cnt, bool is_called)
-{
- LogicalDecodingContext *ctx = cache->private_data;
- LogicalErrorCallbackState state;
- ErrorContextCallback errcallback;
-
- Assert(!ctx->fast_forward);
-
- if (ctx->callbacks.sequence_cb == NULL)
- return;
-
- /* Push callback + info on the error context stack */
- state.ctx = ctx;
- state.callback_name = "sequence";
- state.report_location = sequence_lsn;
- errcallback.callback = output_plugin_error_callback;
- errcallback.arg = (void *) &state;
- errcallback.previous = error_context_stack;
- error_context_stack = &errcallback;
-
- /* set output state */
- ctx->accept_writes = true;
- ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
- ctx->write_location = sequence_lsn;
-
- /* do the actual work: call callback */
- ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional,
- last_value, log_cnt, is_called);
-
- /* Pop the error context stack */
- error_context_stack = errcallback.previous;
-}
-
-static void
stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr first_lsn)
{
@@ -1558,47 +1511,6 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
}
static void
-stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
- XLogRecPtr sequence_lsn, Relation rel,
- bool transactional,
- int64 last_value, int64 log_cnt, bool is_called)
-{
- LogicalDecodingContext *ctx = cache->private_data;
- LogicalErrorCallbackState state;
- ErrorContextCallback errcallback;
-
- Assert(!ctx->fast_forward);
-
- /* We're only supposed to call this when streaming is supported. */
- Assert(ctx->streaming);
-
- /* this callback is optional */
- if (ctx->callbacks.stream_sequence_cb == NULL)
- return;
-
- /* Push callback + info on the error context stack */
- state.ctx = ctx;
- state.callback_name = "stream_sequence";
- state.report_location = sequence_lsn;
- errcallback.callback = output_plugin_error_callback;
- errcallback.arg = (void *) &state;
- errcallback.previous = error_context_stack;
- error_context_stack = &errcallback;
-
- /* set output state */
- ctx->accept_writes = true;
- ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
- ctx->write_location = sequence_lsn;
-
- /* do the actual work: call callback */
- ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional,
- last_value, log_cnt, is_called);
-
- /* Pop the error context stack */
- error_context_stack = errcallback.previous;
-}
-
-static void
stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
int nrelations, Relation relations[],
ReorderBufferChange *change)
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 18d3cbb9248..ff8513e2d29 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -663,56 +663,6 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
}
/*
- * Write SEQUENCE to stream
- */
-void
-logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
- XLogRecPtr lsn, bool transactional,
- int64 last_value, int64 log_cnt, bool is_called)
-{
- uint8 flags = 0;
- char *relname;
-
- pq_sendbyte(out, LOGICAL_REP_MSG_SEQUENCE);
-
- /* transaction ID (if not valid, we're not streaming) */
- if (TransactionIdIsValid(xid))
- pq_sendint32(out, xid);
-
- pq_sendint8(out, flags);
- pq_sendint64(out, lsn);
-
- logicalrep_write_namespace(out, RelationGetNamespace(rel));
- relname = RelationGetRelationName(rel);
- pq_sendstring(out, relname);
-
- pq_sendint8(out, transactional);
- pq_sendint64(out, last_value);
- pq_sendint64(out, log_cnt);
- pq_sendint8(out, is_called);
-}
-
-/*
- * Read SEQUENCE from the stream.
- */
-void
-logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata)
-{
- /* XXX skipping flags and lsn */
- pq_getmsgint(in, 1);
- pq_getmsgint64(in);
-
- /* Read relation name from stream */
- seqdata->nspname = pstrdup(logicalrep_read_namespace(in));
- seqdata->seqname = pstrdup(pq_getmsgstring(in));
-
- seqdata->transactional = pq_getmsgint(in, 1);
- seqdata->last_value = pq_getmsgint64(in);
- seqdata->log_cnt = pq_getmsgint64(in);
- seqdata->is_called = pq_getmsgint(in, 1);
-}
-
-/*
* Write relation description to the output stream.
*/
void
@@ -1286,8 +1236,6 @@ logicalrep_message_type(LogicalRepMsgType action)
return "STREAM ABORT";
case LOGICAL_REP_MSG_STREAM_PREPARE:
return "STREAM PREPARE";
- case LOGICAL_REP_MSG_SEQUENCE:
- return "SEQUENCE";
}
elog(ERROR, "invalid logical replication message type \"%c\"", action);
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 4702750a2e7..5adc016d449 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -77,40 +77,6 @@
* a bit more memory to the oldest subtransactions, because it's likely
* they are the source for the next sequence of changes.
*
- * When decoding sequences, we differentiate between a sequences created
- * in a (running) transaction, and sequences created in other (already
- * committed) transactions. Changes for sequences created in the same
- * top-level transaction are treated as "transactional" i.e. just like
- * any other change from that transaction (and discarded in case of a
- * rollback). Changes for sequences created earlier are treated as not
- * transactional - are processed immediately, as if performed outside
- * any transaction (and thus not rolled back).
- *
- * This mixed behavior is necessary - sequences are non-transactional
- * (e.g. ROLLBACK does not undo the sequence increments). But for new
- * sequences, we need to handle them in a transactional way, because if
- * we ever get some DDL support, the sequence won't exist until the
- * transaction gets applied. So we need to ensure the increments don't
- * happen until the sequence gets created.
- *
- * To differentiate which sequences are "old" and which were created
- * in a still-running transaction, we track sequences created in running
- * transactions in a hash table. Sequences are identified by relfilenode,
- * and we track XID of the (sub)transaction that created it. This means
- * that if a transaction does something that changes the relfilenode
- * (like an alter / reset of a sequence), the new relfilenode will be
- * treated as if created in the transaction. The list of sequences gets
- * discarded when the transaction completes (commit/rollback).
- *
- * We don't use the XID to check if it's the same top-level transaction.
- * It's enough to know it was created in an in-progress transaction,
- * and we know it must be the current one because otherwise it wouldn't
- * see the sequence object.
- *
- * The XID may be valid even for non-transactional sequences - we simply
- * keep the XID logged to WAL, it's up to the reorderbuffer to decide if
- * the increment is transactional.
- *
* -------------------------------------------------------------------------
*/
#include "postgres.h"
@@ -125,7 +91,6 @@
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
-#include "commands/sequence.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -151,13 +116,6 @@ typedef struct ReorderBufferTXNByIdEnt
ReorderBufferTXN *txn;
} ReorderBufferTXNByIdEnt;
-/* entry for hash table we use to track sequences created in running xacts */
-typedef struct ReorderBufferSequenceEnt
-{
- RelFileNode rnode;
- TransactionId xid;
-} ReorderBufferSequenceEnt;
-
/* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
typedef struct ReorderBufferTupleCidKey
{
@@ -388,14 +346,6 @@ ReorderBufferAllocate(void)
buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
- /* hash table of sequences, mapping relfilenode to XID of transaction */
- hash_ctl.keysize = sizeof(RelFileNode);
- hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt);
- hash_ctl.hcxt = buffer->context;
-
- buffer->sequences = hash_create("ReorderBufferSequenceHash", 1000, &hash_ctl,
- HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
buffer->by_txn_last_xid = InvalidTransactionId;
buffer->by_txn_last_txn = NULL;
@@ -582,13 +532,6 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
change->data.truncate.relids = NULL;
}
break;
- case REORDER_BUFFER_CHANGE_SEQUENCE:
- if (change->data.sequence.tuple)
- {
- ReorderBufferReturnTupleBuf(rb, change->data.sequence.tuple);
- change->data.sequence.tuple = NULL;
- }
- break;
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
@@ -924,230 +867,6 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
}
/*
- * Treat the sequence increment as transactional?
- *
- * The hash table tracks all sequences created in in-progress transactions,
- * so we simply do a lookup (the sequence is identified by relfilende). If
- * we find a match, the increment should be handled as transactional.
- */
-bool
-ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
- RelFileNode rnode, bool created)
-{
- bool found = false;
-
- if (created)
- return true;
-
- hash_search(rb->sequences,
- (void *) &rnode,
- HASH_FIND,
- &found);
-
- return found;
-}
-
-/*
- * Cleanup sequences created in in-progress transactions.
- *
- * There's no way to search by XID, so we simply do a seqscan of all
- * the entries in the hash table. Hopefully there are only a couple
- * entries in most cases - people generally don't create many new
- * sequences over and over.
- */
-static void
-ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid)
-{
- HASH_SEQ_STATUS scan_status;
- ReorderBufferSequenceEnt *ent;
-
- hash_seq_init(&scan_status, rb->sequences);
- while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
- {
- /* skip sequences not from this transaction */
- if (ent->xid != xid)
- continue;
-
- (void) hash_search(rb->sequences,
- (void *) &(ent->rnode),
- HASH_REMOVE, NULL);
- }
-}
-
-/*
- * A transactional sequence increment is queued to be processed upon commit
- * and a non-transactional increment gets processed immediately.
- *
- * A sequence update may be both transactional and non-transactional. When
- * created in a running transaction, treat it as transactional and queue
- * the change in it. Otherwise treat it as non-transactional, so that we
- * don't forget the increment in case of a rollback.
- */
-void
-ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
- Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
- RelFileNode rnode, bool transactional, bool created,
- ReorderBufferTupleBuf *tuplebuf)
-{
- /*
- * Change needs to be handled as transactional, because the sequence was
- * created in a transaction that is still running. In that case all the
- * changes need to be queued in that transaction, we must not send them
- * to the downstream until the transaction commits.
- *
- * There's a bit of a trouble with subtransactions - we can't queue it
- * into the subxact, because it might be rolled back and we'd lose the
- * increment. We need to queue it into the same (sub)xact that created
- * the sequence, which is why we track the XID in the hash table.
- */
- if (transactional)
- {
- MemoryContext oldcontext;
- ReorderBufferChange *change;
-
- /* lookup sequence by relfilenode */
- ReorderBufferSequenceEnt *ent;
- bool found;
-
- /* transactional changes require a transaction */
- Assert(xid != InvalidTransactionId);
-
- /* search the lookup table (we ignore the return value, found is enough) */
- ent = hash_search(rb->sequences,
- (void *) &rnode,
- created ? HASH_ENTER : HASH_FIND,
- &found);
-
- /*
- * If this is the "create" increment, we must not have found any
- * pre-existing entry in the hash table (i.e. there must not be
- * any conflicting sequence).
- */
- Assert(!(created && found));
-
- /* But we must have either created or found an existing entry. */
- Assert(created || found);
-
- /*
- * When creating the sequence, remember the XID of the transaction
- * that created id.
- */
- if (created)
- ent->xid = xid;
-
- /* XXX Maybe check that we're still in the same top-level xact? */
-
- /* OK, allocate and queue the change */
- oldcontext = MemoryContextSwitchTo(rb->context);
-
- change = ReorderBufferGetChange(rb);
-
- change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
- change->origin_id = origin_id;
-
- memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode));
-
- change->data.sequence.tuple = tuplebuf;
-
- /* add it to the same subxact that created the sequence */
- ReorderBufferQueueChange(rb, ent->xid, lsn, change, false);
-
- MemoryContextSwitchTo(oldcontext);
- }
- else
- {
- /*
- * This increment is for a sequence that was not created in any
- * running transaction, so we treat it as non-transactional and
- * just send it to the output plugin directly.
- */
- ReorderBufferTXN *txn = NULL;
- volatile Snapshot snapshot_now = snapshot;
- bool using_subtxn;
-
-#ifdef USE_ASSERT_CHECKING
- /* All "creates" have to be handled as transactional. */
- Assert(!created);
-
- /* Make sure the sequence is not in the hash table. */
- {
- bool found;
- hash_search(rb->sequences,
- (void *) &rnode,
- HASH_FIND, &found);
- Assert(!found);
- }
-#endif
-
- if (xid != InvalidTransactionId)
- txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
-
- /* setup snapshot to allow catalog access */
- SetupHistoricSnapshot(snapshot_now, NULL);
-
- /*
- * Decoding needs access to syscaches et al., which in turn use
- * heavyweight locks and such. Thus we need to have enough state around to
- * keep track of those. The easiest way is to simply use a transaction
- * internally. That also allows us to easily enforce that nothing writes
- * to the database by checking for xid assignments.
- *
- * When we're called via the SQL SRF there's already a transaction
- * started, so start an explicit subtransaction there.
- */
- using_subtxn = IsTransactionOrTransactionBlock();
-
- PG_TRY();
- {
- Relation relation;
- HeapTuple tuple;
- Form_pg_sequence_data seq;
- Oid reloid;
-
- if (using_subtxn)
- BeginInternalSubTransaction("sequence");
- else
- StartTransactionCommand();
-
- reloid = RelidByRelfilenode(rnode.spcNode, rnode.relNode);
-
- if (reloid == InvalidOid)
- elog(ERROR, "could not map filenode \"%s\" to relation OID",
- relpathperm(rnode,
- MAIN_FORKNUM));
-
- relation = RelationIdGetRelation(reloid);
- tuple = &tuplebuf->tuple;
- seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
-
- rb->sequence(rb, txn, lsn, relation, transactional,
- seq->last_value, seq->log_cnt, seq->is_called);
-
- RelationClose(relation);
-
- TeardownHistoricSnapshot(false);
-
- AbortCurrentTransaction();
-
- if (using_subtxn)
- RollbackAndReleaseCurrentSubTransaction();
- }
- PG_CATCH();
- {
- TeardownHistoricSnapshot(true);
-
- AbortCurrentTransaction();
-
- if (using_subtxn)
- RollbackAndReleaseCurrentSubTransaction();
-
- PG_RE_THROW();
- }
- PG_END_TRY();
- }
-}
-
-/*
* AssertTXNLsnOrder
* Verify LSN ordering of transaction lists in the reorderbuffer
*
@@ -1823,9 +1542,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
&found);
Assert(found);
- /* Remove sequences created in this transaction (if any). */
- ReorderBufferSequenceCleanup(rb, txn->xid);
-
/* remove entries spilled to disk */
if (rbtxn_is_serialized(txn))
ReorderBufferRestoreCleanup(rb, txn);
@@ -2242,29 +1958,6 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
}
/*
- * Helper function for ReorderBufferProcessTXN for applying sequences.
- */
-static inline void
-ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn,
- Relation relation, ReorderBufferChange *change,
- bool streaming)
-{
- HeapTuple tuple;
- Form_pg_sequence_data seq;
-
- tuple = &change->data.sequence.tuple->tuple;
- seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
-
- /* Only ever called from ReorderBufferApplySequence, so transational. */
- if (streaming)
- rb->stream_sequence(rb, txn, change->lsn, relation, true,
- seq->last_value, seq->log_cnt, seq->is_called);
- else
- rb->sequence(rb, txn, change->lsn, relation, true,
- seq->last_value, seq->log_cnt, seq->is_called);
-}
-
-/*
* Function to store the command id and snapshot at the end of the current
* stream so that we can reuse the same while sending the next stream.
*/
@@ -2706,31 +2399,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
elog(ERROR, "tuplecid value in changequeue");
break;
-
- case REORDER_BUFFER_CHANGE_SEQUENCE:
- Assert(snapshot_now);
-
- reloid = RelidByRelfilenode(change->data.sequence.relnode.spcNode,
- change->data.sequence.relnode.relNode);
-
- if (reloid == InvalidOid)
- elog(ERROR, "could not map filenode \"%s\" to relation OID",
- relpathperm(change->data.sequence.relnode,
- MAIN_FORKNUM));
-
- relation = RelationIdGetRelation(reloid);
-
- if (!RelationIsValid(relation))
- elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
- reloid,
- relpathperm(change->data.sequence.relnode,
- MAIN_FORKNUM));
-
- if (RelationIsLogicallyLogged(relation))
- ReorderBufferApplySequence(rb, txn, relation, change, streaming);
-
- RelationClose(relation);
- break;
}
}
@@ -4117,39 +3785,6 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
break;
}
- case REORDER_BUFFER_CHANGE_SEQUENCE:
- {
- char *data;
- ReorderBufferTupleBuf *tup;
- Size len = 0;
-
- tup = change->data.sequence.tuple;
-
- if (tup)
- {
- sz += sizeof(HeapTupleData);
- len = tup->tuple.t_len;
- sz += len;
- }
-
- /* make sure we have enough space */
- ReorderBufferSerializeReserve(rb, sz);
-
- data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
- /* might have been reallocated above */
- ondisk = (ReorderBufferDiskChange *) rb->outbuf;
-
- if (len)
- {
- memcpy(data, &tup->tuple, sizeof(HeapTupleData));
- data += sizeof(HeapTupleData);
-
- memcpy(data, tup->tuple.t_data, len);
- data += len;
- }
-
- break;
- }
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
@@ -4414,22 +4049,6 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
break;
}
- case REORDER_BUFFER_CHANGE_SEQUENCE:
- {
- ReorderBufferTupleBuf *tup;
- Size len = 0;
-
- tup = change->data.sequence.tuple;
-
- if (tup)
- {
- sz += sizeof(HeapTupleData);
- len = tup->tuple.t_len;
- sz += len;
- }
-
- break;
- }
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
@@ -4729,30 +4348,6 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
break;
}
-
- case REORDER_BUFFER_CHANGE_SEQUENCE:
- if (change->data.sequence.tuple)
- {
- uint32 tuplelen = ((HeapTuple) data)->t_len;
-
- change->data.sequence.tuple =
- ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
-
- /* restore ->tuple */
- memcpy(&change->data.sequence.tuple->tuple, data,
- sizeof(HeapTupleData));
- data += sizeof(HeapTupleData);
-
- /* reset t_data pointer into the new tuplebuf */
- change->data.sequence.tuple->tuple.t_data =
- ReorderBufferTupleBufData(change->data.sequence.tuple);
-
- /* restore tuple data itself */
- memcpy(change->data.sequence.tuple->tuple.t_data, data, tuplelen);
- data += tuplelen;
- }
- break;
-
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index b2cb31eaad7..49ceec3bdc8 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -100,7 +100,6 @@
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
-#include "commands/sequence.h"
#include "miscadmin.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
@@ -1138,95 +1137,6 @@ copy_table(Relation rel)
}
/*
- * Fetch sequence data (current state) from the remote node.
- */
-static void
-fetch_sequence_data(char *nspname, char *relname,
- int64 *last_value, int64 *log_cnt, bool *is_called)
-{
- WalRcvExecResult *res;
- StringInfoData cmd;
- TupleTableSlot *slot;
- Oid tableRow[3] = {INT8OID, INT8OID, BOOLOID};
-
- initStringInfo(&cmd);
- appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called\n"
- " FROM %s", quote_qualified_identifier(nspname, relname));
-
- res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 3, tableRow);
- pfree(cmd.data);
-
- if (res->status != WALRCV_OK_TUPLES)
- ereport(ERROR,
- (errmsg("could not receive list of replicated tables from the publisher: %s",
- res->err)));
-
- /* Process the sequence. */
- slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
- while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
- {
- bool isnull;
-
- *last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull));
- Assert(!isnull);
-
- *log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull));
- Assert(!isnull);
-
- *is_called = DatumGetBool(slot_getattr(slot, 3, &isnull));
- Assert(!isnull);
-
- ExecClearTuple(slot);
- }
- ExecDropSingleTupleTableSlot(slot);
-
- walrcv_clear_result(res);
-}
-
-/*
- * Copy existing data of a sequence from publisher.
- *
- * Caller is responsible for locking the local relation.
- */
-static void
-copy_sequence(Relation rel)
-{
- LogicalRepRelMapEntry *relmapentry;
- LogicalRepRelation lrel;
- List *qual = NIL;
- StringInfoData cmd;
- int64 last_value = 0,
- log_cnt = 0;
- bool is_called = 0;
-
- /* Get the publisher relation info. */
- fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
- RelationGetRelationName(rel), &lrel, &qual);
-
- /* sequences don't have row filters */
- Assert(!qual);
-
- /* Put the relation into relmap. */
- logicalrep_relmap_update(&lrel);
-
- /* Map the publisher relation to local one. */
- relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
- Assert(rel == relmapentry->localrel);
-
- /* Start copy on the publisher. */
- initStringInfo(&cmd);
-
- Assert(lrel.relkind == RELKIND_SEQUENCE);
-
- fetch_sequence_data(lrel.nspname, lrel.relname, &last_value, &log_cnt, &is_called);
-
- /* tablesync sets the sequences in non-transactional way */
- SetSequence(RelationGetRelid(rel), false, last_value, log_cnt, is_called);
-
- logicalrep_rel_close(relmapentry, NoLock);
-}
-
-/*
* Determine the tablesync slot name.
*
* The name must not exceed NAMEDATALEN - 1 because of remote node constraints
@@ -1487,21 +1397,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
originname)));
}
- /* Do the right action depending on the relation kind. */
- if (get_rel_relkind(RelationGetRelid(rel)) == RELKIND_SEQUENCE)
- {
- /* Now do the initial sequence copy */
- PushActiveSnapshot(GetTransactionSnapshot());
- copy_sequence(rel);
- PopActiveSnapshot();
- }
- else
- {
- /* Now do the initial data copy */
- PushActiveSnapshot(GetTransactionSnapshot());
- copy_table(rel);
- PopActiveSnapshot();
- }
+ /* Now do the initial data copy */
+ PushActiveSnapshot(GetTransactionSnapshot());
+ copy_table(rel);
+ PopActiveSnapshot();
res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
if (res->status != WALRCV_OK_COMMAND)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7ade49652e7..9181d3e8636 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -143,7 +143,6 @@
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_tablespace.h"
-#include "commands/sequence.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "commands/trigger.h"
@@ -1145,57 +1144,6 @@ apply_handle_origin(StringInfo s)
}
/*
- * Handle SEQUENCE message.
- */
-static void
-apply_handle_sequence(StringInfo s)
-{
- LogicalRepSequence seq;
- Oid relid;
-
- if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s))
- return;
-
- logicalrep_read_sequence(s, &seq);
-
- /*
- * Non-transactional sequence updates should not be part of a remote
- * transaction. There should not be any running transaction.
- */
- Assert((!seq.transactional) || in_remote_transaction);
- Assert(!(!seq.transactional && in_remote_transaction));
- Assert(!(!seq.transactional && IsTransactionState()));
-
- /*
- * Make sure we're in a transaction (needed by SetSequence). For
- * non-transactional updates we're guaranteed to start a new one,
- * and we'll commit it at the end.
- */
- if (!IsTransactionState())
- {
- StartTransactionCommand();
- maybe_reread_subscription();
- }
-
- relid = RangeVarGetRelid(makeRangeVar(seq.nspname,
- seq.seqname, -1),
- RowExclusiveLock, false);
-
- /* lock the sequence in AccessExclusiveLock, as expected by SetSequence */
- LockRelationOid(relid, AccessExclusiveLock);
-
- /* apply the sequence change */
- SetSequence(relid, seq.transactional, seq.last_value, seq.log_cnt, seq.is_called);
-
- /*
- * Commit the per-stream transaction (we only do this when not in
- * remote transaction, i.e. for non-transactional sequence updates.
- */
- if (!in_remote_transaction)
- CommitTransactionCommand();
-}
-
-/*
* Handle STREAM START message.
*/
static void
@@ -2563,10 +2511,6 @@ apply_dispatch(StringInfo s)
*/
break;
- case LOGICAL_REP_MSG_SEQUENCE:
- apply_handle_sequence(s);
- return;
-
case LOGICAL_REP_MSG_STREAM_START:
apply_handle_stream_start(s);
break;