diff options
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r-- | src/backend/replication/logical/decode.c | 131 | ||||
-rw-r--r-- | src/backend/replication/logical/logical.c | 88 | ||||
-rw-r--r-- | src/backend/replication/logical/proto.c | 52 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 405 | ||||
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 109 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 56 |
6 files changed, 4 insertions, 837 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index c6ea7c98e15..6303647fe0f 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -42,7 +42,6 @@ #include "replication/reorderbuffer.h" #include "replication/snapbuild.h" #include "storage/standby.h" -#include "commands/sequence.h" /* individual record(group)'s handlers */ static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); @@ -64,7 +63,6 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); -static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple); /* helper functions for decoding transactions */ static inline bool FilterPrepare(LogicalDecodingContext *ctx, @@ -1252,132 +1250,3 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) || ctx->fast_forward || FilterByOrigin(ctx, origin_id)); } - -/* - * DecodeSeqTuple - * decode tuple describing the sequence increment - * - * Sequences are represented as a table with a single row, which gets updated - * by nextval(). The tuple is stored in WAL right after the xl_seq_rec, so we - * simply copy it into the tuplebuf (similar to seq_redo). - */ -static void -DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) -{ - int datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader; - - Assert(datalen >= 0); - - tuple->tuple.t_len = datalen + SizeofHeapTupleHeader; - - ItemPointerSetInvalid(&tuple->tuple.t_self); - - tuple->tuple.t_tableOid = InvalidOid; - - memcpy(((char *) tuple->tuple.t_data), - data + sizeof(xl_seq_rec), - SizeofHeapTupleHeader); - - memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader, - data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader, - datalen); -} - -/* - * Handle sequence decode - * - * Decoding sequences is a bit tricky, because while most sequence actions - * are non-transactional (not subject to rollback), some need to be handled - * as transactional. - * - * By default, a sequence increment is non-transactional - we must not queue - * it in a transaction as other changes, because the transaction might get - * rolled back and we'd discard the increment. The downstream would not be - * notified about the increment, which is wrong. - * - * On the other hand, the sequence may be created in a transaction. In this - * case we *should* queue the change as other changes in the transaction, - * because we don't want to send the increments for unknown sequence to the - * plugin - it might get confused about which sequence it's related to etc. - */ -void -sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) -{ - SnapBuild *builder = ctx->snapshot_builder; - ReorderBufferTupleBuf *tuplebuf; - RelFileNode target_node; - XLogReaderState *r = buf->record; - char *tupledata = NULL; - Size tuplelen; - Size datalen = 0; - TransactionId xid = XLogRecGetXid(r); - uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; - xl_seq_rec *xlrec; - Snapshot snapshot; - RepOriginId origin_id = XLogRecGetOrigin(r); - bool transactional; - - /* only decode changes flagged with XLOG_SEQ_LOG */ - if (info != XLOG_SEQ_LOG) - elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info); - - ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); - - /* - * If we don't have snapshot or we are just fast-forwarding, there is no - * point in decoding messages. - */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || - ctx->fast_forward) - return; - - /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); - if (target_node.dbNode != ctx->slot->data.database) - return; - - /* output plugin doesn't look for this origin, no need to queue */ - if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) - return; - - tupledata = XLogRecGetData(r); - datalen = XLogRecGetDataLen(r); - tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec); - - /* extract the WAL record, with "created" flag */ - xlrec = (xl_seq_rec *) XLogRecGetData(r); - - /* XXX how could we have sequence change without data? */ - if(!datalen || !tupledata) - return; - - tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); - DecodeSeqTuple(tupledata, datalen, tuplebuf); - - /* - * Should we handle the sequence increment as transactional or not? - * - * If the sequence was created in a still-running transaction, treat - * it as transactional and queue the increments. Otherwise it needs - * to be treated as non-transactional, in which case we send it to - * the plugin right away. - */ - transactional = ReorderBufferSequenceIsTransactional(ctx->reorder, - target_node, - xlrec->created); - - /* Skip the change if already processed (per the snapshot). */ - if (transactional && - !SnapBuildProcessChange(builder, xid, buf->origptr)) - return; - else if (!transactional && - (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT || - SnapBuildXactNeedsSkip(builder, buf->origptr))) - return; - - /* Queue the increment (or send immediately if not transactional). */ - snapshot = SnapBuildGetOrBuildSnapshot(builder, xid); - ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr, - origin_id, target_node, transactional, - xlrec->created, tuplebuf); -} diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 30e33dace33..788769dd738 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -73,10 +73,6 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); -static void sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr sequence_lsn, Relation rel, - bool transactional, - int64 last_value, int64 log_cnt, bool is_called); /* streaming callbacks */ static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -94,10 +90,6 @@ static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); -static void stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr sequence_lsn, Relation rel, - bool transactional, - int64 last_value, int64 log_cnt, bool is_called); static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); @@ -226,7 +218,6 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->apply_truncate = truncate_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; ctx->reorder->message = message_cb_wrapper; - ctx->reorder->sequence = sequence_cb_wrapper; /* * To support streaming, we require start/stop/abort/commit/change @@ -243,7 +234,6 @@ StartupDecodingContext(List *output_plugin_options, (ctx->callbacks.stream_commit_cb != NULL) || (ctx->callbacks.stream_change_cb != NULL) || (ctx->callbacks.stream_message_cb != NULL) || - (ctx->callbacks.stream_sequence_cb != NULL) || (ctx->callbacks.stream_truncate_cb != NULL); /* @@ -261,7 +251,6 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->stream_commit = stream_commit_cb_wrapper; ctx->reorder->stream_change = stream_change_cb_wrapper; ctx->reorder->stream_message = stream_message_cb_wrapper; - ctx->reorder->stream_sequence = stream_sequence_cb_wrapper; ctx->reorder->stream_truncate = stream_truncate_cb_wrapper; @@ -1217,42 +1206,6 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, } static void -sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr sequence_lsn, Relation rel, bool transactional, - int64 last_value, int64 log_cnt, bool is_called) -{ - LogicalDecodingContext *ctx = cache->private_data; - LogicalErrorCallbackState state; - ErrorContextCallback errcallback; - - Assert(!ctx->fast_forward); - - if (ctx->callbacks.sequence_cb == NULL) - return; - - /* Push callback + info on the error context stack */ - state.ctx = ctx; - state.callback_name = "sequence"; - state.report_location = sequence_lsn; - errcallback.callback = output_plugin_error_callback; - errcallback.arg = (void *) &state; - errcallback.previous = error_context_stack; - error_context_stack = &errcallback; - - /* set output state */ - ctx->accept_writes = true; - ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; - ctx->write_location = sequence_lsn; - - /* do the actual work: call callback */ - ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional, - last_value, log_cnt, is_called); - - /* Pop the error context stack */ - error_context_stack = errcallback.previous; -} - -static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr first_lsn) { @@ -1558,47 +1511,6 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, } static void -stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr sequence_lsn, Relation rel, - bool transactional, - int64 last_value, int64 log_cnt, bool is_called) -{ - LogicalDecodingContext *ctx = cache->private_data; - LogicalErrorCallbackState state; - ErrorContextCallback errcallback; - - Assert(!ctx->fast_forward); - - /* We're only supposed to call this when streaming is supported. */ - Assert(ctx->streaming); - - /* this callback is optional */ - if (ctx->callbacks.stream_sequence_cb == NULL) - return; - - /* Push callback + info on the error context stack */ - state.ctx = ctx; - state.callback_name = "stream_sequence"; - state.report_location = sequence_lsn; - errcallback.callback = output_plugin_error_callback; - errcallback.arg = (void *) &state; - errcallback.previous = error_context_stack; - error_context_stack = &errcallback; - - /* set output state */ - ctx->accept_writes = true; - ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; - ctx->write_location = sequence_lsn; - - /* do the actual work: call callback */ - ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional, - last_value, log_cnt, is_called); - - /* Pop the error context stack */ - error_context_stack = errcallback.previous; -} - -static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 18d3cbb9248..ff8513e2d29 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -663,56 +663,6 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, } /* - * Write SEQUENCE to stream - */ -void -logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid, - XLogRecPtr lsn, bool transactional, - int64 last_value, int64 log_cnt, bool is_called) -{ - uint8 flags = 0; - char *relname; - - pq_sendbyte(out, LOGICAL_REP_MSG_SEQUENCE); - - /* transaction ID (if not valid, we're not streaming) */ - if (TransactionIdIsValid(xid)) - pq_sendint32(out, xid); - - pq_sendint8(out, flags); - pq_sendint64(out, lsn); - - logicalrep_write_namespace(out, RelationGetNamespace(rel)); - relname = RelationGetRelationName(rel); - pq_sendstring(out, relname); - - pq_sendint8(out, transactional); - pq_sendint64(out, last_value); - pq_sendint64(out, log_cnt); - pq_sendint8(out, is_called); -} - -/* - * Read SEQUENCE from the stream. - */ -void -logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata) -{ - /* XXX skipping flags and lsn */ - pq_getmsgint(in, 1); - pq_getmsgint64(in); - - /* Read relation name from stream */ - seqdata->nspname = pstrdup(logicalrep_read_namespace(in)); - seqdata->seqname = pstrdup(pq_getmsgstring(in)); - - seqdata->transactional = pq_getmsgint(in, 1); - seqdata->last_value = pq_getmsgint64(in); - seqdata->log_cnt = pq_getmsgint64(in); - seqdata->is_called = pq_getmsgint(in, 1); -} - -/* * Write relation description to the output stream. */ void @@ -1286,8 +1236,6 @@ logicalrep_message_type(LogicalRepMsgType action) return "STREAM ABORT"; case LOGICAL_REP_MSG_STREAM_PREPARE: return "STREAM PREPARE"; - case LOGICAL_REP_MSG_SEQUENCE: - return "SEQUENCE"; } elog(ERROR, "invalid logical replication message type \"%c\"", action); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 4702750a2e7..5adc016d449 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -77,40 +77,6 @@ * a bit more memory to the oldest subtransactions, because it's likely * they are the source for the next sequence of changes. * - * When decoding sequences, we differentiate between a sequences created - * in a (running) transaction, and sequences created in other (already - * committed) transactions. Changes for sequences created in the same - * top-level transaction are treated as "transactional" i.e. just like - * any other change from that transaction (and discarded in case of a - * rollback). Changes for sequences created earlier are treated as not - * transactional - are processed immediately, as if performed outside - * any transaction (and thus not rolled back). - * - * This mixed behavior is necessary - sequences are non-transactional - * (e.g. ROLLBACK does not undo the sequence increments). But for new - * sequences, we need to handle them in a transactional way, because if - * we ever get some DDL support, the sequence won't exist until the - * transaction gets applied. So we need to ensure the increments don't - * happen until the sequence gets created. - * - * To differentiate which sequences are "old" and which were created - * in a still-running transaction, we track sequences created in running - * transactions in a hash table. Sequences are identified by relfilenode, - * and we track XID of the (sub)transaction that created it. This means - * that if a transaction does something that changes the relfilenode - * (like an alter / reset of a sequence), the new relfilenode will be - * treated as if created in the transaction. The list of sequences gets - * discarded when the transaction completes (commit/rollback). - * - * We don't use the XID to check if it's the same top-level transaction. - * It's enough to know it was created in an in-progress transaction, - * and we know it must be the current one because otherwise it wouldn't - * see the sequence object. - * - * The XID may be valid even for non-transactional sequences - we simply - * keep the XID logged to WAL, it's up to the reorderbuffer to decide if - * the increment is transactional. - * * ------------------------------------------------------------------------- */ #include "postgres.h" @@ -125,7 +91,6 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/catalog.h" -#include "commands/sequence.h" #include "lib/binaryheap.h" #include "miscadmin.h" #include "pgstat.h" @@ -151,13 +116,6 @@ typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXN *txn; } ReorderBufferTXNByIdEnt; -/* entry for hash table we use to track sequences created in running xacts */ -typedef struct ReorderBufferSequenceEnt -{ - RelFileNode rnode; - TransactionId xid; -} ReorderBufferSequenceEnt; - /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */ typedef struct ReorderBufferTupleCidKey { @@ -388,14 +346,6 @@ ReorderBufferAllocate(void) buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - /* hash table of sequences, mapping relfilenode to XID of transaction */ - hash_ctl.keysize = sizeof(RelFileNode); - hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt); - hash_ctl.hcxt = buffer->context; - - buffer->sequences = hash_create("ReorderBufferSequenceHash", 1000, &hash_ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - buffer->by_txn_last_xid = InvalidTransactionId; buffer->by_txn_last_txn = NULL; @@ -582,13 +532,6 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, change->data.truncate.relids = NULL; } break; - case REORDER_BUFFER_CHANGE_SEQUENCE: - if (change->data.sequence.tuple) - { - ReorderBufferReturnTupleBuf(rb, change->data.sequence.tuple); - change->data.sequence.tuple = NULL; - } - break; case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: @@ -924,230 +867,6 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, } /* - * Treat the sequence increment as transactional? - * - * The hash table tracks all sequences created in in-progress transactions, - * so we simply do a lookup (the sequence is identified by relfilende). If - * we find a match, the increment should be handled as transactional. - */ -bool -ReorderBufferSequenceIsTransactional(ReorderBuffer *rb, - RelFileNode rnode, bool created) -{ - bool found = false; - - if (created) - return true; - - hash_search(rb->sequences, - (void *) &rnode, - HASH_FIND, - &found); - - return found; -} - -/* - * Cleanup sequences created in in-progress transactions. - * - * There's no way to search by XID, so we simply do a seqscan of all - * the entries in the hash table. Hopefully there are only a couple - * entries in most cases - people generally don't create many new - * sequences over and over. - */ -static void -ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid) -{ - HASH_SEQ_STATUS scan_status; - ReorderBufferSequenceEnt *ent; - - hash_seq_init(&scan_status, rb->sequences); - while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL) - { - /* skip sequences not from this transaction */ - if (ent->xid != xid) - continue; - - (void) hash_search(rb->sequences, - (void *) &(ent->rnode), - HASH_REMOVE, NULL); - } -} - -/* - * A transactional sequence increment is queued to be processed upon commit - * and a non-transactional increment gets processed immediately. - * - * A sequence update may be both transactional and non-transactional. When - * created in a running transaction, treat it as transactional and queue - * the change in it. Otherwise treat it as non-transactional, so that we - * don't forget the increment in case of a rollback. - */ -void -ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid, - Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id, - RelFileNode rnode, bool transactional, bool created, - ReorderBufferTupleBuf *tuplebuf) -{ - /* - * Change needs to be handled as transactional, because the sequence was - * created in a transaction that is still running. In that case all the - * changes need to be queued in that transaction, we must not send them - * to the downstream until the transaction commits. - * - * There's a bit of a trouble with subtransactions - we can't queue it - * into the subxact, because it might be rolled back and we'd lose the - * increment. We need to queue it into the same (sub)xact that created - * the sequence, which is why we track the XID in the hash table. - */ - if (transactional) - { - MemoryContext oldcontext; - ReorderBufferChange *change; - - /* lookup sequence by relfilenode */ - ReorderBufferSequenceEnt *ent; - bool found; - - /* transactional changes require a transaction */ - Assert(xid != InvalidTransactionId); - - /* search the lookup table (we ignore the return value, found is enough) */ - ent = hash_search(rb->sequences, - (void *) &rnode, - created ? HASH_ENTER : HASH_FIND, - &found); - - /* - * If this is the "create" increment, we must not have found any - * pre-existing entry in the hash table (i.e. there must not be - * any conflicting sequence). - */ - Assert(!(created && found)); - - /* But we must have either created or found an existing entry. */ - Assert(created || found); - - /* - * When creating the sequence, remember the XID of the transaction - * that created id. - */ - if (created) - ent->xid = xid; - - /* XXX Maybe check that we're still in the same top-level xact? */ - - /* OK, allocate and queue the change */ - oldcontext = MemoryContextSwitchTo(rb->context); - - change = ReorderBufferGetChange(rb); - - change->action = REORDER_BUFFER_CHANGE_SEQUENCE; - change->origin_id = origin_id; - - memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode)); - - change->data.sequence.tuple = tuplebuf; - - /* add it to the same subxact that created the sequence */ - ReorderBufferQueueChange(rb, ent->xid, lsn, change, false); - - MemoryContextSwitchTo(oldcontext); - } - else - { - /* - * This increment is for a sequence that was not created in any - * running transaction, so we treat it as non-transactional and - * just send it to the output plugin directly. - */ - ReorderBufferTXN *txn = NULL; - volatile Snapshot snapshot_now = snapshot; - bool using_subtxn; - -#ifdef USE_ASSERT_CHECKING - /* All "creates" have to be handled as transactional. */ - Assert(!created); - - /* Make sure the sequence is not in the hash table. */ - { - bool found; - hash_search(rb->sequences, - (void *) &rnode, - HASH_FIND, &found); - Assert(!found); - } -#endif - - if (xid != InvalidTransactionId) - txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); - - /* setup snapshot to allow catalog access */ - SetupHistoricSnapshot(snapshot_now, NULL); - - /* - * Decoding needs access to syscaches et al., which in turn use - * heavyweight locks and such. Thus we need to have enough state around to - * keep track of those. The easiest way is to simply use a transaction - * internally. That also allows us to easily enforce that nothing writes - * to the database by checking for xid assignments. - * - * When we're called via the SQL SRF there's already a transaction - * started, so start an explicit subtransaction there. - */ - using_subtxn = IsTransactionOrTransactionBlock(); - - PG_TRY(); - { - Relation relation; - HeapTuple tuple; - Form_pg_sequence_data seq; - Oid reloid; - - if (using_subtxn) - BeginInternalSubTransaction("sequence"); - else - StartTransactionCommand(); - - reloid = RelidByRelfilenode(rnode.spcNode, rnode.relNode); - - if (reloid == InvalidOid) - elog(ERROR, "could not map filenode \"%s\" to relation OID", - relpathperm(rnode, - MAIN_FORKNUM)); - - relation = RelationIdGetRelation(reloid); - tuple = &tuplebuf->tuple; - seq = (Form_pg_sequence_data) GETSTRUCT(tuple); - - rb->sequence(rb, txn, lsn, relation, transactional, - seq->last_value, seq->log_cnt, seq->is_called); - - RelationClose(relation); - - TeardownHistoricSnapshot(false); - - AbortCurrentTransaction(); - - if (using_subtxn) - RollbackAndReleaseCurrentSubTransaction(); - } - PG_CATCH(); - { - TeardownHistoricSnapshot(true); - - AbortCurrentTransaction(); - - if (using_subtxn) - RollbackAndReleaseCurrentSubTransaction(); - - PG_RE_THROW(); - } - PG_END_TRY(); - } -} - -/* * AssertTXNLsnOrder * Verify LSN ordering of transaction lists in the reorderbuffer * @@ -1823,9 +1542,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) &found); Assert(found); - /* Remove sequences created in this transaction (if any). */ - ReorderBufferSequenceCleanup(rb, txn->xid); - /* remove entries spilled to disk */ if (rbtxn_is_serialized(txn)) ReorderBufferRestoreCleanup(rb, txn); @@ -2242,29 +1958,6 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, } /* - * Helper function for ReorderBufferProcessTXN for applying sequences. - */ -static inline void -ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn, - Relation relation, ReorderBufferChange *change, - bool streaming) -{ - HeapTuple tuple; - Form_pg_sequence_data seq; - - tuple = &change->data.sequence.tuple->tuple; - seq = (Form_pg_sequence_data) GETSTRUCT(tuple); - - /* Only ever called from ReorderBufferApplySequence, so transational. */ - if (streaming) - rb->stream_sequence(rb, txn, change->lsn, relation, true, - seq->last_value, seq->log_cnt, seq->is_called); - else - rb->sequence(rb, txn, change->lsn, relation, true, - seq->last_value, seq->log_cnt, seq->is_called); -} - -/* * Function to store the command id and snapshot at the end of the current * stream so that we can reuse the same while sending the next stream. */ @@ -2706,31 +2399,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: elog(ERROR, "tuplecid value in changequeue"); break; - - case REORDER_BUFFER_CHANGE_SEQUENCE: - Assert(snapshot_now); - - reloid = RelidByRelfilenode(change->data.sequence.relnode.spcNode, - change->data.sequence.relnode.relNode); - - if (reloid == InvalidOid) - elog(ERROR, "could not map filenode \"%s\" to relation OID", - relpathperm(change->data.sequence.relnode, - MAIN_FORKNUM)); - - relation = RelationIdGetRelation(reloid); - - if (!RelationIsValid(relation)) - elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")", - reloid, - relpathperm(change->data.sequence.relnode, - MAIN_FORKNUM)); - - if (RelationIsLogicallyLogged(relation)) - ReorderBufferApplySequence(rb, txn, relation, change, streaming); - - RelationClose(relation); - break; } } @@ -4117,39 +3785,6 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } - case REORDER_BUFFER_CHANGE_SEQUENCE: - { - char *data; - ReorderBufferTupleBuf *tup; - Size len = 0; - - tup = change->data.sequence.tuple; - - if (tup) - { - sz += sizeof(HeapTupleData); - len = tup->tuple.t_len; - sz += len; - } - - /* make sure we have enough space */ - ReorderBufferSerializeReserve(rb, sz); - - data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); - /* might have been reallocated above */ - ondisk = (ReorderBufferDiskChange *) rb->outbuf; - - if (len) - { - memcpy(data, &tup->tuple, sizeof(HeapTupleData)); - data += sizeof(HeapTupleData); - - memcpy(data, tup->tuple.t_data, len); - data += len; - } - - break; - } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: @@ -4414,22 +4049,6 @@ ReorderBufferChangeSize(ReorderBufferChange *change) break; } - case REORDER_BUFFER_CHANGE_SEQUENCE: - { - ReorderBufferTupleBuf *tup; - Size len = 0; - - tup = change->data.sequence.tuple; - - if (tup) - { - sz += sizeof(HeapTupleData); - len = tup->tuple.t_len; - sz += len; - } - - break; - } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: @@ -4729,30 +4348,6 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } - - case REORDER_BUFFER_CHANGE_SEQUENCE: - if (change->data.sequence.tuple) - { - uint32 tuplelen = ((HeapTuple) data)->t_len; - - change->data.sequence.tuple = - ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); - - /* restore ->tuple */ - memcpy(&change->data.sequence.tuple->tuple, data, - sizeof(HeapTupleData)); - data += sizeof(HeapTupleData); - - /* reset t_data pointer into the new tuplebuf */ - change->data.sequence.tuple->tuple.t_data = - ReorderBufferTupleBufData(change->data.sequence.tuple); - - /* restore tuple data itself */ - memcpy(change->data.sequence.tuple->tuple.t_data, data, tuplelen); - data += tuplelen; - } - break; - case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index b2cb31eaad7..49ceec3bdc8 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -100,7 +100,6 @@ #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" -#include "commands/sequence.h" #include "miscadmin.h" #include "parser/parse_relation.h" #include "pgstat.h" @@ -1138,95 +1137,6 @@ copy_table(Relation rel) } /* - * Fetch sequence data (current state) from the remote node. - */ -static void -fetch_sequence_data(char *nspname, char *relname, - int64 *last_value, int64 *log_cnt, bool *is_called) -{ - WalRcvExecResult *res; - StringInfoData cmd; - TupleTableSlot *slot; - Oid tableRow[3] = {INT8OID, INT8OID, BOOLOID}; - - initStringInfo(&cmd); - appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called\n" - " FROM %s", quote_qualified_identifier(nspname, relname)); - - res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 3, tableRow); - pfree(cmd.data); - - if (res->status != WALRCV_OK_TUPLES) - ereport(ERROR, - (errmsg("could not receive list of replicated tables from the publisher: %s", - res->err))); - - /* Process the sequence. */ - slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); - while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) - { - bool isnull; - - *last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull)); - Assert(!isnull); - - *log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull)); - Assert(!isnull); - - *is_called = DatumGetBool(slot_getattr(slot, 3, &isnull)); - Assert(!isnull); - - ExecClearTuple(slot); - } - ExecDropSingleTupleTableSlot(slot); - - walrcv_clear_result(res); -} - -/* - * Copy existing data of a sequence from publisher. - * - * Caller is responsible for locking the local relation. - */ -static void -copy_sequence(Relation rel) -{ - LogicalRepRelMapEntry *relmapentry; - LogicalRepRelation lrel; - List *qual = NIL; - StringInfoData cmd; - int64 last_value = 0, - log_cnt = 0; - bool is_called = 0; - - /* Get the publisher relation info. */ - fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), - RelationGetRelationName(rel), &lrel, &qual); - - /* sequences don't have row filters */ - Assert(!qual); - - /* Put the relation into relmap. */ - logicalrep_relmap_update(&lrel); - - /* Map the publisher relation to local one. */ - relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); - Assert(rel == relmapentry->localrel); - - /* Start copy on the publisher. */ - initStringInfo(&cmd); - - Assert(lrel.relkind == RELKIND_SEQUENCE); - - fetch_sequence_data(lrel.nspname, lrel.relname, &last_value, &log_cnt, &is_called); - - /* tablesync sets the sequences in non-transactional way */ - SetSequence(RelationGetRelid(rel), false, last_value, log_cnt, is_called); - - logicalrep_rel_close(relmapentry, NoLock); -} - -/* * Determine the tablesync slot name. * * The name must not exceed NAMEDATALEN - 1 because of remote node constraints @@ -1487,21 +1397,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) originname))); } - /* Do the right action depending on the relation kind. */ - if (get_rel_relkind(RelationGetRelid(rel)) == RELKIND_SEQUENCE) - { - /* Now do the initial sequence copy */ - PushActiveSnapshot(GetTransactionSnapshot()); - copy_sequence(rel); - PopActiveSnapshot(); - } - else - { - /* Now do the initial data copy */ - PushActiveSnapshot(GetTransactionSnapshot()); - copy_table(rel); - PopActiveSnapshot(); - } + /* Now do the initial data copy */ + PushActiveSnapshot(GetTransactionSnapshot()); + copy_table(rel); + PopActiveSnapshot(); res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL); if (res->status != WALRCV_OK_COMMAND) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 7ade49652e7..9181d3e8636 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -143,7 +143,6 @@ #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_tablespace.h" -#include "commands/sequence.h" #include "commands/tablecmds.h" #include "commands/tablespace.h" #include "commands/trigger.h" @@ -1145,57 +1144,6 @@ apply_handle_origin(StringInfo s) } /* - * Handle SEQUENCE message. - */ -static void -apply_handle_sequence(StringInfo s) -{ - LogicalRepSequence seq; - Oid relid; - - if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s)) - return; - - logicalrep_read_sequence(s, &seq); - - /* - * Non-transactional sequence updates should not be part of a remote - * transaction. There should not be any running transaction. - */ - Assert((!seq.transactional) || in_remote_transaction); - Assert(!(!seq.transactional && in_remote_transaction)); - Assert(!(!seq.transactional && IsTransactionState())); - - /* - * Make sure we're in a transaction (needed by SetSequence). For - * non-transactional updates we're guaranteed to start a new one, - * and we'll commit it at the end. - */ - if (!IsTransactionState()) - { - StartTransactionCommand(); - maybe_reread_subscription(); - } - - relid = RangeVarGetRelid(makeRangeVar(seq.nspname, - seq.seqname, -1), - RowExclusiveLock, false); - - /* lock the sequence in AccessExclusiveLock, as expected by SetSequence */ - LockRelationOid(relid, AccessExclusiveLock); - - /* apply the sequence change */ - SetSequence(relid, seq.transactional, seq.last_value, seq.log_cnt, seq.is_called); - - /* - * Commit the per-stream transaction (we only do this when not in - * remote transaction, i.e. for non-transactional sequence updates. - */ - if (!in_remote_transaction) - CommitTransactionCommand(); -} - -/* * Handle STREAM START message. */ static void @@ -2563,10 +2511,6 @@ apply_dispatch(StringInfo s) */ break; - case LOGICAL_REP_MSG_SEQUENCE: - apply_handle_sequence(s); - return; - case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); break; |