aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c34
1 files changed, 21 insertions, 13 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 4dbffea240a..c1b5ad35deb 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -799,7 +799,7 @@ StartReplication(StartReplicationCmd *cmd)
}
/* Send CommandComplete message */
- pq_puttextmessage('C', "START_STREAMING");
+ EndReplicationCommand("START_STREAMING");
}
/*
@@ -1122,11 +1122,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
static void
DropReplicationSlot(DropReplicationSlotCmd *cmd)
{
- QueryCompletion qc;
-
ReplicationSlotDrop(cmd->slotname, !cmd->wait);
- SetQueryCompletion(&qc, CMDTAG_DROP_REPLICATION_SLOT, 0);
- EndCommand(&qc, DestRemote, false);
}
/*
@@ -1517,9 +1513,9 @@ exec_replication_command(const char *cmd_string)
{
int parse_rc;
Node *cmd_node;
+ const char *cmdtag;
MemoryContext cmd_context;
MemoryContext old_context;
- QueryCompletion qc;
/*
* If WAL sender has been told that shutdown is getting close, switch its
@@ -1619,40 +1615,53 @@ exec_replication_command(const char *cmd_string)
switch (cmd_node->type)
{
case T_IdentifySystemCmd:
+ cmdtag = "IDENTIFY_SYSTEM";
IdentifySystem();
+ EndReplicationCommand(cmdtag);
break;
case T_BaseBackupCmd:
- PreventInTransactionBlock(true, "BASE_BACKUP");
+ cmdtag = "BASE_BACKUP";
+ PreventInTransactionBlock(true, cmdtag);
SendBaseBackup((BaseBackupCmd *) cmd_node);
+ EndReplicationCommand(cmdtag);
break;
case T_CreateReplicationSlotCmd:
+ cmdtag = "CREATE_REPLICATION_SLOT";
CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
+ EndReplicationCommand(cmdtag);
break;
case T_DropReplicationSlotCmd:
+ cmdtag = "DROP_REPLICATION_SLOT";
DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
+ EndReplicationCommand(cmdtag);
break;
case T_StartReplicationCmd:
{
StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
- PreventInTransactionBlock(true, "START_REPLICATION");
+ cmdtag = "START_REPLICATION";
+ PreventInTransactionBlock(true, cmdtag);
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
StartReplication(cmd);
else
StartLogicalReplication(cmd);
+ /* callees already sent their own completion message */
+
Assert(xlogreader != NULL);
break;
}
case T_TimeLineHistoryCmd:
- PreventInTransactionBlock(true, "TIMELINE_HISTORY");
+ cmdtag = "TIMELINE_HISTORY";
+ PreventInTransactionBlock(true, cmdtag);
SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
+ EndReplicationCommand(cmdtag);
break;
case T_VariableShowStmt:
@@ -1660,10 +1669,13 @@ exec_replication_command(const char *cmd_string)
DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
VariableShowStmt *n = (VariableShowStmt *) cmd_node;
+ cmdtag = "SHOW";
+
/* syscache access needs a transaction environment */
StartTransactionCommand();
GetPGVariable(n->name, dest);
CommitTransactionCommand();
+ EndReplicationCommand(cmdtag);
}
break;
@@ -1676,10 +1688,6 @@ exec_replication_command(const char *cmd_string)
MemoryContextSwitchTo(old_context);
MemoryContextDelete(cmd_context);
- /* Send CommandComplete message */
- SetQueryCompletion(&qc, CMDTAG_SELECT, 0);
- EndCommand(&qc, DestRemote, true);
-
/* Report to pgstat that this process is now idle */
pgstat_report_activity(STATE_IDLE, NULL);
debug_query_string = NULL;