aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/walsender.c34
-rw-r--r--src/backend/tcop/dest.c12
-rw-r--r--src/include/tcop/cmdtaglist.h1
-rw-r--r--src/include/tcop/dest.h1
4 files changed, 34 insertions, 14 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 4dbffea240a..c1b5ad35deb 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -799,7 +799,7 @@ StartReplication(StartReplicationCmd *cmd)
}
/* Send CommandComplete message */
- pq_puttextmessage('C', "START_STREAMING");
+ EndReplicationCommand("START_STREAMING");
}
/*
@@ -1122,11 +1122,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
static void
DropReplicationSlot(DropReplicationSlotCmd *cmd)
{
- QueryCompletion qc;
-
ReplicationSlotDrop(cmd->slotname, !cmd->wait);
- SetQueryCompletion(&qc, CMDTAG_DROP_REPLICATION_SLOT, 0);
- EndCommand(&qc, DestRemote, false);
}
/*
@@ -1517,9 +1513,9 @@ exec_replication_command(const char *cmd_string)
{
int parse_rc;
Node *cmd_node;
+ const char *cmdtag;
MemoryContext cmd_context;
MemoryContext old_context;
- QueryCompletion qc;
/*
* If WAL sender has been told that shutdown is getting close, switch its
@@ -1619,40 +1615,53 @@ exec_replication_command(const char *cmd_string)
switch (cmd_node->type)
{
case T_IdentifySystemCmd:
+ cmdtag = "IDENTIFY_SYSTEM";
IdentifySystem();
+ EndReplicationCommand(cmdtag);
break;
case T_BaseBackupCmd:
- PreventInTransactionBlock(true, "BASE_BACKUP");
+ cmdtag = "BASE_BACKUP";
+ PreventInTransactionBlock(true, cmdtag);
SendBaseBackup((BaseBackupCmd *) cmd_node);
+ EndReplicationCommand(cmdtag);
break;
case T_CreateReplicationSlotCmd:
+ cmdtag = "CREATE_REPLICATION_SLOT";
CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
+ EndReplicationCommand(cmdtag);
break;
case T_DropReplicationSlotCmd:
+ cmdtag = "DROP_REPLICATION_SLOT";
DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
+ EndReplicationCommand(cmdtag);
break;
case T_StartReplicationCmd:
{
StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
- PreventInTransactionBlock(true, "START_REPLICATION");
+ cmdtag = "START_REPLICATION";
+ PreventInTransactionBlock(true, cmdtag);
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
StartReplication(cmd);
else
StartLogicalReplication(cmd);
+ /* callees already sent their own completion message */
+
Assert(xlogreader != NULL);
break;
}
case T_TimeLineHistoryCmd:
- PreventInTransactionBlock(true, "TIMELINE_HISTORY");
+ cmdtag = "TIMELINE_HISTORY";
+ PreventInTransactionBlock(true, cmdtag);
SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
+ EndReplicationCommand(cmdtag);
break;
case T_VariableShowStmt:
@@ -1660,10 +1669,13 @@ exec_replication_command(const char *cmd_string)
DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
VariableShowStmt *n = (VariableShowStmt *) cmd_node;
+ cmdtag = "SHOW";
+
/* syscache access needs a transaction environment */
StartTransactionCommand();
GetPGVariable(n->name, dest);
CommitTransactionCommand();
+ EndReplicationCommand(cmdtag);
}
break;
@@ -1676,10 +1688,6 @@ exec_replication_command(const char *cmd_string)
MemoryContextSwitchTo(old_context);
MemoryContextDelete(cmd_context);
- /* Send CommandComplete message */
- SetQueryCompletion(&qc, CMDTAG_SELECT, 0);
- EndCommand(&qc, DestRemote, true);
-
/* Report to pgstat that this process is now idle */
pgstat_report_activity(STATE_IDLE, NULL);
debug_query_string = NULL;
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index 7208751ec78..96789f88ef9 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -212,6 +212,18 @@ EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_o
}
/* ----------------
+ * EndReplicationCommand - stripped down version of EndCommand
+ *
+ * For use by replication commands.
+ * ----------------
+ */
+void
+EndReplicationCommand(const char *commandTag)
+{
+ pq_putmessage('C', commandTag, strlen(commandTag) + 1);
+}
+
+/* ----------------
* NullCommand - tell dest that an empty query string was recognized
*
* In FE/BE protocol version 1.0, this hack is necessary to support
diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h
index 8ef0f55e748..be94852bbd3 100644
--- a/src/include/tcop/cmdtaglist.h
+++ b/src/include/tcop/cmdtaglist.h
@@ -157,7 +157,6 @@ PG_CMDTAG(CMDTAG_DROP_OWNED, "DROP OWNED", true, false, false)
PG_CMDTAG(CMDTAG_DROP_POLICY, "DROP POLICY", true, false, false)
PG_CMDTAG(CMDTAG_DROP_PROCEDURE, "DROP PROCEDURE", true, false, false)
PG_CMDTAG(CMDTAG_DROP_PUBLICATION, "DROP PUBLICATION", true, false, false)
-PG_CMDTAG(CMDTAG_DROP_REPLICATION_SLOT, "DROP REPLICATION SLOT", false, false, false)
PG_CMDTAG(CMDTAG_DROP_ROLE, "DROP ROLE", false, false, false)
PG_CMDTAG(CMDTAG_DROP_ROUTINE, "DROP ROUTINE", true, false, false)
PG_CMDTAG(CMDTAG_DROP_RULE, "DROP RULE", true, false, false)
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index 662ce8a56f8..2e07f1516d1 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -139,6 +139,7 @@ extern void BeginCommand(CommandTag commandTag, CommandDest dest);
extern DestReceiver *CreateDestReceiver(CommandDest dest);
extern void EndCommand(const QueryCompletion *qc, CommandDest dest,
bool force_undecorated_output);
+extern void EndReplicationCommand(const char *commandTag);
/* Additional functions that go with destination management, more or less. */