aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/reorderbuffer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c405
1 files changed, 0 insertions, 405 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 4702750a2e7..5adc016d449 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -77,40 +77,6 @@
* a bit more memory to the oldest subtransactions, because it's likely
* they are the source for the next sequence of changes.
*
- * When decoding sequences, we differentiate between a sequences created
- * in a (running) transaction, and sequences created in other (already
- * committed) transactions. Changes for sequences created in the same
- * top-level transaction are treated as "transactional" i.e. just like
- * any other change from that transaction (and discarded in case of a
- * rollback). Changes for sequences created earlier are treated as not
- * transactional - are processed immediately, as if performed outside
- * any transaction (and thus not rolled back).
- *
- * This mixed behavior is necessary - sequences are non-transactional
- * (e.g. ROLLBACK does not undo the sequence increments). But for new
- * sequences, we need to handle them in a transactional way, because if
- * we ever get some DDL support, the sequence won't exist until the
- * transaction gets applied. So we need to ensure the increments don't
- * happen until the sequence gets created.
- *
- * To differentiate which sequences are "old" and which were created
- * in a still-running transaction, we track sequences created in running
- * transactions in a hash table. Sequences are identified by relfilenode,
- * and we track XID of the (sub)transaction that created it. This means
- * that if a transaction does something that changes the relfilenode
- * (like an alter / reset of a sequence), the new relfilenode will be
- * treated as if created in the transaction. The list of sequences gets
- * discarded when the transaction completes (commit/rollback).
- *
- * We don't use the XID to check if it's the same top-level transaction.
- * It's enough to know it was created in an in-progress transaction,
- * and we know it must be the current one because otherwise it wouldn't
- * see the sequence object.
- *
- * The XID may be valid even for non-transactional sequences - we simply
- * keep the XID logged to WAL, it's up to the reorderbuffer to decide if
- * the increment is transactional.
- *
* -------------------------------------------------------------------------
*/
#include "postgres.h"
@@ -125,7 +91,6 @@
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
-#include "commands/sequence.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -151,13 +116,6 @@ typedef struct ReorderBufferTXNByIdEnt
ReorderBufferTXN *txn;
} ReorderBufferTXNByIdEnt;
-/* entry for hash table we use to track sequences created in running xacts */
-typedef struct ReorderBufferSequenceEnt
-{
- RelFileNode rnode;
- TransactionId xid;
-} ReorderBufferSequenceEnt;
-
/* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
typedef struct ReorderBufferTupleCidKey
{
@@ -388,14 +346,6 @@ ReorderBufferAllocate(void)
buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
- /* hash table of sequences, mapping relfilenode to XID of transaction */
- hash_ctl.keysize = sizeof(RelFileNode);
- hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt);
- hash_ctl.hcxt = buffer->context;
-
- buffer->sequences = hash_create("ReorderBufferSequenceHash", 1000, &hash_ctl,
- HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
buffer->by_txn_last_xid = InvalidTransactionId;
buffer->by_txn_last_txn = NULL;
@@ -582,13 +532,6 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
change->data.truncate.relids = NULL;
}
break;
- case REORDER_BUFFER_CHANGE_SEQUENCE:
- if (change->data.sequence.tuple)
- {
- ReorderBufferReturnTupleBuf(rb, change->data.sequence.tuple);
- change->data.sequence.tuple = NULL;
- }
- break;
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
@@ -924,230 +867,6 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
}
/*
- * Treat the sequence increment as transactional?
- *
- * The hash table tracks all sequences created in in-progress transactions,
- * so we simply do a lookup (the sequence is identified by relfilende). If
- * we find a match, the increment should be handled as transactional.
- */
-bool
-ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
- RelFileNode rnode, bool created)
-{
- bool found = false;
-
- if (created)
- return true;
-
- hash_search(rb->sequences,
- (void *) &rnode,
- HASH_FIND,
- &found);
-
- return found;
-}
-
-/*
- * Cleanup sequences created in in-progress transactions.
- *
- * There's no way to search by XID, so we simply do a seqscan of all
- * the entries in the hash table. Hopefully there are only a couple
- * entries in most cases - people generally don't create many new
- * sequences over and over.
- */
-static void
-ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid)
-{
- HASH_SEQ_STATUS scan_status;
- ReorderBufferSequenceEnt *ent;
-
- hash_seq_init(&scan_status, rb->sequences);
- while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
- {
- /* skip sequences not from this transaction */
- if (ent->xid != xid)
- continue;
-
- (void) hash_search(rb->sequences,
- (void *) &(ent->rnode),
- HASH_REMOVE, NULL);
- }
-}
-
-/*
- * A transactional sequence increment is queued to be processed upon commit
- * and a non-transactional increment gets processed immediately.
- *
- * A sequence update may be both transactional and non-transactional. When
- * created in a running transaction, treat it as transactional and queue
- * the change in it. Otherwise treat it as non-transactional, so that we
- * don't forget the increment in case of a rollback.
- */
-void
-ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
- Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
- RelFileNode rnode, bool transactional, bool created,
- ReorderBufferTupleBuf *tuplebuf)
-{
- /*
- * Change needs to be handled as transactional, because the sequence was
- * created in a transaction that is still running. In that case all the
- * changes need to be queued in that transaction, we must not send them
- * to the downstream until the transaction commits.
- *
- * There's a bit of a trouble with subtransactions - we can't queue it
- * into the subxact, because it might be rolled back and we'd lose the
- * increment. We need to queue it into the same (sub)xact that created
- * the sequence, which is why we track the XID in the hash table.
- */
- if (transactional)
- {
- MemoryContext oldcontext;
- ReorderBufferChange *change;
-
- /* lookup sequence by relfilenode */
- ReorderBufferSequenceEnt *ent;
- bool found;
-
- /* transactional changes require a transaction */
- Assert(xid != InvalidTransactionId);
-
- /* search the lookup table (we ignore the return value, found is enough) */
- ent = hash_search(rb->sequences,
- (void *) &rnode,
- created ? HASH_ENTER : HASH_FIND,
- &found);
-
- /*
- * If this is the "create" increment, we must not have found any
- * pre-existing entry in the hash table (i.e. there must not be
- * any conflicting sequence).
- */
- Assert(!(created && found));
-
- /* But we must have either created or found an existing entry. */
- Assert(created || found);
-
- /*
- * When creating the sequence, remember the XID of the transaction
- * that created id.
- */
- if (created)
- ent->xid = xid;
-
- /* XXX Maybe check that we're still in the same top-level xact? */
-
- /* OK, allocate and queue the change */
- oldcontext = MemoryContextSwitchTo(rb->context);
-
- change = ReorderBufferGetChange(rb);
-
- change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
- change->origin_id = origin_id;
-
- memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode));
-
- change->data.sequence.tuple = tuplebuf;
-
- /* add it to the same subxact that created the sequence */
- ReorderBufferQueueChange(rb, ent->xid, lsn, change, false);
-
- MemoryContextSwitchTo(oldcontext);
- }
- else
- {
- /*
- * This increment is for a sequence that was not created in any
- * running transaction, so we treat it as non-transactional and
- * just send it to the output plugin directly.
- */
- ReorderBufferTXN *txn = NULL;
- volatile Snapshot snapshot_now = snapshot;
- bool using_subtxn;
-
-#ifdef USE_ASSERT_CHECKING
- /* All "creates" have to be handled as transactional. */
- Assert(!created);
-
- /* Make sure the sequence is not in the hash table. */
- {
- bool found;
- hash_search(rb->sequences,
- (void *) &rnode,
- HASH_FIND, &found);
- Assert(!found);
- }
-#endif
-
- if (xid != InvalidTransactionId)
- txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
-
- /* setup snapshot to allow catalog access */
- SetupHistoricSnapshot(snapshot_now, NULL);
-
- /*
- * Decoding needs access to syscaches et al., which in turn use
- * heavyweight locks and such. Thus we need to have enough state around to
- * keep track of those. The easiest way is to simply use a transaction
- * internally. That also allows us to easily enforce that nothing writes
- * to the database by checking for xid assignments.
- *
- * When we're called via the SQL SRF there's already a transaction
- * started, so start an explicit subtransaction there.
- */
- using_subtxn = IsTransactionOrTransactionBlock();
-
- PG_TRY();
- {
- Relation relation;
- HeapTuple tuple;
- Form_pg_sequence_data seq;
- Oid reloid;
-
- if (using_subtxn)
- BeginInternalSubTransaction("sequence");
- else
- StartTransactionCommand();
-
- reloid = RelidByRelfilenode(rnode.spcNode, rnode.relNode);
-
- if (reloid == InvalidOid)
- elog(ERROR, "could not map filenode \"%s\" to relation OID",
- relpathperm(rnode,
- MAIN_FORKNUM));
-
- relation = RelationIdGetRelation(reloid);
- tuple = &tuplebuf->tuple;
- seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
-
- rb->sequence(rb, txn, lsn, relation, transactional,
- seq->last_value, seq->log_cnt, seq->is_called);
-
- RelationClose(relation);
-
- TeardownHistoricSnapshot(false);
-
- AbortCurrentTransaction();
-
- if (using_subtxn)
- RollbackAndReleaseCurrentSubTransaction();
- }
- PG_CATCH();
- {
- TeardownHistoricSnapshot(true);
-
- AbortCurrentTransaction();
-
- if (using_subtxn)
- RollbackAndReleaseCurrentSubTransaction();
-
- PG_RE_THROW();
- }
- PG_END_TRY();
- }
-}
-
-/*
* AssertTXNLsnOrder
* Verify LSN ordering of transaction lists in the reorderbuffer
*
@@ -1823,9 +1542,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
&found);
Assert(found);
- /* Remove sequences created in this transaction (if any). */
- ReorderBufferSequenceCleanup(rb, txn->xid);
-
/* remove entries spilled to disk */
if (rbtxn_is_serialized(txn))
ReorderBufferRestoreCleanup(rb, txn);
@@ -2242,29 +1958,6 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
}
/*
- * Helper function for ReorderBufferProcessTXN for applying sequences.
- */
-static inline void
-ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn,
- Relation relation, ReorderBufferChange *change,
- bool streaming)
-{
- HeapTuple tuple;
- Form_pg_sequence_data seq;
-
- tuple = &change->data.sequence.tuple->tuple;
- seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
-
- /* Only ever called from ReorderBufferApplySequence, so transational. */
- if (streaming)
- rb->stream_sequence(rb, txn, change->lsn, relation, true,
- seq->last_value, seq->log_cnt, seq->is_called);
- else
- rb->sequence(rb, txn, change->lsn, relation, true,
- seq->last_value, seq->log_cnt, seq->is_called);
-}
-
-/*
* Function to store the command id and snapshot at the end of the current
* stream so that we can reuse the same while sending the next stream.
*/
@@ -2706,31 +2399,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
elog(ERROR, "tuplecid value in changequeue");
break;
-
- case REORDER_BUFFER_CHANGE_SEQUENCE:
- Assert(snapshot_now);
-
- reloid = RelidByRelfilenode(change->data.sequence.relnode.spcNode,
- change->data.sequence.relnode.relNode);
-
- if (reloid == InvalidOid)
- elog(ERROR, "could not map filenode \"%s\" to relation OID",
- relpathperm(change->data.sequence.relnode,
- MAIN_FORKNUM));
-
- relation = RelationIdGetRelation(reloid);
-
- if (!RelationIsValid(relation))
- elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
- reloid,
- relpathperm(change->data.sequence.relnode,
- MAIN_FORKNUM));
-
- if (RelationIsLogicallyLogged(relation))
- ReorderBufferApplySequence(rb, txn, relation, change, streaming);
-
- RelationClose(relation);
- break;
}
}
@@ -4117,39 +3785,6 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
break;
}
- case REORDER_BUFFER_CHANGE_SEQUENCE:
- {
- char *data;
- ReorderBufferTupleBuf *tup;
- Size len = 0;
-
- tup = change->data.sequence.tuple;
-
- if (tup)
- {
- sz += sizeof(HeapTupleData);
- len = tup->tuple.t_len;
- sz += len;
- }
-
- /* make sure we have enough space */
- ReorderBufferSerializeReserve(rb, sz);
-
- data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
- /* might have been reallocated above */
- ondisk = (ReorderBufferDiskChange *) rb->outbuf;
-
- if (len)
- {
- memcpy(data, &tup->tuple, sizeof(HeapTupleData));
- data += sizeof(HeapTupleData);
-
- memcpy(data, tup->tuple.t_data, len);
- data += len;
- }
-
- break;
- }
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
@@ -4414,22 +4049,6 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
break;
}
- case REORDER_BUFFER_CHANGE_SEQUENCE:
- {
- ReorderBufferTupleBuf *tup;
- Size len = 0;
-
- tup = change->data.sequence.tuple;
-
- if (tup)
- {
- sz += sizeof(HeapTupleData);
- len = tup->tuple.t_len;
- sz += len;
- }
-
- break;
- }
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
@@ -4729,30 +4348,6 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
break;
}
-
- case REORDER_BUFFER_CHANGE_SEQUENCE:
- if (change->data.sequence.tuple)
- {
- uint32 tuplelen = ((HeapTuple) data)->t_len;
-
- change->data.sequence.tuple =
- ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
-
- /* restore ->tuple */
- memcpy(&change->data.sequence.tuple->tuple, data,
- sizeof(HeapTupleData));
- data += sizeof(HeapTupleData);
-
- /* reset t_data pointer into the new tuplebuf */
- change->data.sequence.tuple->tuple.t_data =
- ReorderBufferTupleBufData(change->data.sequence.tuple);
-
- /* restore tuple data itself */
- memcpy(change->data.sequence.tuple->tuple.t_data, data, tuplelen);
- data += tuplelen;
- }
- break;
-
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: