aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/catalog/pg_subscription.c1
-rw-r--r--src/backend/catalog/system_views.sql2
-rw-r--r--src/backend/commands/subscriptioncmds.c23
-rw-r--r--src/backend/replication/logical/worker.c46
-rw-r--r--src/bin/psql/describe.c8
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_subscription.h4
-rw-r--r--src/test/regress/expected/subscription.out144
-rw-r--r--src/test/subscription/t/033_run_as_table_owner.pl204
9 files changed, 343 insertions, 91 deletions
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 87e8ea7efaf..d07f88ce280 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -72,6 +72,7 @@ GetSubscription(Oid subid, bool missing_ok)
sub->twophasestate = subform->subtwophasestate;
sub->disableonerr = subform->subdisableonerr;
sub->passwordrequired = subform->subpasswordrequired;
+ sub->runasowner = subform->subrunasowner;
/* Get conninfo */
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 574cbc2e448..6b098234f8c 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1319,7 +1319,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
REVOKE ALL ON pg_subscription FROM public;
GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
subbinary, substream, subtwophasestate, subdisableonerr,
- subpasswordrequired,
+ subpasswordrequired, subrunasowner,
subslotname, subsynccommit, subpublications, suborigin)
ON pg_subscription TO public;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 87eb23496eb..3251d89ba80 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -68,8 +68,9 @@
#define SUBOPT_TWOPHASE_COMMIT 0x00000200
#define SUBOPT_DISABLE_ON_ERR 0x00000400
#define SUBOPT_PASSWORD_REQUIRED 0x00000800
-#define SUBOPT_LSN 0x00001000
-#define SUBOPT_ORIGIN 0x00002000
+#define SUBOPT_RUN_AS_OWNER 0x00001000
+#define SUBOPT_LSN 0x00002000
+#define SUBOPT_ORIGIN 0x00004000
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -93,6 +94,7 @@ typedef struct SubOpts
bool twophase;
bool disableonerr;
bool passwordrequired;
+ bool runasowner;
char *origin;
XLogRecPtr lsn;
} SubOpts;
@@ -151,6 +153,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->disableonerr = false;
if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
opts->passwordrequired = true;
+ if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
+ opts->runasowner = false;
if (IsSet(supported_opts, SUBOPT_ORIGIN))
opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
@@ -290,6 +294,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
opts->passwordrequired = defGetBoolean(defel);
}
+ else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
+ strcmp(defel->defname, "run_as_owner") == 0)
+ {
+ if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
+ errorConflictingDefElem(defel, pstate);
+
+ opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
+ opts->runasowner = defGetBoolean(defel);
+ }
else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
strcmp(defel->defname, "origin") == 0)
{
@@ -578,7 +591,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
- SUBOPT_ORIGIN);
+ SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
@@ -681,6 +694,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
LOGICALREP_TWOPHASE_STATE_DISABLED);
values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
+ values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (opts.slot_name)
@@ -1115,7 +1129,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
supported_opts = (SUBOPT_SLOT_NAME |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
- SUBOPT_PASSWORD_REQUIRED | SUBOPT_ORIGIN);
+ SUBOPT_PASSWORD_REQUIRED |
+ SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options,
supported_opts, &opts);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 61009fa8cda..3d58910c145 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2401,6 +2401,7 @@ apply_handle_insert(StringInfo s)
EState *estate;
TupleTableSlot *remoteslot;
MemoryContext oldctx;
+ bool run_as_owner;
/*
* Quick return if we are skipping data modification changes or handling
@@ -2425,8 +2426,13 @@ apply_handle_insert(StringInfo s)
return;
}
- /* Make sure that any user-supplied code runs as the table owner. */
- SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+ /*
+ * Make sure that any user-supplied code runs as the table owner, unless
+ * the user has opted out of that behavior.
+ */
+ run_as_owner = MySubscription->runasowner;
+ if (!run_as_owner)
+ SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
/* Set relation for error callback */
apply_error_callback_arg.rel = rel;
@@ -2457,7 +2463,8 @@ apply_handle_insert(StringInfo s)
/* Reset relation for error callback */
apply_error_callback_arg.rel = NULL;
- RestoreUserContext(&ucxt);
+ if (!run_as_owner)
+ RestoreUserContext(&ucxt);
logicalrep_rel_close(rel, NoLock);
@@ -2546,6 +2553,7 @@ apply_handle_update(StringInfo s)
TupleTableSlot *remoteslot;
RTEPermissionInfo *target_perminfo;
MemoryContext oldctx;
+ bool run_as_owner;
/*
* Quick return if we are skipping data modification changes or handling
@@ -2577,8 +2585,13 @@ apply_handle_update(StringInfo s)
/* Check if we can do the update. */
check_relation_updatable(rel);
- /* Make sure that any user-supplied code runs as the table owner. */
- SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+ /*
+ * Make sure that any user-supplied code runs as the table owner, unless
+ * the user has opted out of that behavior.
+ */
+ run_as_owner = MySubscription->runasowner;
+ if (!run_as_owner)
+ SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
/* Initialize the executor state. */
edata = create_edata_for_relation(rel);
@@ -2630,7 +2643,8 @@ apply_handle_update(StringInfo s)
/* Reset relation for error callback */
apply_error_callback_arg.rel = NULL;
- RestoreUserContext(&ucxt);
+ if (!run_as_owner)
+ RestoreUserContext(&ucxt);
logicalrep_rel_close(rel, NoLock);
@@ -2720,6 +2734,7 @@ apply_handle_delete(StringInfo s)
EState *estate;
TupleTableSlot *remoteslot;
MemoryContext oldctx;
+ bool run_as_owner;
/*
* Quick return if we are skipping data modification changes or handling
@@ -2750,8 +2765,13 @@ apply_handle_delete(StringInfo s)
/* Check if we can do the delete. */
check_relation_updatable(rel);
- /* Make sure that any user-supplied code runs as the table owner. */
- SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+ /*
+ * Make sure that any user-supplied code runs as the table owner, unless
+ * the user has opted out of that behavior.
+ */
+ run_as_owner = MySubscription->runasowner;
+ if (!run_as_owner)
+ SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
/* Initialize the executor state. */
edata = create_edata_for_relation(rel);
@@ -2778,7 +2798,8 @@ apply_handle_delete(StringInfo s)
/* Reset relation for error callback */
apply_error_callback_arg.rel = NULL;
- RestoreUserContext(&ucxt);
+ if (!run_as_owner)
+ RestoreUserContext(&ucxt);
logicalrep_rel_close(rel, NoLock);
@@ -3225,13 +3246,18 @@ apply_handle_truncate(StringInfo s)
* Even if we used CASCADE on the upstream primary we explicitly default
* to replaying changes without further cascading. This might be later
* changeable with a user specified option.
+ *
+ * MySubscription->runasowner tells us whether we want to execute
+ * replication actions as the subscription owner; the last argument to
+ * TruncateGuts tells it whether we want to switch to the table owner.
+ * Those are exactly opposite conditions.
*/
ExecuteTruncateGuts(rels,
relids,
relids_logged,
DROP_RESTRICT,
restart_seqs,
- true);
+ !MySubscription->runasowner);
foreach(lc, remote_rels)
{
LogicalRepRelMapEntry *rel = lfirst(lc);
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 99e28f607e8..83a37ee6011 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6493,7 +6493,7 @@ describeSubscriptions(const char *pattern, bool verbose)
PGresult *res;
printQueryOpt myopt = pset.popt;
static const bool translate_columns[] = {false, false, false, false,
- false, false, false, false, false, false, false, false};
+ false, false, false, false, false, false, false, false, false};
if (pset.sversion < 100000)
{
@@ -6550,8 +6550,10 @@ describeSubscriptions(const char *pattern, bool verbose)
if (pset.sversion >= 160000)
appendPQExpBuffer(&buf,
- ", suborigin AS \"%s\"\n",
- gettext_noop("Origin"));
+ ", suborigin AS \"%s\"\n"
+ ", subrunasowner AS \"%s\"\n",
+ gettext_noop("Origin"),
+ gettext_noop("Run as Owner?"));
appendPQExpBuffer(&buf,
", subsynccommit AS \"%s\"\n"
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 67f3f632f04..f0a8ee55aea 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -57,6 +57,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202304041
+#define CATALOG_VERSION_NO 202304042
#endif
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 6319f598d82..91d729d62d2 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -90,6 +90,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
bool subpasswordrequired; /* Must connection use a password? */
+ bool subrunasowner; /* True if replication should execute as
+ * the subscription owner */
+
#ifdef CATALOG_VARLEN /* variable-length fields start here */
/* Connection string to the publisher */
text subconninfo BKI_FORCE_NOT_NULL;
@@ -134,6 +137,7 @@ typedef struct Subscription
* automatically disabled if a worker error
* occurs */
bool passwordrequired; /* Must connection use a password? */
+ bool runasowner; /* Run replication as subscription owner */
char *conninfo; /* Connection string to the publisher */
char *slotname; /* Name of the replication slot */
char *synccommit; /* Synchronous commit setting for worker */
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 9d26d942619..9c52890f1d0 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -115,18 +115,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+ regress_testsub4
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
\dRs+ regress_testsub4
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub3;
@@ -144,10 +144,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -166,10 +166,10 @@ ERROR: unrecognized subscription parameter: "create_slot"
-- ok
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | off | dbname=regress_doesnotexist2 | 0/12345
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | off | dbname=regress_doesnotexist2 | 0/12345
(1 row)
-- ok - with lsn = NONE
@@ -178,10 +178,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
ERROR: invalid WAL location (LSN): 0/0
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | off | dbname=regress_doesnotexist2 | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | off | dbname=regress_doesnotexist2 | 0/0
(1 row)
BEGIN;
@@ -213,10 +213,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
ERROR: invalid value for parameter "synchronous_commit": "foobar"
HINT: Available values: local, remote_write, remote_apply, on, off.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | local | dbname=regress_doesnotexist2 | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | local | dbname=regress_doesnotexist2 | 0/0
(1 row)
-- rename back to keep the rest simple
@@ -245,19 +245,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -269,27 +269,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- fail - publication already exists
@@ -304,10 +304,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
ERROR: publication "testpub1" is already in subscription "regress_testsub"
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- fail - publication used more than once
@@ -322,10 +322,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub"
-- ok - delete publications
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -361,10 +361,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
--fail - alter of two_phase option not supported.
@@ -373,10 +373,10 @@ ERROR: unrecognized subscription parameter: "two_phase"
-- but can alter streaming when two_phase enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -386,10 +386,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -402,18 +402,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | off | dbname=regress_doesnotexist | 0/0
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Run as Owner? | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+---------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/subscription/t/033_run_as_table_owner.pl b/src/test/subscription/t/033_run_as_table_owner.pl
new file mode 100644
index 00000000000..cabc8a7c59a
--- /dev/null
+++ b/src/test/subscription/t/033_run_as_table_owner.pl
@@ -0,0 +1,204 @@
+
+# Copyright (c) 2021-2023, PostgreSQL Global Development Group
+
+# Test that logical replication respects permissions
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use Test::More;
+
+my ($node_publisher, $node_subscriber, $publisher_connstr, $result, $offset);
+$offset = 0;
+
+sub publish_insert
+{
+ my ($tbl, $new_i) = @_;
+ $node_publisher->safe_psql(
+ 'postgres', qq(
+ SET SESSION AUTHORIZATION regress_alice;
+ INSERT INTO $tbl (i) VALUES ($new_i);
+ ));
+}
+
+sub publish_update
+{
+ my ($tbl, $old_i, $new_i) = @_;
+ $node_publisher->safe_psql(
+ 'postgres', qq(
+ SET SESSION AUTHORIZATION regress_alice;
+ UPDATE $tbl SET i = $new_i WHERE i = $old_i;
+ ));
+}
+
+sub publish_delete
+{
+ my ($tbl, $old_i) = @_;
+ $node_publisher->safe_psql(
+ 'postgres', qq(
+ SET SESSION AUTHORIZATION regress_alice;
+ DELETE FROM $tbl WHERE i = $old_i;
+ ));
+}
+
+sub expect_replication
+{
+ my ($tbl, $cnt, $min, $max, $testname) = @_;
+ $node_publisher->wait_for_catchup('admin_sub');
+ $result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT COUNT(i), MIN(i), MAX(i) FROM $tbl));
+ is($result, "$cnt|$min|$max", $testname);
+}
+
+sub expect_failure
+{
+ my ($tbl, $cnt, $min, $max, $re, $testname) = @_;
+ $offset = $node_subscriber->wait_for_log($re, $offset);
+ $result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT COUNT(i), MIN(i), MAX(i) FROM $tbl));
+ is($result, "$cnt|$min|$max", $testname);
+}
+
+sub revoke_superuser
+{
+ my ($role) = @_;
+ $node_subscriber->safe_psql(
+ 'postgres', qq(
+ ALTER ROLE $role NOSUPERUSER));
+}
+
+# Create publisher and subscriber nodes with schemas owned and published by
+# "regress_alice" but subscribed and replicated by different role
+# "regress_admin". For partitioned tables, layout the partitions differently
+# on the publisher than on the subscriber.
+#
+$node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_publisher->init(allows_streaming => 'logical');
+$node_subscriber->init;
+$node_publisher->start;
+$node_subscriber->start;
+$publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+for my $node ($node_publisher, $node_subscriber)
+{
+ $node->safe_psql(
+ 'postgres', qq(
+ CREATE ROLE regress_admin SUPERUSER LOGIN;
+ CREATE ROLE regress_alice NOSUPERUSER LOGIN;
+ GRANT CREATE ON DATABASE postgres TO regress_alice;
+ SET SESSION AUTHORIZATION regress_alice;
+ CREATE SCHEMA alice;
+ GRANT USAGE ON SCHEMA alice TO regress_admin;
+
+ CREATE TABLE alice.unpartitioned (i INTEGER);
+ ALTER TABLE alice.unpartitioned REPLICA IDENTITY FULL;
+ GRANT SELECT ON TABLE alice.unpartitioned TO regress_admin;
+ ));
+}
+$node_publisher->safe_psql(
+ 'postgres', qq(
+SET SESSION AUTHORIZATION regress_alice;
+
+CREATE PUBLICATION alice FOR TABLE alice.unpartitioned
+ WITH (publish_via_partition_root = true);
+));
+$node_subscriber->safe_psql(
+ 'postgres', qq(
+SET SESSION AUTHORIZATION regress_admin;
+CREATE SUBSCRIPTION admin_sub CONNECTION '$publisher_connstr' PUBLICATION alice
+ WITH (run_as_owner = true, password_required = false);
+));
+
+# Wait for initial sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'admin_sub');
+
+# Verify that "regress_admin" can replicate into the tables
+publish_insert("alice.unpartitioned", 1);
+publish_insert("alice.unpartitioned", 3);
+publish_insert("alice.unpartitioned", 5);
+publish_update("alice.unpartitioned", 1 => 7);
+publish_delete("alice.unpartitioned", 3);
+expect_replication("alice.unpartitioned", 2, 5, 7,
+ "superuser can replicate");
+
+# Revoke superuser privilege for "regress_admin", and verify that we now
+# fail to replicate an insert.
+revoke_superuser("regress_admin");
+publish_insert("alice.unpartitioned", 9);
+expect_failure(
+ "alice.unpartitioned",
+ 2,
+ 5,
+ 7,
+ qr/ERROR: ( [A-Z0-9]+:)? permission denied for table unpartitioned/msi,
+ "with no privileges cannot replicate");
+
+# Now grant DML privileges and verify that we can replicate an INSERT.
+$node_subscriber->safe_psql(
+ 'postgres', qq(
+ALTER ROLE regress_admin NOSUPERUSER;
+SET SESSION AUTHORIZATION regress_alice;
+GRANT INSERT,UPDATE,DELETE ON alice.unpartitioned TO regress_admin;
+REVOKE SELECT ON alice.unpartitioned FROM regress_admin;
+));
+expect_replication("alice.unpartitioned", 3, 5, 9,
+ "with INSERT privilege can replicate INSERT"
+);
+
+# We can't yet replicate an UPDATE because we don't have SELECT.
+publish_update("alice.unpartitioned", 5 => 11);
+publish_delete("alice.unpartitioned", 9);
+expect_failure(
+ "alice.unpartitioned",
+ 3,
+ 5,
+ 9,
+ qr/ERROR: ( [A-Z0-9]+:)? permission denied for table unpartitioned/msi,
+ "without SELECT privilege cannot replicate UPDATE or DELETE"
+);
+
+# After granting SELECT, replication resumes.
+$node_subscriber->safe_psql(
+ 'postgres', qq(
+SET SESSION AUTHORIZATION regress_alice;
+GRANT SELECT ON alice.unpartitioned TO regress_admin;
+));
+expect_replication("alice.unpartitioned", 2, 7, 11,
+ "with all privileges can replicate"
+);
+
+# Remove all privileges again. Instead, give the ability to SET ROLE to
+# regress_alice.
+$node_subscriber->safe_psql(
+ 'postgres', qq(
+SET SESSION AUTHORIZATION regress_alice;
+REVOKE ALL PRIVILEGES ON alice.unpartitioned FROM regress_admin;
+RESET SESSION AUTHORIZATION;
+GRANT regress_alice TO regress_admin WITH INHERIT FALSE, SET TRUE;
+));
+
+# Because replication is running as the subscription owner in this test,
+# the above grant doesn't help: it gives the ability to SET ROLE, but not
+# privileges on the table.
+publish_insert("alice.unpartitioned", 13);
+expect_failure(
+ "alice.unpartitioned",
+ 2,
+ 7,
+ 11,
+ qr/ERROR: ( [A-Z0-9]+:)? permission denied for table unpartitioned/msi,
+ "with SET ROLE but not INHERIT cannot replicate"
+);
+
+# Now remove SET ROLE and add INHERIT and check that things start working.
+$node_subscriber->safe_psql(
+ 'postgres', qq(
+GRANT regress_alice TO regress_admin WITH INHERIT TRUE, SET FALSE;
+));
+expect_replication("alice.unpartitioned", 3, 7, 13,
+ "with INHERIT but not SET ROLE can replicate"
+);
+
+done_testing();