diff options
author | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2025-04-04 12:31:00 +0300 |
---|---|---|
committer | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2025-04-04 12:38:32 +0300 |
commit | 6e9c81836e101bc7f37ddc5e2f6ab58d62efcb24 (patch) | |
tree | b0c6e5c4cf53c29e34df1e8589aa5c829b74d737 | |
parent | 8123e91f5aeb26c6e4cf583bb61c99281485af83 (diff) | |
download | postgresql-6e9c81836e101bc7f37ddc5e2f6ab58d62efcb24.tar.gz postgresql-6e9c81836e101bc7f37ddc5e2f6ab58d62efcb24.zip |
Use standard die() signal handler in walreceiver
This gets rid of the bespoke ProcessWalRcvInterrupts() function,
so that walreceiver can terminate at any CHECK_FOR_INTERRUPTS() call.
And it's less code anyway.
We can now use the standard libpqsrv_connect_params() libpq wrapper
from libpq-be-fe-helpers.h, removing more code. We attempted to do
that earlier already in commit 728f86fec6, but that was reverted
because it didn't call ProcessWalRcvInterrupts() and therefore didn't
react to shutdown requests. Now that ProcessWalRcvInterrupts() is
gone, it works. As stated in that commit, this also leads to
libpqwalreceiver reserving file descriptors for libpq connections,
which is nice.
Author: Andres Freund <andres@anarazel.de> (the earlier commit)
Author: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Reviewed-by: Fujii Masao <masao.fujii@gmail.com>
Reviewed-by: Yura Sokolov <y.sokolov@postgrespro.ru>
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 208 | ||||
-rw-r--r-- | src/backend/replication/walreceiver.c | 41 | ||||
-rw-r--r-- | src/backend/tcop/postgres.c | 4 | ||||
-rw-r--r-- | src/include/replication/walreceiver.h | 1 |
4 files changed, 51 insertions, 203 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index ee3101c093e..200e8f1aabd 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -25,6 +25,7 @@ #include "common/connect.h" #include "funcapi.h" #include "libpq-fe.h" +#include "libpq/libpq-be-fe-helpers.h" #include "mb/pg_wchar.h" #include "miscadmin.h" #include "pgstat.h" @@ -113,8 +114,6 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { }; /* Prototypes for private functions */ -static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query); -static PGresult *libpqrcv_PQgetResult(PGconn *streamConn); static char *stringlist_to_identifierstr(PGconn *conn, List *strings); /* @@ -148,7 +147,6 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical, bool must_use_password, const char *appname, char **err) { WalReceiverConn *conn; - PostgresPollingStatusType status; const char *keys[6]; const char *vals[6]; int i = 0; @@ -214,56 +212,17 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical, Assert(i < lengthof(keys)); conn = palloc0(sizeof(WalReceiverConn)); - conn->streamConn = PQconnectStartParams(keys, vals, - /* expand_dbname = */ true); - if (PQstatus(conn->streamConn) == CONNECTION_BAD) - goto bad_connection_errmsg; - - /* - * Poll connection until we have OK or FAILED status. - * - * Per spec for PQconnectPoll, first wait till socket is write-ready. 
- */ - status = PGRES_POLLING_WRITING; - do - { - int io_flag; - int rc; - - if (status == PGRES_POLLING_READING) - io_flag = WL_SOCKET_READABLE; -#ifdef WIN32 - /* Windows needs a different test while waiting for connection-made */ - else if (PQstatus(conn->streamConn) == CONNECTION_STARTED) - io_flag = WL_SOCKET_CONNECTED; -#endif - else - io_flag = WL_SOCKET_WRITEABLE; - - rc = WaitLatchOrSocket(MyLatch, - WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag, - PQsocket(conn->streamConn), - 0, - WAIT_EVENT_LIBPQWALRECEIVER_CONNECT); - - /* Interrupted? */ - if (rc & WL_LATCH_SET) - { - ResetLatch(MyLatch); - ProcessWalRcvInterrupts(); - } - - /* If socket is ready, advance the libpq state machine */ - if (rc & io_flag) - status = PQconnectPoll(conn->streamConn); - } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED); + conn->streamConn = + libpqsrv_connect_params(keys, vals, + /* expand_dbname = */ true, + WAIT_EVENT_LIBPQWALRECEIVER_CONNECT); if (PQstatus(conn->streamConn) != CONNECTION_OK) goto bad_connection_errmsg; if (must_use_password && !PQconnectionUsedPassword(conn->streamConn)) { - PQfinish(conn->streamConn); + libpqsrv_disconnect(conn->streamConn); pfree(conn); ereport(ERROR, @@ -281,8 +240,9 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical, { PGresult *res; - res = libpqrcv_PQexec(conn->streamConn, - ALWAYS_SECURE_SEARCH_PATH_SQL); + res = libpqsrv_exec(conn->streamConn, + ALWAYS_SECURE_SEARCH_PATH_SQL, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); @@ -303,7 +263,7 @@ bad_connection_errmsg: /* error path, error already set */ bad_connection: - PQfinish(conn->streamConn); + libpqsrv_disconnect(conn->streamConn); pfree(conn); return NULL; } @@ -454,7 +414,9 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) * Get the system identifier and timeline ID as a DataRow message from the * primary server. 
*/ - res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM"); + res = libpqsrv_exec(conn->streamConn, + "IDENTIFY_SYSTEM", + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); @@ -631,7 +593,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn, options->proto.physical.startpointTLI); /* Start streaming. */ - res = libpqrcv_PQexec(conn->streamConn, cmd.data); + res = libpqsrv_exec(conn->streamConn, + cmd.data, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); pfree(cmd.data); if (PQresultStatus(res) == PGRES_COMMAND_OK) @@ -661,7 +625,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) PGresult *res; /* - * Send copy-end message. As in libpqrcv_PQexec, this could theoretically + * Send copy-end message. As in libpqsrv_exec, this could theoretically * block, but the risk seems small. */ if (PQputCopyEnd(conn->streamConn, NULL) <= 0 || @@ -681,7 +645,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is * also possible in case we aborted the copy in mid-stream. 
*/ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqsrv_get_result(conn->streamConn, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (PQresultStatus(res) == PGRES_TUPLES_OK) { /* @@ -696,7 +661,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) PQclear(res); /* the result set should be followed by CommandComplete */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqsrv_get_result(conn->streamConn, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); } else if (PQresultStatus(res) == PGRES_COPY_OUT) { @@ -710,7 +676,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) pchomp(PQerrorMessage(conn->streamConn))))); /* CommandComplete should follow */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqsrv_get_result(conn->streamConn, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); } if (PQresultStatus(res) != PGRES_COMMAND_OK) @@ -721,7 +688,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) PQclear(res); /* Verify that there are no more results */ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqsrv_get_result(conn->streamConn, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (res != NULL) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -746,7 +714,9 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, * Request the primary to send over the history file for given timeline. */ snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli); - res = libpqrcv_PQexec(conn->streamConn, cmd); + res = libpqsrv_exec(conn->streamConn, + cmd, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); @@ -777,113 +747,12 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, } /* - * Send a query and wait for the results by using the asynchronous libpq - * functions and socket readiness events. - * - * The function is modeled on libpqsrv_exec(), with the behavior difference - * being that it calls ProcessWalRcvInterrupts(). 
As an optimization, it - * skips try/catch, since all errors terminate the process. - * - * May return NULL, rather than an error result, on failure. - */ -static PGresult * -libpqrcv_PQexec(PGconn *streamConn, const char *query) -{ - PGresult *lastResult = NULL; - - /* - * PQexec() silently discards any prior query results on the connection. - * This is not required for this function as it's expected that the caller - * (which is this library in all cases) will behave correctly and we don't - * have to be backwards compatible with old libpq. - */ - - /* - * Submit the query. Since we don't use non-blocking mode, this could - * theoretically block. In practice, since we don't send very long query - * strings, the risk seems negligible. - */ - if (!PQsendQuery(streamConn, query)) - return NULL; - - for (;;) - { - /* Wait for, and collect, the next PGresult. */ - PGresult *result; - - result = libpqrcv_PQgetResult(streamConn); - if (result == NULL) - break; /* query is complete, or failure */ - - /* - * Emulate PQexec()'s behavior of returning the last result when there - * are many. We are fine with returning just last error message. - */ - PQclear(lastResult); - lastResult = result; - - if (PQresultStatus(lastResult) == PGRES_COPY_IN || - PQresultStatus(lastResult) == PGRES_COPY_OUT || - PQresultStatus(lastResult) == PGRES_COPY_BOTH || - PQstatus(streamConn) == CONNECTION_BAD) - break; - } - - return lastResult; -} - -/* - * Perform the equivalent of PQgetResult(), but watch for interrupts. - */ -static PGresult * -libpqrcv_PQgetResult(PGconn *streamConn) -{ - /* - * Collect data until PQgetResult is ready to get the result without - * blocking. - */ - while (PQisBusy(streamConn)) - { - int rc; - - /* - * We don't need to break down the sleep into smaller increments, - * since we'll get interrupted by signals and can handle any - * interrupts here. 
- */ - rc = WaitLatchOrSocket(MyLatch, - WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | - WL_LATCH_SET, - PQsocket(streamConn), - 0, - WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); - - /* Interrupted? */ - if (rc & WL_LATCH_SET) - { - ResetLatch(MyLatch); - ProcessWalRcvInterrupts(); - } - - /* Consume whatever data is available from the socket */ - if (PQconsumeInput(streamConn) == 0) - { - /* trouble; return NULL */ - return NULL; - } - } - - /* Now we can collect and return the next PGresult */ - return PQgetResult(streamConn); -} - -/* * Disconnect connection to primary, if any. */ static void libpqrcv_disconnect(WalReceiverConn *conn) { - PQfinish(conn->streamConn); + libpqsrv_disconnect(conn->streamConn); PQfreemem(conn->recvBuf); pfree(conn); } @@ -937,13 +806,15 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, { PGresult *res; - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqsrv_get_result(conn->streamConn, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (PQresultStatus(res) == PGRES_COMMAND_OK) { PQclear(res); /* Verify that there are no more results. 
*/ - res = libpqrcv_PQgetResult(conn->streamConn); + res = libpqsrv_get_result(conn->streamConn, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); if (res != NULL) { PQclear(res); @@ -1094,7 +965,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL"); } - res = libpqrcv_PQexec(conn->streamConn, cmd.data); + res = libpqsrv_exec(conn->streamConn, + cmd.data, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); pfree(cmd.data); if (PQresultStatus(res) != PGRES_TUPLES_OK) @@ -1147,7 +1020,8 @@ libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, appendStringInfoString(&cmd, " );"); - res = libpqrcv_PQexec(conn->streamConn, cmd.data); + res = libpqsrv_exec(conn->streamConn, cmd.data, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); pfree(cmd.data); if (PQresultStatus(res) != PGRES_COMMAND_OK) @@ -1214,7 +1088,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, { char *cstrs[MaxTupleAttributeNumber]; - ProcessWalRcvInterrupts(); + CHECK_FOR_INTERRUPTS(); /* Do the allocations in temporary context. 
*/ oldcontext = MemoryContextSwitchTo(rowcontext); @@ -1260,7 +1134,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("the query interface requires a database connection"))); - pgres = libpqrcv_PQexec(conn->streamConn, query); + pgres = libpqsrv_exec(conn->streamConn, + query, + WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE); switch (PQresultStatus(pgres)) { diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 2e5dd6deb2c..8c4d0fd9aed 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -71,6 +71,7 @@ #include "storage/proc.h" #include "storage/procarray.h" #include "storage/procsignal.h" +#include "tcop/tcopprot.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/guc.h" @@ -145,38 +146,6 @@ static void XLogWalRcvSendHSFeedback(bool immed); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now); -/* - * Process any interrupts the walreceiver process may have received. - * This should be called any time the process's latch has become set. - * - * Currently, only SIGTERM is of interest. We can't just exit(1) within the - * SIGTERM signal handler, because the signal might arrive in the middle of - * some critical operation, like while we're holding a spinlock. Instead, the - * signal handler sets a flag variable as well as setting the process's latch. - * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the - * latch has become set. Operations that could block for a long time, such as - * reading from a remote server, must pay attention to the latch too; see - * libpqrcv_PQgetResult for example. 
- */ -void -ProcessWalRcvInterrupts(void) -{ - /* - * Although walreceiver interrupt handling doesn't use the same scheme as - * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive - * any incoming signals on Win32, and also to make sure we process any - * barrier events. - */ - CHECK_FOR_INTERRUPTS(); - - if (ShutdownRequestPending) - { - ereport(FATAL, - (errcode(ERRCODE_ADMIN_SHUTDOWN), - errmsg("terminating walreceiver process due to administrator command"))); - } -} - /* Main entry point for walreceiver process */ void @@ -280,7 +249,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config * file */ pqsignal(SIGINT, SIG_IGN); - pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */ + pqsignal(SIGTERM, die); /* request shutdown */ /* SIGQUIT handler was already set up by InitPostmasterChild */ pqsignal(SIGALRM, SIG_IGN); pqsignal(SIGPIPE, SIG_IGN); @@ -459,7 +428,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) errmsg("cannot continue WAL streaming, recovery has already ended"))); /* Process any requests or signals received recently */ - ProcessWalRcvInterrupts(); + CHECK_FOR_INTERRUPTS(); if (ConfigReloadPending) { @@ -555,7 +524,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) if (rc & WL_LATCH_SET) { ResetLatch(MyLatch); - ProcessWalRcvInterrupts(); + CHECK_FOR_INTERRUPTS(); if (walrcv->force_reply) { @@ -704,7 +673,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) { ResetLatch(MyLatch); - ProcessWalRcvInterrupts(); + CHECK_FOR_INTERRUPTS(); SpinLockAcquire(&walrcv->mutex); Assert(walrcv->walRcvState == WALRCV_RESTARTING || diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 89189848862..6ae9f38f0c8 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -3335,6 +3335,10 @@ ProcessInterrupts(void) */ 
proc_exit(1); } + else if (AmWalReceiverProcess()) + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating walreceiver process due to administrator command"))); else if (AmBackgroundWorkerProcess()) ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index ecca21fecb4..89f63f908f8 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -487,7 +487,6 @@ walrcv_clear_result(WalRcvExecResult *walres) /* prototypes for functions in walreceiver.c */ pg_noreturn extern void WalReceiverMain(const void *startup_data, size_t startup_data_len); -extern void ProcessWalRcvInterrupts(void); extern void WalRcvForceReply(void); /* prototypes for functions in walreceiverfuncs.c */ |