diff options
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- | src/backend/replication/walsender.c | 34 |
1 files changed, 21 insertions, 13 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 4dbffea240a..c1b5ad35deb 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -799,7 +799,7 @@ StartReplication(StartReplicationCmd *cmd) } /* Send CommandComplete message */ - pq_puttextmessage('C', "START_STREAMING"); + EndReplicationCommand("START_STREAMING"); } /* @@ -1122,11 +1122,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) static void DropReplicationSlot(DropReplicationSlotCmd *cmd) { - QueryCompletion qc; - ReplicationSlotDrop(cmd->slotname, !cmd->wait); - SetQueryCompletion(&qc, CMDTAG_DROP_REPLICATION_SLOT, 0); - EndCommand(&qc, DestRemote, false); } /* @@ -1517,9 +1513,9 @@ exec_replication_command(const char *cmd_string) { int parse_rc; Node *cmd_node; + const char *cmdtag; MemoryContext cmd_context; MemoryContext old_context; - QueryCompletion qc; /* * If WAL sender has been told that shutdown is getting close, switch its @@ -1619,40 +1615,53 @@ exec_replication_command(const char *cmd_string) switch (cmd_node->type) { case T_IdentifySystemCmd: + cmdtag = "IDENTIFY_SYSTEM"; IdentifySystem(); + EndReplicationCommand(cmdtag); break; case T_BaseBackupCmd: - PreventInTransactionBlock(true, "BASE_BACKUP"); + cmdtag = "BASE_BACKUP"; + PreventInTransactionBlock(true, cmdtag); SendBaseBackup((BaseBackupCmd *) cmd_node); + EndReplicationCommand(cmdtag); break; case T_CreateReplicationSlotCmd: + cmdtag = "CREATE_REPLICATION_SLOT"; CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node); + EndReplicationCommand(cmdtag); break; case T_DropReplicationSlotCmd: + cmdtag = "DROP_REPLICATION_SLOT"; DropReplicationSlot((DropReplicationSlotCmd *) cmd_node); + EndReplicationCommand(cmdtag); break; case T_StartReplicationCmd: { StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node; - PreventInTransactionBlock(true, "START_REPLICATION"); + cmdtag = "START_REPLICATION"; + PreventInTransactionBlock(true, cmdtag); if (cmd->kind == REPLICATION_KIND_PHYSICAL) StartReplication(cmd); else StartLogicalReplication(cmd); + /* callees already sent their own completion message */ + Assert(xlogreader != NULL); break; } case T_TimeLineHistoryCmd: - PreventInTransactionBlock(true, "TIMELINE_HISTORY"); + cmdtag = "TIMELINE_HISTORY"; + PreventInTransactionBlock(true, cmdtag); SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node); + EndReplicationCommand(cmdtag); break; case T_VariableShowStmt: @@ -1660,10 +1669,13 @@ exec_replication_command(const char *cmd_string) DestReceiver *dest = CreateDestReceiver(DestRemoteSimple); VariableShowStmt *n = (VariableShowStmt *) cmd_node; + cmdtag = "SHOW"; + /* syscache access needs a transaction environment */ StartTransactionCommand(); GetPGVariable(n->name, dest); CommitTransactionCommand(); + EndReplicationCommand(cmdtag); } break; @@ -1676,10 +1688,6 @@ exec_replication_command(const char *cmd_string) MemoryContextSwitchTo(old_context); MemoryContextDelete(cmd_context); - /* Send CommandComplete message */ - SetQueryCompletion(&qc, CMDTAG_SELECT, 0); - EndCommand(&qc, DestRemote, true); - /* Report to pgstat that this process is now idle */ pgstat_report_activity(STATE_IDLE, NULL); debug_query_string = NULL; |