diff options
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 283 |
1 files changed, 137 insertions, 146 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index ad560345b40..4493930eda0 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -80,26 +80,6 @@ #include "utils/relfilenodemap.h" #include "utils/tqual.h" -/* - * For efficiency and simplicity reasons we want to keep Snapshots, CommandIds - * and ComboCids in the same list with the user visible INSERT/UPDATE/DELETE - * changes. We don't want to leak those internal values to external users - * though (they would just use switch()...default:) because that would make it - * harder to add to new user visible values. - * - * This needs to be synchronized with ReorderBufferChangeType! Adjust the - * StaticAssertExpr's in ReorderBufferAllocate if you add anything! - */ -typedef enum -{ - REORDER_BUFFER_CHANGE_INTERNAL_INSERT, - REORDER_BUFFER_CHANGE_INTERNAL_UPDATE, - REORDER_BUFFER_CHANGE_INTERNAL_DELETE, - REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT, - REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID, - REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID -} ReorderBufferChangeTypeInternal; - /* entry for a hash table we use to map from xid to our transaction state */ typedef struct ReorderBufferTXNByIdEnt { @@ -255,10 +235,6 @@ ReorderBufferAllocate(void) HASHCTL hash_ctl; MemoryContext new_ctx; - StaticAssertExpr((int) REORDER_BUFFER_CHANGE_INTERNAL_INSERT == (int) REORDER_BUFFER_CHANGE_INSERT, "out of sync enums"); - StaticAssertExpr((int) REORDER_BUFFER_CHANGE_INTERNAL_UPDATE == (int) REORDER_BUFFER_CHANGE_UPDATE, "out of sync enums"); - StaticAssertExpr((int) REORDER_BUFFER_CHANGE_INTERNAL_DELETE == (int) REORDER_BUFFER_CHANGE_DELETE, "out of sync enums"); - /* allocate memory in own context, to have better accountability */ new_ctx = AllocSetContextCreate(CurrentMemoryContext, "ReorderBuffer", @@ -427,28 +403,28 @@ void ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) { /* free contained data */ - switch ((ReorderBufferChangeTypeInternal) change->action_internal) + switch (change->action) { - case REORDER_BUFFER_CHANGE_INTERNAL_INSERT: - case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE: - case REORDER_BUFFER_CHANGE_INTERNAL_DELETE: - if (change->tp.newtuple) + case REORDER_BUFFER_CHANGE_INSERT: + case REORDER_BUFFER_CHANGE_UPDATE: + case REORDER_BUFFER_CHANGE_DELETE: + if (change->data.tp.newtuple) { - ReorderBufferReturnTupleBuf(rb, change->tp.newtuple); - change->tp.newtuple = NULL; + ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple); + change->data.tp.newtuple = NULL; } - if (change->tp.oldtuple) + if (change->data.tp.oldtuple) { - ReorderBufferReturnTupleBuf(rb, change->tp.oldtuple); - change->tp.oldtuple = NULL; + ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple); + change->data.tp.oldtuple = NULL; } break; case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: - if (change->snapshot) + if (change->data.snapshot) { - ReorderBufferFreeSnap(rb, change->snapshot); - change->snapshot = NULL; + ReorderBufferFreeSnap(rb, change->data.snapshot); + change->data.snapshot = NULL; } break; case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: @@ -1086,7 +1062,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferChange *change; change = dlist_container(ReorderBufferChange, node, iter.cur); - Assert(change->action_internal == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); + Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); ReorderBufferReturnChange(rb, change); } @@ -1161,14 +1137,14 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) change = dlist_container(ReorderBufferChange, node, iter.cur); - Assert(change->action_internal == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); + Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); /* be careful about padding */ memset(&key, 0, sizeof(ReorderBufferTupleCidKey)); - key.relnode = change->tuplecid.node; + key.relnode = change->data.tuplecid.node; - ItemPointerCopy(&change->tuplecid.tid, + ItemPointerCopy(&change->data.tuplecid.tid, &key.tid); ent = (ReorderBufferTupleCidEnt *) @@ -1178,22 +1154,22 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) &found); if (!found) { - ent->cmin = change->tuplecid.cmin; - ent->cmax = change->tuplecid.cmax; - ent->combocid = change->tuplecid.combocid; + ent->cmin = change->data.tuplecid.cmin; + ent->cmax = change->data.tuplecid.cmax; + ent->combocid = change->data.tuplecid.combocid; } else { - Assert(ent->cmin == change->tuplecid.cmin); + Assert(ent->cmin == change->data.tuplecid.cmin); Assert(ent->cmax == InvalidCommandId || - ent->cmax == change->tuplecid.cmax); + ent->cmax == change->data.tuplecid.cmax); /* * if the tuple got valid in this transaction and now got deleted * we already have a valid cmin stored. The cmax will be * InvalidCommandId though. */ - ent->cmax = change->tuplecid.cmax; + ent->cmax = change->data.tuplecid.cmax; } } } @@ -1367,33 +1343,33 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, Relation relation = NULL; Oid reloid; - switch ((ReorderBufferChangeTypeInternal) change->action_internal) + switch (change->action) { - case REORDER_BUFFER_CHANGE_INTERNAL_INSERT: - case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE: - case REORDER_BUFFER_CHANGE_INTERNAL_DELETE: + case REORDER_BUFFER_CHANGE_INSERT: + case REORDER_BUFFER_CHANGE_UPDATE: + case REORDER_BUFFER_CHANGE_DELETE: Assert(snapshot_now); - reloid = RelidByRelfilenode(change->tp.relnode.spcNode, - change->tp.relnode.relNode); + reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode, + change->data.tp.relnode.relNode); /* * Catalog tuple without data, emitted while catalog was * in the process of being rewritten. */ if (reloid == InvalidOid && - change->tp.newtuple == NULL && - change->tp.oldtuple == NULL) + change->data.tp.newtuple == NULL && + change->data.tp.oldtuple == NULL) continue; else if (reloid == InvalidOid) elog(ERROR, "could not lookup relation %s", - relpathperm(change->tp.relnode, MAIN_FORKNUM)); + relpathperm(change->data.tp.relnode, MAIN_FORKNUM)); relation = RelationIdGetRelation(reloid); if (relation == NULL) elog(ERROR, "could open relation descriptor %s", - relpathperm(change->tp.relnode, MAIN_FORKNUM)); + relpathperm(change->data.tp.relnode, MAIN_FORKNUM)); if (RelationIsLogicallyLogged(relation)) { @@ -1440,7 +1416,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, { ReorderBufferFreeSnap(rb, snapshot_now); snapshot_now = - ReorderBufferCopySnap(rb, change->snapshot, + ReorderBufferCopySnap(rb, change->data.snapshot, txn, command_id); } /* @@ -1448,15 +1424,15 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, * free. We could introduce refcounting for that, but for * now this seems infrequent enough not to care. */ - else if (change->snapshot->copied) + else if (change->data.snapshot->copied) { snapshot_now = - ReorderBufferCopySnap(rb, change->snapshot, + ReorderBufferCopySnap(rb, change->data.snapshot, txn, command_id); } else { - snapshot_now = change->snapshot; + snapshot_now = change->data.snapshot; } @@ -1465,11 +1441,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, break; case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: - Assert(change->command_id != InvalidCommandId); + Assert(change->data.command_id != InvalidCommandId); - if (command_id < change->command_id) + if (command_id < change->data.command_id) { - command_id = change->command_id; + command_id = change->data.command_id; if (!snapshot_now->copied) { @@ -1712,8 +1688,8 @@ ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, { ReorderBufferChange *change = ReorderBufferGetChange(rb); - change->snapshot = snap; - change->action_internal = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT; + change->data.snapshot = snap; + change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT; ReorderBufferQueueChange(rb, xid, lsn, change); } @@ -1752,8 +1728,8 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, { ReorderBufferChange *change = ReorderBufferGetChange(rb); - change->command_id = cid; - change->action_internal = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID; + change->data.command_id = cid; + change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID; ReorderBufferQueueChange(rb, xid, lsn, change); } @@ -1773,13 +1749,13 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); - change->tuplecid.node = node; - change->tuplecid.tid = tid; - change->tuplecid.cmin = cmin; - change->tuplecid.cmax = cmax; - change->tuplecid.combocid = combocid; + change->data.tuplecid.node = node; + change->data.tuplecid.tid = tid; + change->data.tuplecid.cmin = cmin; + change->data.tuplecid.cmax = cmax; + change->data.tuplecid.combocid = combocid; change->lsn = lsn; - change->action_internal = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID; + change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID; dlist_push_tail(&txn->tuplecids, &change->node); txn->ntuplecids++; @@ -2017,26 +1993,30 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ondisk = (ReorderBufferDiskChange *) rb->outbuf; memcpy(&ondisk->change, change, sizeof(ReorderBufferChange)); - switch ((ReorderBufferChangeTypeInternal) change->action_internal) + switch (change->action) { - case REORDER_BUFFER_CHANGE_INTERNAL_INSERT: + case REORDER_BUFFER_CHANGE_INSERT: /* fall through */ - case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE: + case REORDER_BUFFER_CHANGE_UPDATE: /* fall through */ - case REORDER_BUFFER_CHANGE_INTERNAL_DELETE: + case REORDER_BUFFER_CHANGE_DELETE: { char *data; + ReorderBufferTupleBuf *oldtup, *newtup; Size oldlen = 0; Size newlen = 0; - if (change->tp.oldtuple) + oldtup = change->data.tp.oldtuple; + newtup = change->data.tp.newtuple; + + if (oldtup) oldlen = offsetof(ReorderBufferTupleBuf, data) - + change->tp.oldtuple->tuple.t_len + + oldtup->tuple.t_len - offsetof(HeapTupleHeaderData, t_bits); - if (change->tp.newtuple) + if (newtup) newlen = offsetof(ReorderBufferTupleBuf, data) - + change->tp.newtuple->tuple.t_len + + newtup->tuple.t_len - offsetof(HeapTupleHeaderData, t_bits); sz += oldlen; @@ -2051,26 +2031,27 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, if (oldlen) { - memcpy(data, change->tp.oldtuple, oldlen); + memcpy(data, oldtup, oldlen); data += oldlen; - Assert(&change->tp.oldtuple->header == change->tp.oldtuple->tuple.t_data); } if (newlen) { - memcpy(data, change->tp.newtuple, newlen); + memcpy(data, newtup, newlen); data += newlen; - Assert(&change->tp.newtuple->header == change->tp.newtuple->tuple.t_data); } break; } case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { + Snapshot snap; char *data; + snap = change->data.snapshot; + sz += sizeof(SnapshotData) + - sizeof(TransactionId) * change->snapshot->xcnt + - sizeof(TransactionId) * change->snapshot->subxcnt + sizeof(TransactionId) * snap->xcnt + + sizeof(TransactionId) * snap->subxcnt ; /* make sure we have enough space */ @@ -2079,21 +2060,21 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, /* might have been reallocated above */ ondisk = (ReorderBufferDiskChange *) rb->outbuf; - memcpy(data, change->snapshot, sizeof(SnapshotData)); + memcpy(data, snap, sizeof(SnapshotData)); data += sizeof(SnapshotData); - if (change->snapshot->xcnt) + if (snap->xcnt) { - memcpy(data, change->snapshot->xip, - sizeof(TransactionId) + change->snapshot->xcnt); - data += sizeof(TransactionId) + change->snapshot->xcnt; + memcpy(data, snap->xip, + sizeof(TransactionId) + snap->xcnt); + data += sizeof(TransactionId) + snap->xcnt; } - if (change->snapshot->subxcnt) + if (snap->subxcnt) { - memcpy(data, change->snapshot->subxip, - sizeof(TransactionId) + change->snapshot->subxcnt); - data += sizeof(TransactionId) + change->snapshot->subxcnt; + memcpy(data, snap->subxip, + sizeof(TransactionId) + snap->subxcnt); + data += sizeof(TransactionId) + snap->subxcnt; } break; } @@ -2116,7 +2097,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, txn->xid))); } - Assert(ondisk->change.action_internal == change->action_internal); + Assert(ondisk->change.action == change->action); } /* @@ -2271,56 +2252,60 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, data += sizeof(ReorderBufferDiskChange); /* restore individual stuff */ - switch ((ReorderBufferChangeTypeInternal) change->action_internal) + switch (change->action) { - case REORDER_BUFFER_CHANGE_INTERNAL_INSERT: + case REORDER_BUFFER_CHANGE_INSERT: /* fall through */ - case REORDER_BUFFER_CHANGE_INTERNAL_UPDATE: + case REORDER_BUFFER_CHANGE_UPDATE: /* fall through */ - case REORDER_BUFFER_CHANGE_INTERNAL_DELETE: - if (change->tp.newtuple) + case REORDER_BUFFER_CHANGE_DELETE: + if (change->data.tp.newtuple) { Size len = offsetof(ReorderBufferTupleBuf, data) +((ReorderBufferTupleBuf *) data)->tuple.t_len - offsetof(HeapTupleHeaderData, t_bits); - change->tp.newtuple = ReorderBufferGetTupleBuf(rb); - memcpy(change->tp.newtuple, data, len); - change->tp.newtuple->tuple.t_data = &change->tp.newtuple->header; - + change->data.tp.newtuple = ReorderBufferGetTupleBuf(rb); + memcpy(change->data.tp.newtuple, data, len); + change->data.tp.newtuple->tuple.t_data = + &change->data.tp.newtuple->header; data += len; } - if (change->tp.oldtuple) + if (change->data.tp.oldtuple) { Size len = offsetof(ReorderBufferTupleBuf, data) +((ReorderBufferTupleBuf *) data)->tuple.t_len - offsetof(HeapTupleHeaderData, t_bits); - change->tp.oldtuple = ReorderBufferGetTupleBuf(rb); - memcpy(change->tp.oldtuple, data, len); - change->tp.oldtuple->tuple.t_data = &change->tp.oldtuple->header; + change->data.tp.oldtuple = ReorderBufferGetTupleBuf(rb); + memcpy(change->data.tp.oldtuple, data, len); + change->data.tp.oldtuple->tuple.t_data = + &change->data.tp.oldtuple->header; data += len; } break; case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { - Snapshot oldsnap = (Snapshot) data; - Size size = sizeof(SnapshotData) + - sizeof(TransactionId) * oldsnap->xcnt + - sizeof(TransactionId) * (oldsnap->subxcnt + 0) - ; - - Assert(change->snapshot != NULL); - - change->snapshot = MemoryContextAllocZero(rb->context, size); - - memcpy(change->snapshot, data, size); - change->snapshot->xip = (TransactionId *) - (((char *) change->snapshot) + sizeof(SnapshotData)); - change->snapshot->subxip = - change->snapshot->xip + change->snapshot->xcnt + 0; - change->snapshot->copied = true; + Snapshot oldsnap; + Snapshot newsnap; + Size size; + + oldsnap = (Snapshot) data; + + size = sizeof(SnapshotData) + + sizeof(TransactionId) * oldsnap->xcnt + + sizeof(TransactionId) * (oldsnap->subxcnt + 0); + + change->data.snapshot = MemoryContextAllocZero(rb->context, size); + + newsnap = change->data.snapshot; + + memcpy(newsnap, data, size); + newsnap->xip = (TransactionId *) + (((char *) newsnap) + sizeof(SnapshotData)); + newsnap->subxip = newsnap->xip + newsnap->xcnt; + newsnap->copied = true; break; } /* the base struct contains all the data, easy peasy */ @@ -2464,6 +2449,7 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { ReorderBufferToastEnt *ent; + ReorderBufferTupleBuf *newtup; bool found; int32 chunksize; bool isnull; @@ -2477,9 +2463,10 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, Assert(IsToastRelation(relation)); - chunk_id = DatumGetObjectId(fastgetattr(&change->tp.newtuple->tuple, 1, desc, &isnull)); + newtup = change->data.tp.newtuple; + chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull)); Assert(!isnull); - chunk_seq = DatumGetInt32(fastgetattr(&change->tp.newtuple->tuple, 2, desc, &isnull)); + chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull)); Assert(!isnull); ent = (ReorderBufferToastEnt *) @@ -2505,7 +2492,7 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d", chunk_seq, chunk_id, ent->last_chunk_seq + 1); - chunk = DatumGetPointer(fastgetattr(&change->tp.newtuple->tuple, 3, desc, &isnull)); + chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull)); Assert(!isnull); /* calculate size so we can allocate the right size at once later */ @@ -2539,10 +2526,11 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Datum *attrs; bool *isnull; bool *free; - HeapTuple newtup; + HeapTuple tmphtup; Relation toast_rel; TupleDesc toast_desc; MemoryContext oldcontext; + ReorderBufferTupleBuf *newtup; /* no toast tuples changed */ if (txn->toast_hash == NULL) @@ -2551,7 +2539,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, oldcontext = MemoryContextSwitchTo(rb->context); /* we should only have toast tuples in an INSERT or UPDATE */ - Assert(change->tp.newtuple); + Assert(change->data.tp.newtuple); desc = RelationGetDescr(relation); @@ -2563,8 +2551,9 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, isnull = palloc0(sizeof(bool) * desc->natts); free = palloc0(sizeof(bool) * desc->natts); - heap_deform_tuple(&change->tp.newtuple->tuple, desc, - attrs, isnull); + newtup = change->data.tp.newtuple; + + heap_deform_tuple(&newtup->tuple, desc, attrs, isnull); for (natt = 0; natt < desc->natts; natt++) { @@ -2628,10 +2617,14 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, dlist_foreach(it, &ent->chunks) { bool isnull; - ReorderBufferTupleBuf *tup = - dlist_container(ReorderBufferChange, node, it.cur)->tp.newtuple; - Pointer chunk = - DatumGetPointer(fastgetattr(&tup->tuple, 3, toast_desc, &isnull)); + ReorderBufferChange *cchange; + ReorderBufferTupleBuf *ctup; + Pointer chunk; + + cchange = dlist_container(ReorderBufferChange, node, it.cur); + ctup = cchange->data.tp.newtuple; + chunk = DatumGetPointer( + fastgetattr(&ctup->tuple, 3, toast_desc, &isnull)); Assert(!isnull); Assert(!VARATT_IS_EXTERNAL(chunk)); @@ -2665,21 +2658,19 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, * passed to the output plugin. We can't directly heap_fill_tuple() into * the tuplebuf because attrs[] will point back into the current content. */ - newtup = heap_form_tuple(desc, attrs, isnull); - Assert(change->tp.newtuple->tuple.t_len <= MaxHeapTupleSize); - Assert(&change->tp.newtuple->header == change->tp.newtuple->tuple.t_data); + tmphtup = heap_form_tuple(desc, attrs, isnull); + Assert(newtup->tuple.t_len <= MaxHeapTupleSize); + Assert(&newtup->header == newtup->tuple.t_data); - memcpy(change->tp.newtuple->tuple.t_data, - newtup->t_data, - newtup->t_len); - change->tp.newtuple->tuple.t_len = newtup->t_len; + memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len); + newtup->tuple.t_len = tmphtup->t_len; /* * free resources we won't further need, more persistent stuff will be * free'd in ReorderBufferToastReset(). */ RelationClose(toast_rel); - pfree(newtup); + pfree(tmphtup); for (natt = 0; natt < desc->natts; natt++) { if (free[natt]) |