aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/twophase.c79
-rw-r--r--src/backend/commands/subscriptioncmds.c169
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c22
-rw-r--r--src/backend/replication/logical/launcher.c10
-rw-r--r--src/backend/replication/logical/worker.c25
-rw-r--r--src/backend/replication/slot.c45
-rw-r--r--src/backend/replication/walsender.c32
-rw-r--r--src/bin/psql/tab-complete.c2
-rw-r--r--src/include/access/twophase.h5
-rw-r--r--src/include/replication/slot.h3
-rw-r--r--src/include/replication/walreceiver.h12
-rw-r--r--src/include/replication/worker_internal.h3
-rw-r--r--src/test/regress/expected/subscription.out5
-rw-r--r--src/test/regress/sql/subscription.sql5
-rw-r--r--src/test/subscription/t/021_twophase.pl95
15 files changed, 403 insertions, 109 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 9a8257fcafb..e98286d768b 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2681,3 +2681,82 @@ LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
LWLockRelease(TwoPhaseStateLock);
return found;
}
+
+/*
+ * TwoPhaseTransactionGid
+ * Form the prepared transaction GID for two_phase transactions.
+ *
+ * Return the GID in the supplied buffer.
+ */
+void
+TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
+{
+ Assert(OidIsValid(subid));
+
+ if (!TransactionIdIsValid(xid))
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("invalid two-phase transaction ID")));
+
+ snprintf(gid_res, szgid, "pg_gid_%u_%u", subid, xid);
+}
+
+/*
+ * IsTwoPhaseTransactionGidForSubid
+ * Check whether the given GID (as formed by TwoPhaseTransactionGid) is
+ * for the specified 'subid'.
+ */
+static bool
+IsTwoPhaseTransactionGidForSubid(Oid subid, char *gid)
+{
+ int ret;
+ Oid subid_from_gid;
+ TransactionId xid_from_gid;
+ char gid_tmp[GIDSIZE];
+
+ /* Extract the subid and xid from the given GID */
+ ret = sscanf(gid, "pg_gid_%u_%u", &subid_from_gid, &xid_from_gid);
+
+ /*
+ * Check that the given GID has expected format, and at least the subid
+ * matches.
+ */
+ if (ret != 2 || subid != subid_from_gid)
+ return false;
+
+ /*
+ * Reconstruct a temporary GID based on the subid and xid extracted from
+ * the given GID and check whether the temporary GID and the given GID
+ * match.
+ */
+ TwoPhaseTransactionGid(subid, xid_from_gid, gid_tmp, sizeof(gid_tmp));
+
+ return strcmp(gid, gid_tmp) == 0;
+}
+
+/*
+ * LookupGXactBySubid
+ * Check if the prepared transaction done by apply worker exists.
+ */
+bool
+LookupGXactBySubid(Oid subid)
+{
+ bool found = false;
+
+ LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+ for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
+ {
+ GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+
+ /* Ignore not-yet-valid GIDs. */
+ if (gxact->valid &&
+ IsTwoPhaseTransactionGidForSubid(subid, gxact->gid))
+ {
+ found = true;
+ break;
+ }
+ }
+ LWLockRelease(TwoPhaseStateLock);
+
+ return found;
+}
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 16d83b32539..d124bfe55ca 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -16,6 +16,7 @@
#include "access/htup_details.h"
#include "access/table.h"
+#include "access/twophase.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
@@ -109,6 +110,8 @@ static void check_publications_origin(WalReceiverConn *wrconn,
static void check_duplicates_in_publist(List *publist, Datum *datums);
static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
+static void CheckAlterSubOption(Subscription *sub, const char *option,
+ bool slot_needs_update, bool isTopLevel);
/*
@@ -259,21 +262,9 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_STREAMING;
opts->streaming = defGetStreamingMode(defel);
}
- else if (strcmp(defel->defname, "two_phase") == 0)
+ else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
+ strcmp(defel->defname, "two_phase") == 0)
{
- /*
- * Do not allow toggling of two_phase option. Doing so could cause
- * missing of transactions and lead to an inconsistent replica.
- * See comments atop worker.c
- *
- * Note: Unsupported twophase indicates that this call originated
- * from AlterSubscription.
- */
- if (!IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
-
if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
errorConflictingDefElem(defel, pstate);
@@ -1080,6 +1071,60 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
}
/*
+ * Common checks for altering failover and two_phase options.
+ */
+static void
+CheckAlterSubOption(Subscription *sub, const char *option,
+ bool slot_needs_update, bool isTopLevel)
+{
+ /*
+ * The checks in this function are required only for failover and
+ * two_phase options.
+ */
+ Assert(strcmp(option, "failover") == 0 ||
+ strcmp(option, "two_phase") == 0);
+
+ /*
+ * Do not allow changing the option if the subscription is enabled. This
+ * is because both failover and two_phase options of the slot on the
+ * publisher cannot be modified if the slot is currently acquired by the
+ * existing walsender.
+ *
+ * Note that two_phase is enabled (aka changed from 'false' to 'true') on
+ * the publisher by the existing walsender, so we could have allowed that
+ * even when the subscription is enabled. But we kept this restriction for
+ * the sake of consistency and simplicity.
+ */
+ if (sub->enabled)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot set %s for enabled subscription",
+ option)));
+
+ if (slot_needs_update)
+ {
+ StringInfoData cmd;
+
+ /*
+ * A valid slot must be associated with the subscription for us to
+ * modify any of the slot's properties.
+ */
+ if (!sub->slotname)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot set %s for a subscription that does not have a slot name",
+ option)));
+
+ /* The changed option of the slot can't be rolled back. */
+ initStringInfo(&cmd);
+ appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
+
+ PreventInTransactionBlock(isTopLevel, cmd.data);
+ pfree(cmd.data);
+ }
+}
+
+/*
* Alter the existing subscription.
*/
ObjectAddress
@@ -1094,6 +1139,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
HeapTuple tup;
Oid subid;
bool update_tuple = false;
+ bool update_failover = false;
+ bool update_two_phase = false;
Subscription *sub;
Form_pg_subscription form;
bits32 supported_opts;
@@ -1145,7 +1192,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
{
supported_opts = (SUBOPT_SLOT_NAME |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
- SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
+ SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+ SUBOPT_DISABLE_ON_ERR |
SUBOPT_PASSWORD_REQUIRED |
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
SUBOPT_ORIGIN);
@@ -1227,31 +1275,81 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
replaces[Anum_pg_subscription_subrunasowner - 1] = true;
}
- if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
+ if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
{
- if (!sub->slotname)
+ /*
+ * We need to update both the slot and the subscription
+ * for the two_phase option. We can enable the two_phase
+ * option for a slot only once the initial data
+ * synchronization is done. This is to avoid missing some
+ * data as explained in comments atop worker.c.
+ */
+ update_two_phase = !opts.twophase;
+
+ CheckAlterSubOption(sub, "two_phase", update_two_phase,
+ isTopLevel);
+
+ /*
+ * Modifying the two_phase slot option requires a slot
+ * lookup by slot name, so changing the slot name at the
+ * same time is not allowed.
+ */
+ if (update_two_phase &&
+ IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("slot_name and two_phase cannot be altered at the same time")));
+
+ /*
+ * Note that workers may still survive even if the
+ * subscription has been disabled.
+ *
+ * Ensure workers have already been exited to avoid
+ * getting prepared transactions while we are disabling
+ * the two_phase option. Otherwise, the changes of an
+ * already prepared transaction can be replicated again
+ * along with its corresponding commit, leading to
+ * duplicate data or errors.
+ */
+ if (logicalrep_workers_find(subid, true, true))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot set %s for a subscription that does not have a slot name",
- "failover")));
+ errmsg("cannot alter two_phase when logical replication worker is still running"),
+ errhint("Try again after some time.")));
/*
- * Do not allow changing the failover state if the
- * subscription is enabled. This is because the failover
- * state of the slot on the publisher cannot be modified
- * if the slot is currently acquired by the apply worker.
+ * two_phase cannot be disabled if there are any
+ * uncommitted prepared transactions present otherwise it
+ * can lead to duplicate data or errors as explained in
+ * the comment above.
*/
- if (sub->enabled)
+ if (update_two_phase &&
+ sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
+ LookupGXactBySubid(subid))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot set %s for enabled subscription",
- "failover")));
+ errmsg("cannot disable two_phase when prepared transactions are present"),
+ errhint("Resolve these transactions and try again.")));
+
+ /* Change system catalog accordingly */
+ values[Anum_pg_subscription_subtwophasestate - 1] =
+ CharGetDatum(opts.twophase ?
+ LOGICALREP_TWOPHASE_STATE_PENDING :
+ LOGICALREP_TWOPHASE_STATE_DISABLED);
+ replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
+ }
+ if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
+ {
/*
- * The changed failover option of the slot can't be rolled
- * back.
+ * Similar to the two_phase case above, we need to update
+ * the failover option for both the slot and the
+ * subscription.
*/
- PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... SET (failover)");
+ update_failover = true;
+
+ CheckAlterSubOption(sub, "failover", update_failover,
+ isTopLevel);
values[Anum_pg_subscription_subfailover - 1] =
BoolGetDatum(opts.failover);
@@ -1501,13 +1599,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
}
/*
- * Try to acquire the connection necessary for altering slot.
+ * Try to acquire the connection necessary for altering the slot, if
+ * needed.
*
* This has to be at the end because otherwise if there is an error while
* doing the database operations we won't be able to rollback altered
* slot.
*/
- if (replaces[Anum_pg_subscription_subfailover - 1])
+ if (update_failover || update_two_phase)
{
bool must_use_password;
char *err;
@@ -1528,7 +1627,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
PG_TRY();
{
- walrcv_alter_slot(wrconn, sub->slotname, opts.failover);
+ walrcv_alter_slot(wrconn, sub->slotname,
+ update_failover ? &opts.failover : NULL,
+ update_two_phase ? &opts.twophase : NULL);
}
PG_FINALLY();
{
@@ -1675,9 +1776,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* New workers won't be started because we hold an exclusive lock on the
* subscription till the end of the transaction.
*/
- LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- subworkers = logicalrep_workers_find(subid, false);
- LWLockRelease(LogicalRepWorkerLock);
+ subworkers = logicalrep_workers_find(subid, false, true);
foreach(lc, subworkers)
{
LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 6c42c209d29..97f957cd87b 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -80,7 +80,7 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
CRSSnapshotAction snapshot_action,
XLogRecPtr *lsn);
static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
- bool failover);
+ const bool *failover, const bool *two_phase);
static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
const char *query,
@@ -1121,15 +1121,27 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
*/
static void
libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
- bool failover)
+ const bool *failover, const bool *two_phase)
{
StringInfoData cmd;
PGresult *res;
initStringInfo(&cmd);
- appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )",
- quote_identifier(slotname),
- failover ? "true" : "false");
+ appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( ",
+ quote_identifier(slotname));
+
+ if (failover)
+ appendStringInfo(&cmd, "FAILOVER %s",
+ *failover ? "true" : "false");
+
+ if (failover && two_phase)
+ appendStringInfo(&cmd, ", ");
+
+ if (two_phase)
+ appendStringInfo(&cmd, "TWO_PHASE %s",
+ *two_phase ? "true" : "false");
+
+ appendStringInfoString(&cmd, " );");
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
pfree(cmd.data);
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 27c3a91fb75..c566d50a072 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -272,11 +272,14 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
* the subscription, instead of just one.
*/
List *
-logicalrep_workers_find(Oid subid, bool only_running)
+logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
{
int i;
List *res = NIL;
+ if (acquire_lock)
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
/* Search for attached worker for a given subscription id. */
@@ -288,6 +291,9 @@ logicalrep_workers_find(Oid subid, bool only_running)
res = lappend(res, w);
}
+ if (acquire_lock)
+ LWLockRelease(LogicalRepWorkerLock);
+
return res;
}
@@ -759,7 +765,7 @@ logicalrep_worker_detach(void)
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true);
+ workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
foreach(lc, workers)
{
LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c0bda6269bd..ec96b5fe85e 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -401,9 +401,6 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
LogicalRepTupleData *newtup,
CmdType operation);
-/* Compute GID for two_phase transactions */
-static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
-
/* Functions for skipping changes */
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
static void stop_skipping_changes(void);
@@ -3911,7 +3908,7 @@ maybe_reread_subscription(void)
/* !slotname should never happen when enabled is true. */
Assert(newsub->slotname);
- /* two-phase should not be altered */
+ /* two-phase cannot be altered while the worker is running */
Assert(newsub->twophasestate == MySubscription->twophasestate);
/*
@@ -4397,24 +4394,6 @@ cleanup_subxact_info()
}
/*
- * Form the prepared transaction GID for two_phase transactions.
- *
- * Return the GID in the supplied buffer.
- */
-static void
-TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
-{
- Assert(subid != InvalidRepOriginId);
-
- if (!TransactionIdIsValid(xid))
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("invalid two-phase transaction ID")));
-
- snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
-}
-
-/*
* Common function to run the apply loop with error handling. Disable the
* subscription, if necessary.
*
@@ -5014,7 +4993,7 @@ AtEOXact_LogicalRepWorkers(bool isCommit)
List *workers;
ListCell *lc2;
- workers = logicalrep_workers_find(subid, true);
+ workers = logicalrep_workers_find(subid, true, false);
foreach(lc2, workers)
{
LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index baf9b89dc42..c290339af5f 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -804,9 +804,13 @@ ReplicationSlotDrop(const char *name, bool nowait)
* Change the definition of the slot identified by the specified name.
*/
void
-ReplicationSlotAlter(const char *name, bool failover)
+ReplicationSlotAlter(const char *name, const bool *failover,
+ const bool *two_phase)
{
+ bool update_slot = false;
+
Assert(MyReplicationSlot == NULL);
+ Assert(failover || two_phase);
ReplicationSlotAcquire(name, false);
@@ -832,28 +836,45 @@ ReplicationSlotAlter(const char *name, bool failover)
* Do not allow users to enable failover on the standby as we do not
* support sync to the cascading standby.
*/
- if (failover)
+ if (failover && *failover)
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot enable failover for a replication slot"
" on the standby"));
}
- /*
- * Do not allow users to enable failover for temporary slots as we do not
- * support syncing temporary slots to the standby.
- */
- if (failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
- ereport(ERROR,
- errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("cannot enable failover for a temporary replication slot"));
+ if (failover)
+ {
+ /*
+ * Do not allow users to enable failover for temporary slots as we do
+ * not support syncing temporary slots to the standby.
+ */
+ if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot enable failover for a temporary replication slot"));
+
+ if (MyReplicationSlot->data.failover != *failover)
+ {
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ MyReplicationSlot->data.failover = *failover;
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
+ update_slot = true;
+ }
+ }
- if (MyReplicationSlot->data.failover != failover)
+ if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
{
SpinLockAcquire(&MyReplicationSlot->mutex);
- MyReplicationSlot->data.failover = failover;
+ MyReplicationSlot->data.two_phase = *two_phase;
SpinLockRelease(&MyReplicationSlot->mutex);
+ update_slot = true;
+ }
+
+ if (update_slot)
+ {
ReplicationSlotMarkDirty();
ReplicationSlotSave();
}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index ca205594bd0..c5f1009f370 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1407,12 +1407,15 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
}
/*
- * Process extra options given to ALTER_REPLICATION_SLOT.
+ * Change the definition of a replication slot.
*/
static void
-ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
+AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
{
bool failover_given = false;
+ bool two_phase_given = false;
+ bool failover;
+ bool two_phase;
/* Parse options */
foreach_ptr(DefElem, defel, cmd->options)
@@ -1424,23 +1427,24 @@ ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
failover_given = true;
- *failover = defGetBoolean(defel);
+ failover = defGetBoolean(defel);
+ }
+ else if (strcmp(defel->defname, "two_phase") == 0)
+ {
+ if (two_phase_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ two_phase_given = true;
+ two_phase = defGetBoolean(defel);
}
else
elog(ERROR, "unrecognized option: %s", defel->defname);
}
-}
-
-/*
- * Change the definition of a replication slot.
- */
-static void
-AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
-{
- bool failover = false;
- ParseAlterReplSlotOptions(cmd, &failover);
- ReplicationSlotAlter(cmd->slotname, failover);
+ ReplicationSlotAlter(cmd->slotname,
+ failover_given ? &failover : NULL,
+ two_phase_given ? &two_phase : NULL);
}
/*
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index d453e224d93..891face1b65 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1948,7 +1948,7 @@ psql_completion(const char *text, int start, int end)
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
COMPLETE_WITH("binary", "disable_on_error", "failover", "origin",
"password_required", "run_as_owner", "slot_name",
- "streaming", "synchronous_commit");
+ "streaming", "synchronous_commit", "two_phase");
/* ALTER SUBSCRIPTION <name> SKIP ( */
else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
COMPLETE_WITH("lsn");
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index 56248c00063..b85b65c604e 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -62,4 +62,9 @@ extern void PrepareRedoRemove(TransactionId xid, bool giveWarning);
extern void restoreTwoPhaseData(void);
extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
TimestampTz origin_prepare_timestamp);
+
+extern void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res,
+ int szgid);
+extern bool LookupGXactBySubid(Oid subid);
+
#endif /* TWOPHASE_H */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index c9675ee87cc..c2ee149fd66 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -243,7 +243,8 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char *name, bool nowait);
extern void ReplicationSlotDropAcquired(void);
-extern void ReplicationSlotAlter(const char *name, bool failover);
+extern void ReplicationSlotAlter(const char *name, const bool *failover,
+ const bool *two_phase);
extern void ReplicationSlotAcquire(const char *name, bool nowait);
extern void ReplicationSlotRelease(void);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 12f71fa99b0..132e789948b 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -372,12 +372,14 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
/*
* walrcv_alter_slot_fn
*
- * Change the definition of a replication slot. Currently, it only supports
- * changing the failover property of the slot.
+ * Change the definition of a replication slot. Currently, it supports
+ * changing the failover and two_phase properties of the slot.
*/
typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn,
const char *slotname,
- bool failover);
+ const bool *failover,
+ const bool *two_phase);
+
/*
* walrcv_get_backend_pid_fn
@@ -455,8 +457,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \
WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
-#define walrcv_alter_slot(conn, slotname, failover) \
- WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover)
+#define walrcv_alter_slot(conn, slotname, failover, two_phase) \
+ WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover, two_phase)
#define walrcv_get_backend_pid(conn) \
WalReceiverFunctions->walrcv_get_backend_pid(conn)
#define walrcv_exec(conn, exec, nRetTypes, retTypes) \
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 515aefd5191..9646261d7e9 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -240,7 +240,8 @@ extern PGDLLIMPORT bool InitializingApplyWorker;
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);
-extern List *logicalrep_workers_find(Oid subid, bool only_running);
+extern List *logicalrep_workers_find(Oid subid, bool only_running,
+ bool acquire_lock);
extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
Oid dbid, Oid subid, const char *subname,
Oid userid, Oid relid,
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 5c2f1ee5171..17d48b16857 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -377,10 +377,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
---fail - alter of two_phase option not supported.
-ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
-ERROR: unrecognized subscription parameter: "two_phase"
--- but can alter streaming when two_phase enabled
+-- we can alter streaming when two_phase enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+
List of subscriptions
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 3e5ba4cb8c6..007c9e70374 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -256,10 +256,7 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
\dRs+
---fail - alter of two_phase option not supported.
-ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
-
--- but can alter streaming when two_phase enabled
+-- we can alter streaming when two_phase enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+
diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl
index 9437cd4c3b7..a47d3b7dd6e 100644
--- a/src/test/subscription/t/021_twophase.pl
+++ b/src/test/subscription/t/021_twophase.pl
@@ -367,6 +367,99 @@ $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
is($result, qq(2), 'replicated data in subscriber table');
+# Clean up
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+###############################
+# Alter the subscription to set two_phase to false.
+# Verify that the altered subscription reflects the new two_phase option.
+###############################
+
+# Confirm that the two-phase slot option is enabled before altering
+$result = $node_publisher->safe_psql('postgres',
+ "SELECT two_phase FROM pg_replication_slots WHERE slot_name = 'tap_sub_copy';"
+);
+is($result, qq(t), 'two-phase is enabled');
+
+# Alter subscription two_phase to false
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub_copy DISABLE;");
+$node_subscriber->poll_query_until('postgres',
+ "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication worker'"
+);
+$node_subscriber->safe_psql(
+ 'postgres', "
+ ALTER SUBSCRIPTION tap_sub_copy SET (two_phase = false);
+ ALTER SUBSCRIPTION tap_sub_copy ENABLE;");
+
+# Wait for subscription startup
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy);
+
+# Make sure that the two-phase is disabled on the subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT subtwophasestate FROM pg_subscription WHERE subname = 'tap_sub_copy';"
+);
+is($result, qq(d), 'two-phase subscription option should be disabled');
+
+# Make sure that the two-phase slot option is also disabled
+$result = $node_publisher->safe_psql('postgres',
+ "SELECT two_phase FROM pg_replication_slots WHERE slot_name = 'tap_sub_copy';"
+);
+is($result, qq(f), 'two-phase slot option should be disabled');
+
+###############################
+# Now do a prepare on the publisher and verify that it is not replicated.
+###############################
+$node_publisher->safe_psql(
+ 'postgres', qq{
+ BEGIN;
+ INSERT INTO tab_copy VALUES (100);
+ PREPARE TRANSACTION 'newgid';
+ });
+
+# Wait for the subscriber to catchup
+$node_publisher->wait_for_catchup($appname_copy);
+
+# Make sure there are no prepared transactions on the subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'should be no prepared transactions on subscriber');
+
+###############################
+# Set two_phase to "true" and failover to "true" before the COMMIT PREPARED.
+#
+# This tests the scenario where both two_phase and failover are altered
+# simultaneously.
+###############################
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub_copy DISABLE;");
+$node_subscriber->poll_query_until('postgres',
+ "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication worker'"
+);
+$node_subscriber->safe_psql(
+ 'postgres', "
+ ALTER SUBSCRIPTION tap_sub_copy SET (two_phase = true, failover = true);
+ ALTER SUBSCRIPTION tap_sub_copy ENABLE;");
+
+###############################
+# Now commit the insert and verify that it is replicated.
+###############################
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'newgid';");
+
+# Wait for the subscriber to catchup
+$node_publisher->wait_for_catchup($appname_copy);
+
+# Make sure that the committed transaction is replicated.
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
+is($result, qq(3), 'replicated data in subscriber table');
+
+# Make sure that the two-phase is enabled on the subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT subtwophasestate FROM pg_subscription WHERE subname = 'tap_sub_copy';"
+);
+is($result, qq(e), 'two-phase should be enabled');
+
$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_copy;");
$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_copy;");
@@ -374,8 +467,6 @@ $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_copy;");
# check all the cleanup
###############################
-$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
-
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_subscription");
is($result, qq(0), 'check subscription was dropped on subscriber');