aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/decode.c
diff options
context:
space:
mode:
authorJeff Davis <jdavis@postgresql.org>2022-01-19 14:58:04 -0800
committerJeff Davis <jdavis@postgresql.org>2022-01-19 14:58:49 -0800
commit7a5f6b47488d824b1ea1326be4337e2c32325ff2 (patch)
tree236969831968cd55bad7ad199131643fdd9f877d /src/backend/replication/logical/decode.c
parenta3d6264bbce0ff7002be35a907b73b01e2e37f45 (diff)
downloadpostgresql-7a5f6b47488d824b1ea1326be4337e2c32325ff2.tar.gz
postgresql-7a5f6b47488d824b1ea1326be4337e2c32325ff2.zip
Make logical decoding a part of the rmgr.
Add a new rmgr method, rm_decode, and use that rather than a switch statement. In preparation for rmgr extensibility. Reviewed-by: Julien Rouhaud Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com Discussion: https://postgr.es/m/20220118095332.6xtlcjoyxobv6cbk@jrouhaud
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r--src/backend/replication/logical/decode.c105
1 files changed, 21 insertions, 84 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 1d22208c1ad..9b450c9f90e 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -43,21 +43,6 @@
#include "replication/snapbuild.h"
#include "storage/standby.h"
-typedef struct XLogRecordBuffer
-{
- XLogRecPtr origptr;
- XLogRecPtr endptr;
- XLogReaderState *record;
-} XLogRecordBuffer;
-
-/* RMGR Handlers */
-static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
-
/* individual record(group)'s handlers */
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -107,6 +92,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
{
XLogRecordBuffer buf;
TransactionId txid;
+ RmgrId rmid;
buf.origptr = ctx->reader->ReadRecPtr;
buf.endptr = ctx->reader->EndRecPtr;
@@ -127,72 +113,23 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
buf.origptr);
}
- /* cast so we get a warning when new rmgrs are added */
- switch ((RmgrId) XLogRecGetRmid(record))
- {
- /*
- * Rmgrs we care about for logical decoding. Add new rmgrs in
- * rmgrlist.h's order.
- */
- case RM_XLOG_ID:
- DecodeXLogOp(ctx, &buf);
- break;
-
- case RM_XACT_ID:
- DecodeXactOp(ctx, &buf);
- break;
+ rmid = XLogRecGetRmid(record);
- case RM_STANDBY_ID:
- DecodeStandbyOp(ctx, &buf);
- break;
-
- case RM_HEAP2_ID:
- DecodeHeap2Op(ctx, &buf);
- break;
-
- case RM_HEAP_ID:
- DecodeHeapOp(ctx, &buf);
- break;
-
- case RM_LOGICALMSG_ID:
- DecodeLogicalMsgOp(ctx, &buf);
- break;
-
- /*
- * Rmgrs irrelevant for logical decoding; they describe stuff not
- * represented in logical decoding. Add new rmgrs in rmgrlist.h's
- * order.
- */
- case RM_SMGR_ID:
- case RM_CLOG_ID:
- case RM_DBASE_ID:
- case RM_TBLSPC_ID:
- case RM_MULTIXACT_ID:
- case RM_RELMAP_ID:
- case RM_BTREE_ID:
- case RM_HASH_ID:
- case RM_GIN_ID:
- case RM_GIST_ID:
- case RM_SEQ_ID:
- case RM_SPGIST_ID:
- case RM_BRIN_ID:
- case RM_COMMIT_TS_ID:
- case RM_REPLORIGIN_ID:
- case RM_GENERIC_ID:
- /* just deal with xid, and done */
- ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
- buf.origptr);
- break;
- case RM_NEXT_ID:
- elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
+ if (RmgrTable[rmid].rm_decode != NULL)
+ RmgrTable[rmid].rm_decode(ctx, &buf);
+ else
+ {
+ /* just deal with xid, and done */
+ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
+ buf.origptr);
}
}
/*
* Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer().
*/
-static void
-DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
SnapBuild *builder = ctx->snapshot_builder;
uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
@@ -234,8 +171,8 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer().
*/
-static void
-DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
SnapBuild *builder = ctx->snapshot_builder;
ReorderBuffer *reorder = ctx->reorder;
@@ -391,8 +328,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer().
*/
-static void
-DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
SnapBuild *builder = ctx->snapshot_builder;
XLogReaderState *r = buf->record;
@@ -437,8 +374,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer().
*/
-static void
-DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
TransactionId xid = XLogRecGetXid(buf->record);
@@ -497,8 +434,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer().
*/
-static void
-DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
TransactionId xid = XLogRecGetXid(buf->record);
@@ -619,8 +556,8 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
/*
* Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
*/
-static void
-DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+void
+logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
SnapBuild *builder = ctx->snapshot_builder;
XLogReaderState *r = buf->record;