diff options
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 405 |
1 files changed, 0 insertions, 405 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 4702750a2e7..5adc016d449 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -77,40 +77,6 @@ * a bit more memory to the oldest subtransactions, because it's likely * they are the source for the next sequence of changes. * - * When decoding sequences, we differentiate between a sequences created - * in a (running) transaction, and sequences created in other (already - * committed) transactions. Changes for sequences created in the same - * top-level transaction are treated as "transactional" i.e. just like - * any other change from that transaction (and discarded in case of a - * rollback). Changes for sequences created earlier are treated as not - * transactional - are processed immediately, as if performed outside - * any transaction (and thus not rolled back). - * - * This mixed behavior is necessary - sequences are non-transactional - * (e.g. ROLLBACK does not undo the sequence increments). But for new - * sequences, we need to handle them in a transactional way, because if - * we ever get some DDL support, the sequence won't exist until the - * transaction gets applied. So we need to ensure the increments don't - * happen until the sequence gets created. - * - * To differentiate which sequences are "old" and which were created - * in a still-running transaction, we track sequences created in running - * transactions in a hash table. Sequences are identified by relfilenode, - * and we track XID of the (sub)transaction that created it. This means - * that if a transaction does something that changes the relfilenode - * (like an alter / reset of a sequence), the new relfilenode will be - * treated as if created in the transaction. The list of sequences gets - * discarded when the transaction completes (commit/rollback). - * - * We don't use the XID to check if it's the same top-level transaction. - * It's enough to know it was created in an in-progress transaction, - * and we know it must be the current one because otherwise it wouldn't - * see the sequence object. - * - * The XID may be valid even for non-transactional sequences - we simply - * keep the XID logged to WAL, it's up to the reorderbuffer to decide if - * the increment is transactional. - * * ------------------------------------------------------------------------- */ #include "postgres.h" @@ -125,7 +91,6 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/catalog.h" -#include "commands/sequence.h" #include "lib/binaryheap.h" #include "miscadmin.h" #include "pgstat.h" @@ -151,13 +116,6 @@ typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXN *txn; } ReorderBufferTXNByIdEnt; -/* entry for hash table we use to track sequences created in running xacts */ -typedef struct ReorderBufferSequenceEnt -{ - RelFileNode rnode; - TransactionId xid; -} ReorderBufferSequenceEnt; - /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */ typedef struct ReorderBufferTupleCidKey { @@ -388,14 +346,6 @@ ReorderBufferAllocate(void) buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - /* hash table of sequences, mapping relfilenode to XID of transaction */ - hash_ctl.keysize = sizeof(RelFileNode); - hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt); - hash_ctl.hcxt = buffer->context; - - buffer->sequences = hash_create("ReorderBufferSequenceHash", 1000, &hash_ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - buffer->by_txn_last_xid = InvalidTransactionId; buffer->by_txn_last_txn = NULL; @@ -582,13 +532,6 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, change->data.truncate.relids = NULL; } break; - case REORDER_BUFFER_CHANGE_SEQUENCE: - if (change->data.sequence.tuple) - { - ReorderBufferReturnTupleBuf(rb, change->data.sequence.tuple); - change->data.sequence.tuple = NULL; - } - break; case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: @@ -924,230 +867,6 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, } /* - * Treat the sequence increment as transactional? - * - * The hash table tracks all sequences created in in-progress transactions, - * so we simply do a lookup (the sequence is identified by relfilende). If - * we find a match, the increment should be handled as transactional. - */ -bool -ReorderBufferSequenceIsTransactional(ReorderBuffer *rb, - RelFileNode rnode, bool created) -{ - bool found = false; - - if (created) - return true; - - hash_search(rb->sequences, - (void *) &rnode, - HASH_FIND, - &found); - - return found; -} - -/* - * Cleanup sequences created in in-progress transactions. - * - * There's no way to search by XID, so we simply do a seqscan of all - * the entries in the hash table. Hopefully there are only a couple - * entries in most cases - people generally don't create many new - * sequences over and over. - */ -static void -ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid) -{ - HASH_SEQ_STATUS scan_status; - ReorderBufferSequenceEnt *ent; - - hash_seq_init(&scan_status, rb->sequences); - while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL) - { - /* skip sequences not from this transaction */ - if (ent->xid != xid) - continue; - - (void) hash_search(rb->sequences, - (void *) &(ent->rnode), - HASH_REMOVE, NULL); - } -} - -/* - * A transactional sequence increment is queued to be processed upon commit - * and a non-transactional increment gets processed immediately. - * - * A sequence update may be both transactional and non-transactional. When - * created in a running transaction, treat it as transactional and queue - * the change in it. Otherwise treat it as non-transactional, so that we - * don't forget the increment in case of a rollback. - */ -void -ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid, - Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id, - RelFileNode rnode, bool transactional, bool created, - ReorderBufferTupleBuf *tuplebuf) -{ - /* - * Change needs to be handled as transactional, because the sequence was - * created in a transaction that is still running. In that case all the - * changes need to be queued in that transaction, we must not send them - * to the downstream until the transaction commits. - * - * There's a bit of a trouble with subtransactions - we can't queue it - * into the subxact, because it might be rolled back and we'd lose the - * increment. We need to queue it into the same (sub)xact that created - * the sequence, which is why we track the XID in the hash table. - */ - if (transactional) - { - MemoryContext oldcontext; - ReorderBufferChange *change; - - /* lookup sequence by relfilenode */ - ReorderBufferSequenceEnt *ent; - bool found; - - /* transactional changes require a transaction */ - Assert(xid != InvalidTransactionId); - - /* search the lookup table (we ignore the return value, found is enough) */ - ent = hash_search(rb->sequences, - (void *) &rnode, - created ? HASH_ENTER : HASH_FIND, - &found); - - /* - * If this is the "create" increment, we must not have found any - * pre-existing entry in the hash table (i.e. there must not be - * any conflicting sequence). - */ - Assert(!(created && found)); - - /* But we must have either created or found an existing entry. */ - Assert(created || found); - - /* - * When creating the sequence, remember the XID of the transaction - * that created id. - */ - if (created) - ent->xid = xid; - - /* XXX Maybe check that we're still in the same top-level xact? */ - - /* OK, allocate and queue the change */ - oldcontext = MemoryContextSwitchTo(rb->context); - - change = ReorderBufferGetChange(rb); - - change->action = REORDER_BUFFER_CHANGE_SEQUENCE; - change->origin_id = origin_id; - - memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode)); - - change->data.sequence.tuple = tuplebuf; - - /* add it to the same subxact that created the sequence */ - ReorderBufferQueueChange(rb, ent->xid, lsn, change, false); - - MemoryContextSwitchTo(oldcontext); - } - else - { - /* - * This increment is for a sequence that was not created in any - * running transaction, so we treat it as non-transactional and - * just send it to the output plugin directly. - */ - ReorderBufferTXN *txn = NULL; - volatile Snapshot snapshot_now = snapshot; - bool using_subtxn; - -#ifdef USE_ASSERT_CHECKING - /* All "creates" have to be handled as transactional. */ - Assert(!created); - - /* Make sure the sequence is not in the hash table. */ - { - bool found; - hash_search(rb->sequences, - (void *) &rnode, - HASH_FIND, &found); - Assert(!found); - } -#endif - - if (xid != InvalidTransactionId) - txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); - - /* setup snapshot to allow catalog access */ - SetupHistoricSnapshot(snapshot_now, NULL); - - /* - * Decoding needs access to syscaches et al., which in turn use - * heavyweight locks and such. Thus we need to have enough state around to - * keep track of those. The easiest way is to simply use a transaction - * internally. That also allows us to easily enforce that nothing writes - * to the database by checking for xid assignments. - * - * When we're called via the SQL SRF there's already a transaction - * started, so start an explicit subtransaction there. - */ - using_subtxn = IsTransactionOrTransactionBlock(); - - PG_TRY(); - { - Relation relation; - HeapTuple tuple; - Form_pg_sequence_data seq; - Oid reloid; - - if (using_subtxn) - BeginInternalSubTransaction("sequence"); - else - StartTransactionCommand(); - - reloid = RelidByRelfilenode(rnode.spcNode, rnode.relNode); - - if (reloid == InvalidOid) - elog(ERROR, "could not map filenode \"%s\" to relation OID", - relpathperm(rnode, - MAIN_FORKNUM)); - - relation = RelationIdGetRelation(reloid); - tuple = &tuplebuf->tuple; - seq = (Form_pg_sequence_data) GETSTRUCT(tuple); - - rb->sequence(rb, txn, lsn, relation, transactional, - seq->last_value, seq->log_cnt, seq->is_called); - - RelationClose(relation); - - TeardownHistoricSnapshot(false); - - AbortCurrentTransaction(); - - if (using_subtxn) - RollbackAndReleaseCurrentSubTransaction(); - } - PG_CATCH(); - { - TeardownHistoricSnapshot(true); - - AbortCurrentTransaction(); - - if (using_subtxn) - RollbackAndReleaseCurrentSubTransaction(); - - PG_RE_THROW(); - } - PG_END_TRY(); - } -} - -/* * AssertTXNLsnOrder * Verify LSN ordering of transaction lists in the reorderbuffer * @@ -1823,9 +1542,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) &found); Assert(found); - /* Remove sequences created in this transaction (if any). */ - ReorderBufferSequenceCleanup(rb, txn->xid); - /* remove entries spilled to disk */ if (rbtxn_is_serialized(txn)) ReorderBufferRestoreCleanup(rb, txn); @@ -2242,29 +1958,6 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, } /* - * Helper function for ReorderBufferProcessTXN for applying sequences. - */ -static inline void -ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn, - Relation relation, ReorderBufferChange *change, - bool streaming) -{ - HeapTuple tuple; - Form_pg_sequence_data seq; - - tuple = &change->data.sequence.tuple->tuple; - seq = (Form_pg_sequence_data) GETSTRUCT(tuple); - - /* Only ever called from ReorderBufferApplySequence, so transational. */ - if (streaming) - rb->stream_sequence(rb, txn, change->lsn, relation, true, - seq->last_value, seq->log_cnt, seq->is_called); - else - rb->sequence(rb, txn, change->lsn, relation, true, - seq->last_value, seq->log_cnt, seq->is_called); -} - -/* * Function to store the command id and snapshot at the end of the current * stream so that we can reuse the same while sending the next stream. */ @@ -2706,31 +2399,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: elog(ERROR, "tuplecid value in changequeue"); break; - - case REORDER_BUFFER_CHANGE_SEQUENCE: - Assert(snapshot_now); - - reloid = RelidByRelfilenode(change->data.sequence.relnode.spcNode, - change->data.sequence.relnode.relNode); - - if (reloid == InvalidOid) - elog(ERROR, "could not map filenode \"%s\" to relation OID", - relpathperm(change->data.sequence.relnode, - MAIN_FORKNUM)); - - relation = RelationIdGetRelation(reloid); - - if (!RelationIsValid(relation)) - elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")", - reloid, - relpathperm(change->data.sequence.relnode, - MAIN_FORKNUM)); - - if (RelationIsLogicallyLogged(relation)) - ReorderBufferApplySequence(rb, txn, relation, change, streaming); - - RelationClose(relation); - break; } } @@ -4117,39 +3785,6 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } - case REORDER_BUFFER_CHANGE_SEQUENCE: - { - char *data; - ReorderBufferTupleBuf *tup; - Size len = 0; - - tup = change->data.sequence.tuple; - - if (tup) - { - sz += sizeof(HeapTupleData); - len = tup->tuple.t_len; - sz += len; - } - - /* make sure we have enough space */ - ReorderBufferSerializeReserve(rb, sz); - - data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); - /* might have been reallocated above */ - ondisk = (ReorderBufferDiskChange *) rb->outbuf; - - if (len) - { - memcpy(data, &tup->tuple, sizeof(HeapTupleData)); - data += sizeof(HeapTupleData); - - memcpy(data, tup->tuple.t_data, len); - data += len; - } - - break; - } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: @@ -4414,22 +4049,6 @@ ReorderBufferChangeSize(ReorderBufferChange *change) break; } - case REORDER_BUFFER_CHANGE_SEQUENCE: - { - ReorderBufferTupleBuf *tup; - Size len = 0; - - tup = change->data.sequence.tuple; - - if (tup) - { - sz += sizeof(HeapTupleData); - len = tup->tuple.t_len; - sz += len; - } - - break; - } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: @@ -4729,30 +4348,6 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } - - case REORDER_BUFFER_CHANGE_SEQUENCE: - if (change->data.sequence.tuple) - { - uint32 tuplelen = ((HeapTuple) data)->t_len; - - change->data.sequence.tuple = - ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); - - /* restore ->tuple */ - memcpy(&change->data.sequence.tuple->tuple, data, - sizeof(HeapTupleData)); - data += sizeof(HeapTupleData); - - /* reset t_data pointer into the new tuplebuf */ - change->data.sequence.tuple->tuple.t_data = - ReorderBufferTupleBufData(change->data.sequence.tuple); - - /* restore tuple data itself */ - memcpy(change->data.sequence.tuple->tuple.t_data, data, tuplelen); - data += tuplelen; - } - break; - case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: |