diff options
author | Jeff Davis <jdavis@postgresql.org> | 2022-04-06 22:26:43 -0700 |
---|---|---|
committer | Jeff Davis <jdavis@postgresql.org> | 2022-04-06 23:06:46 -0700 |
commit | 5c279a6d350205cc98f91fb8e1d3e4442a6b25d1 (patch) | |
tree | 4165add040730afa0e116ab5be1db5dc6fa93aea /src/backend/access | |
parent | a8cfb0c1a964ebbe830c5138d389b0d2627ec298 (diff) | |
download | postgresql-5c279a6d350205cc98f91fb8e1d3e4442a6b25d1.tar.gz postgresql-5c279a6d350205cc98f91fb8e1d3e4442a6b25d1.zip |
Custom WAL Resource Managers.
Allow extensions to specify a new custom resource manager (rmgr),
which allows specialized WAL. This is meant to be used by a Table
Access Method or Index Access Method.
Prior to this commit, only Generic WAL was available, which offers
support for recovery and physical replication but not logical
replication.
Reviewed-by: Julien Rouhaud, Bharath Rupireddy, Andres Freund
Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com
Diffstat (limited to 'src/backend/access')
-rw-r--r-- | src/backend/access/transam/rmgr.c | 124 | ||||
-rw-r--r-- | src/backend/access/transam/xlogreader.c | 2 | ||||
-rw-r--r-- | src/backend/access/transam/xlogrecovery.c | 33 |
3 files changed, 135 insertions, 24 deletions
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index f8847d5aebf..3c2dc1000db 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -24,16 +24,138 @@ #include "commands/dbcommands_xlog.h" #include "commands/sequence.h" #include "commands/tablespace.h" +#include "fmgr.h" +#include "funcapi.h" +#include "miscadmin.h" #include "replication/decode.h" #include "replication/message.h" #include "replication/origin.h" #include "storage/standby.h" +#include "utils/builtins.h" #include "utils/relmapper.h" /* must be kept in sync with RmgrData definition in xlog_internal.h */ #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \ { name, redo, desc, identify, startup, cleanup, mask, decode }, -const RmgrData RmgrTable[RM_MAX_ID + 1] = { +RmgrData RmgrTable[RM_MAX_ID + 1] = { #include "access/rmgrlist.h" }; + +/* + * Start up all resource managers. + */ +void +RmgrStartup(void) +{ + for (int rmid = 0; rmid <= RM_MAX_ID; rmid++) + { + if (!RmgrIdExists(rmid)) + continue; + + if (RmgrTable[rmid].rm_startup != NULL) + RmgrTable[rmid].rm_startup(); + } +} + +/* + * Clean up all resource managers. + */ +void +RmgrCleanup(void) +{ + for (int rmid = 0; rmid <= RM_MAX_ID; rmid++) + { + if (!RmgrIdExists(rmid)) + continue; + + if (RmgrTable[rmid].rm_cleanup != NULL) + RmgrTable[rmid].rm_cleanup(); + } +} + +/* + * Emit ERROR when we encounter a record with an RmgrId we don't + * recognize. + */ +void +RmgrNotFound(RmgrId rmid) +{ + ereport(ERROR, (errmsg("resource manager with ID %d not registered", rmid), + errhint("Include the extension module that implements this resource manager in shared_preload_libraries."))); +} + +/* + * Register a new custom WAL resource manager. + * + * Resource manager IDs must be globally unique across all extensions. Refer + * to https://wiki.postgresql.org/wiki/CustomWALResourceManager to reserve a + * unique RmgrId for your extension, to avoid conflicts with other extension + * developers. During development, use RM_EXPERIMENTAL_ID to avoid needlessly + * reserving a new ID. + */ +void +RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr) +{ + if (rmgr->rm_name == NULL || strlen(rmgr->rm_name) == 0) + ereport(ERROR, (errmsg("custom resource manager name is invalid"), + errhint("Provide a non-empty name for the custom resource manager."))); + + if (!RMID_IS_CUSTOM(rmid)) + ereport(ERROR, (errmsg("custom resource manager ID %d is out of range", rmid), + errhint("Provide a custom resource manager ID between %d and %d.", + RM_MIN_CUSTOM_ID, RM_MAX_CUSTOM_ID))); + + if (!process_shared_preload_libraries_in_progress) + ereport(ERROR, + (errmsg("failed to register custom resource manager \"%s\" with ID %d", rmgr->rm_name, rmid), + errdetail("Custom resource manager must be registered while initializing modules in shared_preload_libraries."))); + + if (RmgrTable[rmid].rm_name != NULL) + ereport(ERROR, + (errmsg("failed to register custom resource manager \"%s\" with ID %d", rmgr->rm_name, rmid), + errdetail("Custom resource manager \"%s\" already registered with the same ID.", + RmgrTable[rmid].rm_name))); + + /* check for existing rmgr with the same name */ + for (int existing_rmid = 0; existing_rmid <= RM_MAX_ID; existing_rmid++) + { + if (!RmgrIdExists(existing_rmid)) + continue; + + if (!pg_strcasecmp(RmgrTable[existing_rmid].rm_name, rmgr->rm_name)) + ereport(ERROR, + (errmsg("failed to register custom resource manager \"%s\" with ID %d", rmgr->rm_name, rmid), + errdetail("Existing resource manager with ID %d has the same name.", existing_rmid))); + } + + /* register it */ + RmgrTable[rmid] = *rmgr; + ereport(LOG, + (errmsg("registered custom resource manager \"%s\" with ID %d", + rmgr->rm_name, rmid))); +} + +/* SQL SRF showing loaded resource managers */ +Datum +pg_get_wal_resource_managers(PG_FUNCTION_ARGS) +{ +#define PG_GET_RESOURCE_MANAGERS_COLS 3 + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + Datum values[PG_GET_RESOURCE_MANAGERS_COLS]; + bool nulls[PG_GET_RESOURCE_MANAGERS_COLS] = {0}; + + SetSingleFuncCall(fcinfo, 0); + + for (int rmid = 0; rmid <= RM_MAX_ID; rmid++) + { + if (!RmgrIdExists(rmid)) + continue; + values[0] = Int32GetDatum(rmid); + values[1] = CStringGetTextDatum(GetRmgr(rmid).rm_name); + values[2] = BoolGetDatum(RMID_IS_BUILTIN(rmid)); + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); + } + + return (Datum) 0; +} diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index e437c429920..161cf13fed2 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -1102,7 +1102,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, (uint32) SizeOfXLogRecord, record->xl_tot_len); return false; } - if (record->xl_rmid > RM_MAX_ID) + if (!RMID_IS_VALID(record->xl_rmid)) { report_invalid_record(state, "invalid resource manager ID %u at %X/%X", diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 1b7bae387a0..55391921679 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -1541,7 +1541,6 @@ ShutdownWalRecovery(void) void PerformWalRecovery(void) { - int rmid; XLogRecord *record; bool reachedRecoveryTarget = false; TimeLineID replayTLI; @@ -1614,12 +1613,7 @@ PerformWalRecovery(void) InRedo = true; - /* Initialize resource managers */ - for (rmid = 0; rmid <= RM_MAX_ID; rmid++) - { - if (RmgrTable[rmid].rm_startup != NULL) - RmgrTable[rmid].rm_startup(); - } + RmgrStartup(); ereport(LOG, (errmsg("redo starts at %X/%X", @@ -1756,12 +1750,7 @@ PerformWalRecovery(void) } } - /* Allow resource managers to do any required cleanup. */ - for (rmid = 0; rmid <= RM_MAX_ID; rmid++) - { - if (RmgrTable[rmid].rm_cleanup != NULL) - RmgrTable[rmid].rm_cleanup(); - } + RmgrCleanup(); ereport(LOG, (errmsg("redo done at %X/%X system usage: %s", @@ -1881,7 +1870,7 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl xlogrecovery_redo(xlogreader, *replayTLI); /* Now apply the WAL record itself */ - RmgrTable[record->xl_rmid].rm_redo(xlogreader); + GetRmgr(record->xl_rmid).rm_redo(xlogreader); /* * After redo, check whether the backup pages associated with the WAL @@ -2111,20 +2100,20 @@ rm_redo_error_callback(void *arg) void xlog_outdesc(StringInfo buf, XLogReaderState *record) { - RmgrId rmid = XLogRecGetRmid(record); + RmgrData rmgr = GetRmgr(XLogRecGetRmid(record)); uint8 info = XLogRecGetInfo(record); const char *id; - appendStringInfoString(buf, RmgrTable[rmid].rm_name); + appendStringInfoString(buf, rmgr.rm_name); appendStringInfoChar(buf, '/'); - id = RmgrTable[rmid].rm_identify(info); + id = rmgr.rm_identify(info); if (id == NULL) appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK); else appendStringInfo(buf, "%s: ", id); - RmgrTable[rmid].rm_desc(buf, record); + rmgr.rm_desc(buf, record); } #ifdef WAL_DEBUG @@ -2273,7 +2262,7 @@ getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime) static void verifyBackupPageConsistency(XLogReaderState *record) { - RmgrId rmid = XLogRecGetRmid(record); + RmgrData rmgr = GetRmgr(XLogRecGetRmid(record)); RelFileNode rnode; ForkNumber forknum; BlockNumber blkno; @@ -2353,10 +2342,10 @@ verifyBackupPageConsistency(XLogReaderState *record) * If masking function is defined, mask both the primary and replay * images */ - if (RmgrTable[rmid].rm_mask != NULL) + if (rmgr.rm_mask != NULL) { - RmgrTable[rmid].rm_mask(replay_image_masked, blkno); - RmgrTable[rmid].rm_mask(primary_image_masked, blkno); + rmgr.rm_mask(replay_image_masked, blkno); + rmgr.rm_mask(primary_image_masked, blkno); } /* Time to compare the primary and replay images. */ |