diff options
Diffstat (limited to 'src/backend/replication')
-rw-r--r-- | src/backend/replication/logical/decode.c | 158 | ||||
-rw-r--r-- | src/backend/replication/logical/logical.c | 7 | ||||
-rw-r--r-- | src/backend/replication/logical/logicalfuncs.c | 4 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 1 | ||||
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 2 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 2 |
6 files changed, 82 insertions, 92 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 8e78aafda7c..1c7dac38fc9 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -31,7 +31,9 @@ #include "access/transam.h" #include "access/xact.h" #include "access/xlog_internal.h" +#include "access/xlogutils.h" #include "access/xlogreader.h" +#include "access/xlogrecord.h" #include "catalog/pg_control.h" @@ -46,8 +48,7 @@ typedef struct XLogRecordBuffer { XLogRecPtr origptr; XLogRecPtr endptr; - XLogRecord record; - char *record_data; + XLogReaderState *record; } XLogRecordBuffer; /* RMGR Handlers */ @@ -79,17 +80,16 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); * context. */ void -LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record) +LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record) { XLogRecordBuffer buf; buf.origptr = ctx->reader->ReadRecPtr; buf.endptr = ctx->reader->EndRecPtr; - buf.record = *record; - buf.record_data = XLogRecGetData(record); + buf.record = record; /* cast so we get a warning when new rmgrs are added */ - switch ((RmgrIds) buf.record.xl_rmid) + switch ((RmgrIds) XLogRecGetRmid(record)) { /* * Rmgrs we care about for logical decoding. Add new rmgrs in @@ -135,7 +135,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record) case RM_BRIN_ID: break; case RM_NEXT_ID: - elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) buf.record.xl_rmid); + elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record)); } } @@ -146,7 +146,7 @@ static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; - uint8 info = buf->record.xl_info & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; switch (info) { @@ -185,8 +185,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; ReorderBuffer *reorder = ctx->reorder; - XLogRecord *r = &buf->record; - uint8 info = r->xl_info & ~XLR_INFO_MASK; + XLogReaderState *r = buf->record; + uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; /* no point in doing anything yet, data could not be decoded anyway */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) @@ -200,12 +200,12 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId *subxacts = NULL; SharedInvalidationMessage *invals = NULL; - xlrec = (xl_xact_commit *) buf->record_data; + xlrec = (xl_xact_commit *) XLogRecGetData(r); subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]); - DecodeCommit(ctx, buf, r->xl_xid, xlrec->dbId, + DecodeCommit(ctx, buf, XLogRecGetXid(r), xlrec->dbId, xlrec->xact_time, xlrec->nsubxacts, subxacts, xlrec->nmsgs, invals); @@ -220,7 +220,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) SharedInvalidationMessage *invals = NULL; /* Prepared commits contain a normal commit record... */ - prec = (xl_xact_commit_prepared *) buf->record_data; + prec = (xl_xact_commit_prepared *) XLogRecGetData(r); xlrec = &prec->crec; subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); @@ -237,9 +237,9 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { xl_xact_commit_compact *xlrec; - xlrec = (xl_xact_commit_compact *) buf->record_data; + xlrec = (xl_xact_commit_compact *) XLogRecGetData(r); - DecodeCommit(ctx, buf, r->xl_xid, InvalidOid, + DecodeCommit(ctx, buf, XLogRecGetXid(r), InvalidOid, xlrec->xact_time, xlrec->nsubxacts, xlrec->subxacts, 0, NULL); @@ -250,11 +250,11 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_xact_abort *xlrec; TransactionId *sub_xids; - xlrec = (xl_xact_abort *) buf->record_data; + xlrec = (xl_xact_abort *) XLogRecGetData(r); sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); - DecodeAbort(ctx, buf->origptr, r->xl_xid, + DecodeAbort(ctx, buf->origptr, XLogRecGetXid(r), sub_xids, xlrec->nsubxacts); break; } @@ -265,7 +265,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId *sub_xids; /* prepared abort contain a normal commit abort... */ - prec = (xl_xact_abort_prepared *) buf->record_data; + prec = (xl_xact_abort_prepared *) XLogRecGetData(r); xlrec = &prec->arec; sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); @@ -282,7 +282,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) int i; TransactionId *sub_xid; - xlrec = (xl_xact_assignment *) buf->record_data; + xlrec = (xl_xact_assignment *) XLogRecGetData(r); sub_xid = &xlrec->xsub[0]; @@ -316,14 +316,14 @@ static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; - XLogRecord *r = &buf->record; - uint8 info = r->xl_info & ~XLR_INFO_MASK; + XLogReaderState *r = buf->record; + uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; switch (info) { case XLOG_RUNNING_XACTS: { - xl_running_xacts *running = (xl_running_xacts *) buf->record_data; + xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r); SnapBuildProcessRunningXacts(builder, buf->origptr, running); @@ -352,8 +352,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { - uint8 info = buf->record.xl_info & XLOG_HEAP_OPMASK; - TransactionId xid = buf->record.xl_xid; + uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK; + TransactionId xid = XLogRecGetXid(buf->record); SnapBuild *builder = ctx->snapshot_builder; /* no point in doing anything yet */ @@ -370,7 +370,7 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { xl_heap_new_cid *xlrec; - xlrec = (xl_heap_new_cid *) buf->record_data; + xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record); SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec); break; @@ -405,8 +405,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { - uint8 info = buf->record.xl_info & XLOG_HEAP_OPMASK; - TransactionId xid = buf->record.xl_xid; + uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK; + TransactionId xid = XLogRecGetXid(buf->record); SnapBuild *builder = ctx->snapshot_builder; /* no point in doing anything yet */ @@ -576,34 +576,35 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { - XLogRecord *r = &buf->record; + XLogReaderState *r = buf->record; xl_heap_insert *xlrec; ReorderBufferChange *change; + RelFileNode target_node; - xlrec = (xl_heap_insert *) buf->record_data; + xlrec = (xl_heap_insert *) XLogRecGetData(r); /* only interested in our database */ - if (xlrec->target.node.dbNode != ctx->slot->data.database) + XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); + if (target_node.dbNode != ctx->slot->data.database) return; change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_INSERT; - memcpy(&change->data.tp.relnode, &xlrec->target.node, sizeof(RelFileNode)); + memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) { - Assert(r->xl_len > (SizeOfHeapInsert + SizeOfHeapHeader)); + Size tuplelen; + char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen); change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); - DecodeXLogTuple((char *) xlrec + SizeOfHeapInsert, - r->xl_len - SizeOfHeapInsert, - change->data.tp.newtuple); + DecodeXLogTuple(tupledata, tuplelen, change->data.tp.newtuple); } change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, r->xl_xid, buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); } /* @@ -615,62 +616,47 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { - XLogRecord *r = &buf->record; + XLogReaderState *r = buf->record; xl_heap_update *xlrec; - xl_heap_header_len xlhdr; ReorderBufferChange *change; char *data; + Size datalen; + RelFileNode target_node; - xlrec = (xl_heap_update *) buf->record_data; + xlrec = (xl_heap_update *) XLogRecGetData(r); /* only interested in our database */ - if (xlrec->target.node.dbNode != ctx->slot->data.database) + XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); + if (target_node.dbNode != ctx->slot->data.database) return; change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_UPDATE; - memcpy(&change->data.tp.relnode, &xlrec->target.node, sizeof(RelFileNode)); - - /* caution, remaining data in record is not aligned */ - data = buf->record_data + SizeOfHeapUpdate; + memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) { - Assert(r->xl_len > (SizeOfHeapUpdate + SizeOfHeapHeaderLen)); - - memcpy(&xlhdr, data, sizeof(xlhdr)); - data += offsetof(xl_heap_header_len, header); + data = XLogRecGetBlockData(r, 0, &datalen); change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); - DecodeXLogTuple(data, - xlhdr.t_len + SizeOfHeapHeader, - change->data.tp.newtuple); - /* skip over the rest of the tuple header */ - data += SizeOfHeapHeader; - /* skip over the tuple data */ - data += xlhdr.t_len; + DecodeXLogTuple(data, datalen, change->data.tp.newtuple); } if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD) { - memcpy(&xlhdr, data, sizeof(xlhdr)); - data += offsetof(xl_heap_header_len, header); + /* caution, remaining data in record is not aligned */ + data = XLogRecGetData(r) + SizeOfHeapUpdate; + datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate; change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); - DecodeXLogTuple(data, - xlhdr.t_len + SizeOfHeapHeader, - change->data.tp.oldtuple); -#ifdef NOT_USED - data += SizeOfHeapHeader; - data += xlhdr.t_len; -#endif + DecodeXLogTuple(data, datalen, change->data.tp.oldtuple); } change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, r->xl_xid, buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); } /* @@ -681,36 +667,38 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { - XLogRecord *r = &buf->record; + XLogReaderState *r = buf->record; xl_heap_delete *xlrec; ReorderBufferChange *change; + RelFileNode target_node; - xlrec = (xl_heap_delete *) buf->record_data; + xlrec = (xl_heap_delete *) XLogRecGetData(r); /* only interested in our database */ - if (xlrec->target.node.dbNode != ctx->slot->data.database) + XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); + if (target_node.dbNode != ctx->slot->data.database) return; change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_DELETE; - memcpy(&change->data.tp.relnode, &xlrec->target.node, sizeof(RelFileNode)); + memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); /* old primary key stored */ if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD) { - Assert(r->xl_len > (SizeOfHeapDelete + SizeOfHeapHeader)); + Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader)); change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete, - r->xl_len - SizeOfHeapDelete, + XLogRecGetDataLen(r) - SizeOfHeapDelete, change->data.tp.oldtuple); } change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, r->xl_xid, buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); } /* @@ -721,27 +709,24 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { - XLogRecord *r = &buf->record; + XLogReaderState *r = buf->record; xl_heap_multi_insert *xlrec; int i; char *data; - bool isinit = (r->xl_info & XLOG_HEAP_INIT_PAGE) != 0; + char *tupledata; + Size tuplelen; + RelFileNode rnode; - xlrec = (xl_heap_multi_insert *) buf->record_data; + xlrec = (xl_heap_multi_insert *) XLogRecGetData(r); /* only interested in our database */ - if (xlrec->node.dbNode != ctx->slot->data.database) + XLogRecGetBlockTag(r, 0, &rnode, NULL, NULL); + if (rnode.dbNode != ctx->slot->data.database) return; - data = buf->record_data + SizeOfHeapMultiInsert; - - /* - * OffsetNumbers (which are not of interest to us) are stored when - * XLOG_HEAP_INIT_PAGE is not set -- skip over them. - */ - if (!isinit) - data += sizeof(OffsetNumber) * xlrec->ntuples; + tupledata = XLogRecGetBlockData(r, 0, &tuplelen); + data = tupledata; for (i = 0; i < xlrec->ntuples; i++) { ReorderBufferChange *change; @@ -751,7 +736,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_INSERT; - memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode)); + memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode)); /* * CONTAINS_NEW_TUPLE will always be set currently as multi_insert @@ -806,9 +791,10 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) else change->data.tp.clear_toast_afterwards = false; - ReorderBufferQueueChange(ctx->reorder, r->xl_xid, + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); } + Assert(data == tupledata + tuplelen); } /* diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 875b89a6288..8c318cd4b51 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -34,6 +34,7 @@ #include "miscadmin.h" #include "access/xact.h" +#include "access/xlog_internal.h" #include "replication/decode.h" #include "replication/logical.h" @@ -455,12 +456,12 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) record = XLogReadRecord(ctx->reader, startptr, &err); if (err) elog(ERROR, "%s", err); - - Assert(record); + if (!record) + elog(ERROR, "no record found"); /* shouldn't happen */ startptr = InvalidXLogRecPtr; - LogicalDecodingProcessRecord(ctx, record); + LogicalDecodingProcessRecord(ctx, ctx->reader); /* only continue till we found a consistent spot */ if (DecodingContextReady(ctx)) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 3a5ec2f61d9..1977f098c79 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -21,6 +21,8 @@ #include "funcapi.h" #include "miscadmin.h" +#include "access/xlog_internal.h" + #include "catalog/pg_type.h" #include "nodes/makefuncs.h" @@ -431,7 +433,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin * store the description into our tuplestore. */ if (record != NULL) - LogicalDecodingProcessRecord(ctx, record); + LogicalDecodingProcessRecord(ctx, ctx->reader); /* check limits */ if (upto_lsn != InvalidXLogRecPtr && diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 7d8f40738d4..6e75398eabe 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -54,6 +54,7 @@ #include "access/transam.h" #include "access/tuptoaster.h" #include "access/xact.h" +#include "access/xlog_internal.h" #include "catalog/catalog.h" #include "lib/binaryheap.h" #include "miscadmin.h" diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 200b54d7c2a..20f9b04adfa 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -699,7 +699,7 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn); ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn, - xlrec->target.node, xlrec->target.tid, + xlrec->target_node, xlrec->target_tid, xlrec->cmin, xlrec->cmax, xlrec->combocid); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 385d18ba1bb..addae8f6ce5 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2444,7 +2444,7 @@ XLogSendLogical(void) if (record != NULL) { - LogicalDecodingProcessRecord(logical_decoding_ctx, record); + LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader); sentPtr = logical_decoding_ctx->reader->EndRecPtr; } |