diff options
author | Jeff Davis <jdavis@postgresql.org> | 2022-01-19 14:58:04 -0800 |
---|---|---|
committer | Jeff Davis <jdavis@postgresql.org> | 2022-01-19 14:58:49 -0800 |
commit | 7a5f6b47488d824b1ea1326be4337e2c32325ff2 (patch) | |
tree | 236969831968cd55bad7ad199131643fdd9f877d /src/backend/replication/logical/decode.c | |
parent | a3d6264bbce0ff7002be35a907b73b01e2e37f45 (diff) | |
download | postgresql-7a5f6b47488d824b1ea1326be4337e2c32325ff2.tar.gz postgresql-7a5f6b47488d824b1ea1326be4337e2c32325ff2.zip |
Make logical decoding a part of the rmgr.
Add a new rmgr method, rm_decode, and use that rather than a switch
statement.
In preparation for rmgr extensibility.
Reviewed-by: Julien Rouhaud
Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com
Discussion: https://postgr.es/m/20220118095332.6xtlcjoyxobv6cbk@jrouhaud
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r-- | src/backend/replication/logical/decode.c | 105 |
1 files changed, 21 insertions, 84 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 1d22208c1ad..9b450c9f90e 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -43,21 +43,6 @@ #include "replication/snapbuild.h" #include "storage/standby.h" -typedef struct XLogRecordBuffer -{ - XLogRecPtr origptr; - XLogRecPtr endptr; - XLogReaderState *record; -} XLogRecordBuffer; - -/* RMGR Handlers */ -static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); -static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); - /* individual record(group)'s handlers */ static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); @@ -107,6 +92,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor { XLogRecordBuffer buf; TransactionId txid; + RmgrId rmid; buf.origptr = ctx->reader->ReadRecPtr; buf.endptr = ctx->reader->EndRecPtr; @@ -127,72 +113,23 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor buf.origptr); } - /* cast so we get a warning when new rmgrs are added */ - switch ((RmgrId) XLogRecGetRmid(record)) - { - /* - * Rmgrs we care about for logical decoding. Add new rmgrs in - * rmgrlist.h's order. - */ - case RM_XLOG_ID: - DecodeXLogOp(ctx, &buf); - break; - - case RM_XACT_ID: - DecodeXactOp(ctx, &buf); - break; + rmid = XLogRecGetRmid(record); - case RM_STANDBY_ID: - DecodeStandbyOp(ctx, &buf); - break; - - case RM_HEAP2_ID: - DecodeHeap2Op(ctx, &buf); - break; - - case RM_HEAP_ID: - DecodeHeapOp(ctx, &buf); - break; - - case RM_LOGICALMSG_ID: - DecodeLogicalMsgOp(ctx, &buf); - break; - - /* - * Rmgrs irrelevant for logical decoding; they describe stuff not - * represented in logical decoding. Add new rmgrs in rmgrlist.h's - * order. - */ - case RM_SMGR_ID: - case RM_CLOG_ID: - case RM_DBASE_ID: - case RM_TBLSPC_ID: - case RM_MULTIXACT_ID: - case RM_RELMAP_ID: - case RM_BTREE_ID: - case RM_HASH_ID: - case RM_GIN_ID: - case RM_GIST_ID: - case RM_SEQ_ID: - case RM_SPGIST_ID: - case RM_BRIN_ID: - case RM_COMMIT_TS_ID: - case RM_REPLORIGIN_ID: - case RM_GENERIC_ID: - /* just deal with xid, and done */ - ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), - buf.origptr); - break; - case RM_NEXT_ID: - elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record)); + if (RmgrTable[rmid].rm_decode != NULL) + RmgrTable[rmid].rm_decode(ctx, &buf); + else + { + /* just deal with xid, and done */ + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), + buf.origptr); } } /* * Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; @@ -234,8 +171,8 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; ReorderBuffer *reorder = ctx->reorder; @@ -391,8 +328,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; XLogReaderState *r = buf->record; @@ -437,8 +374,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK; TransactionId xid = XLogRecGetXid(buf->record); @@ -497,8 +434,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK; TransactionId xid = XLogRecGetXid(buf->record); @@ -619,8 +556,8 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) /* * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer(). */ -static void -DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +void +logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; XLogReaderState *r = buf->record; |