diff options
Diffstat (limited to 'src/interfaces')
-rw-r--r-- | src/interfaces/libpq/fe-cancel.c | 102 | ||||
-rw-r--r-- | src/interfaces/libpq/fe-connect.c | 15 | ||||
-rw-r--r-- | src/interfaces/libpq/fe-protocol3.c | 45 | ||||
-rw-r--r-- | src/interfaces/libpq/libpq-int.h | 7 |
4 files changed, 138 insertions, 31 deletions
diff --git a/src/interfaces/libpq/fe-cancel.c b/src/interfaces/libpq/fe-cancel.c index 7ebaa335bba..e84e64bf2a7 100644 --- a/src/interfaces/libpq/fe-cancel.c +++ b/src/interfaces/libpq/fe-cancel.c @@ -1,7 +1,7 @@ /*------------------------------------------------------------------------- * * fe-cancel.c - * functions related to setting up a connection to the backend + * functions related to query cancellation * * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -41,7 +41,6 @@ struct pg_cancel { SockAddr raddr; /* Remote address */ int be_pid; /* PID of to-be-canceled backend */ - int be_key; /* cancel key of to-be-canceled backend */ int pgtcp_user_timeout; /* tcp user timeout */ int keepalives; /* use TCP keepalives? */ int keepalives_idle; /* time between TCP keepalives */ @@ -49,6 +48,10 @@ struct pg_cancel * retransmits */ int keepalives_count; /* maximum number of TCP keepalive * retransmits */ + + /* Pre-constructed cancel request packet starts here */ + int32 cancel_pkt_len; /* in network-byte-order */ + char cancel_req[FLEXIBLE_ARRAY_MEMBER]; /* CancelRequestPacket */ }; @@ -83,6 +86,13 @@ PQcancelCreate(PGconn *conn) return (PGcancelConn *) cancelConn; } + /* Check that we have received a cancellation key */ + if (conn->be_cancel_key_len == 0) + { + libpq_append_conn_error(cancelConn, "no cancellation key received"); + return (PGcancelConn *) cancelConn; + } + /* * Indicate that this connection is used to send a cancellation */ @@ -101,7 +111,15 @@ PQcancelCreate(PGconn *conn) * Copy cancellation token data from the original connection */ cancelConn->be_pid = conn->be_pid; - cancelConn->be_key = conn->be_key; + if (conn->be_cancel_key != NULL) + { + cancelConn->be_cancel_key = malloc(conn->be_cancel_key_len); + if (!conn->be_cancel_key) + goto oom_error; + memcpy(cancelConn->be_cancel_key, conn->be_cancel_key, conn->be_cancel_key_len); + } + cancelConn->be_cancel_key_len = conn->be_cancel_key_len; + cancelConn->pversion = conn->pversion; /* * Cancel requests should not iterate over all possible hosts. The request @@ -349,6 +367,8 @@ PGcancel * PQgetCancel(PGconn *conn) { PGcancel *cancel; + int cancel_req_len; + CancelRequestPacket *req; if (!conn) return NULL; @@ -356,13 +376,17 @@ PQgetCancel(PGconn *conn) if (conn->sock == PGINVALID_SOCKET) return NULL; - cancel = malloc(sizeof(PGcancel)); + /* Check that we have received a cancellation key */ + if (conn->be_cancel_key_len == 0) + return NULL; + + cancel_req_len = offsetof(CancelRequestPacket, cancelAuthCode) + conn->be_cancel_key_len; + cancel = malloc(offsetof(PGcancel, cancel_req) + cancel_req_len); if (cancel == NULL) return NULL; memcpy(&cancel->raddr, &conn->raddr, sizeof(SockAddr)); - cancel->be_pid = conn->be_pid; - cancel->be_key = conn->be_key; + /* We use -1 to indicate an unset connection option */ cancel->pgtcp_user_timeout = -1; cancel->keepalives = -1; @@ -405,6 +429,13 @@ PQgetCancel(PGconn *conn) goto fail; } + req = (CancelRequestPacket *) &cancel->cancel_req; + req->cancelRequestCode = (MsgType) pg_hton32(CANCEL_REQUEST_CODE); + req->backendPID = pg_hton32(conn->be_pid); + memcpy(req->cancelAuthCode, conn->be_cancel_key, conn->be_cancel_key_len); + /* include the length field itself in the length */ + cancel->cancel_pkt_len = pg_hton32(cancel_req_len + 4); + return cancel; fail: @@ -412,6 +443,42 @@ fail: return NULL; } +/* + * PQsendCancelRequest + * Submit a CancelRequest message, but don't wait for it to finish + * + * Returns: 1 if successfully submitted + * 0 if error (conn->errorMessage is set) + */ +int +PQsendCancelRequest(PGconn *cancelConn) +{ + CancelRequestPacket req; + + /* Start the message. */ + if (pqPutMsgStart(0, cancelConn)) + return STATUS_ERROR; + + /* Send the message body. */ + memset(&req, 0, offsetof(CancelRequestPacket, cancelAuthCode)); + req.cancelRequestCode = (MsgType) pg_hton32(CANCEL_REQUEST_CODE); + req.backendPID = pg_hton32(cancelConn->be_pid); + if (pqPutnchar((char *) &req, offsetof(CancelRequestPacket, cancelAuthCode), cancelConn)) + return STATUS_ERROR; + if (pqPutnchar(cancelConn->be_cancel_key, cancelConn->be_cancel_key_len, cancelConn)) + return STATUS_ERROR; + + /* Finish the message. */ + if (pqPutMsgEnd(cancelConn)) + return STATUS_ERROR; + + /* Flush to ensure backend gets it. */ + if (pqFlush(cancelConn)) + return STATUS_ERROR; + + return STATUS_OK; +} + /* PQfreeCancel: free a cancel structure */ void PQfreeCancel(PGcancel *cancel) @@ -465,11 +532,8 @@ PQcancel(PGcancel *cancel, char *errbuf, int errbufsize) int save_errno = SOCK_ERRNO; pgsocket tmpsock = PGINVALID_SOCKET; int maxlen; - struct - { - uint32 packetlen; - CancelRequestPacket cp; - } crp; + char recvbuf; + int cancel_pkt_len; if (!cancel) { @@ -571,15 +635,15 @@ retry3: goto cancel_errReturn; } - /* Create and send the cancel request packet. */ - - crp.packetlen = pg_hton32((uint32) sizeof(crp)); - crp.cp.cancelRequestCode = (MsgType) pg_hton32(CANCEL_REQUEST_CODE); - crp.cp.backendPID = pg_hton32(cancel->be_pid); - crp.cp.cancelAuthCode = pg_hton32(cancel->be_key); + cancel_pkt_len = pg_ntoh32(cancel->cancel_pkt_len); retry4: - if (send(tmpsock, (char *) &crp, sizeof(crp), 0) != (int) sizeof(crp)) + + /* + * Send the cancel request packet. It starts with the message length at + * cancel_pkt_len, followed by the actual packet. + */ + if (send(tmpsock, (char *) &cancel->cancel_pkt_len, cancel_pkt_len, 0) != cancel_pkt_len) { if (SOCK_ERRNO == EINTR) /* Interrupted system call - we'll just try again */ @@ -596,7 +660,7 @@ retry4: * read to obtain any data, we are just waiting for EOF to be signaled. */ retry5: - if (recv(tmpsock, (char *) &crp, 1, 0) < 0) + if (recv(tmpsock, &recvbuf, 1, 0) < 0) { if (SOCK_ERRNO == EINTR) /* Interrupted system call - we'll just try again */ diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 5e3275ffd76..715b5d5aff4 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -696,7 +696,12 @@ pqDropServerData(PGconn *conn) if (!conn->cancelRequest) { conn->be_pid = 0; - conn->be_key = 0; + if (conn->be_cancel_key != NULL) + { + free(conn->be_cancel_key); + conn->be_cancel_key = NULL; + } + conn->be_cancel_key_len = 0; } } @@ -3692,13 +3697,7 @@ keep_going: /* We will come back to here until there is */ if (conn->cancelRequest) { - CancelRequestPacket cancelpacket; - - packetlen = sizeof(cancelpacket); - cancelpacket.cancelRequestCode = (MsgType) pg_hton32(CANCEL_REQUEST_CODE); - cancelpacket.backendPID = pg_hton32(conn->be_pid); - cancelpacket.cancelAuthCode = pg_hton32(conn->be_key); - if (pqPacketSend(conn, 0, &cancelpacket, packetlen) != STATUS_OK) + if (PQsendCancelRequest(conn) != STATUS_OK) { libpq_append_conn_error(conn, "could not send cancel packet: %s", SOCK_STRERROR(SOCK_ERRNO, sebuf, sizeof(sebuf))); diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 7ba49ea4592..d85910f41fc 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -48,6 +48,7 @@ static int getRowDescriptions(PGconn *conn, int msgLength); static int getParamDescriptions(PGconn *conn, int msgLength); static int getAnotherTuple(PGconn *conn, int msgLength); static int getParameterStatus(PGconn *conn); +static int getBackendKeyData(PGconn *conn, int msgLength); static int getNotify(PGconn *conn); static int getCopyStart(PGconn *conn, ExecStatusType copytype); static int getReadyForQuery(PGconn *conn); @@ -308,9 +309,7 @@ pqParseInput3(PGconn *conn) * just as easy to handle it as part of the main loop. * Save the data and continue processing. */ - if (pqGetInt(&(conn->be_pid), 4, conn)) - return; - if (pqGetInt(&(conn->be_key), 4, conn)) + if (getBackendKeyData(conn, msgLength)) return; break; case PqMsg_RowDescription: @@ -1524,6 +1523,46 @@ getParameterStatus(PGconn *conn) return 0; } +/* + * parseInput subroutine to read a BackendKeyData message. + * Entry: 'v' message type and length have already been consumed. + * Exit: returns 0 if successfully consumed message. + * returns EOF if not enough data. + */ +static int +getBackendKeyData(PGconn *conn, int msgLength) +{ + uint8 cancel_key_len; + + if (conn->be_cancel_key) + { + free(conn->be_cancel_key); + conn->be_cancel_key = NULL; + conn->be_cancel_key_len = 0; + } + + if (pqGetInt(&(conn->be_pid), 4, conn)) + return EOF; + + cancel_key_len = 5 + msgLength - (conn->inCursor - conn->inStart); + + conn->be_cancel_key = malloc(cancel_key_len); + if (conn->be_cancel_key == NULL) + { + libpq_append_conn_error(conn, "out of memory"); + /* discard the message */ + return EOF; + } + if (pqGetnchar(conn->be_cancel_key, cancel_key_len, conn)) + { + free(conn->be_cancel_key); + conn->be_cancel_key = NULL; + return EOF; + } + conn->be_cancel_key_len = cancel_key_len; + return 0; +} + /* * Attempt to read a Notify response message. diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 232e0b00f75..25de3e95055 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -546,7 +546,8 @@ struct pg_conn /* Miscellaneous stuff */ int be_pid; /* PID of backend --- needed for cancels */ - int be_key; /* key of backend --- needed for cancels */ + char *be_cancel_key; /* query cancellation key and its length */ + uint16 be_cancel_key_len; pgParameterStatus *pstatus; /* ParameterStatus data */ int client_encoding; /* encoding id */ bool std_strings; /* standard_conforming_strings */ @@ -766,6 +767,10 @@ extern PGresult *pqFunctionCall3(PGconn *conn, Oid fnid, int result_is_int, const PQArgBlock *args, int nargs); +/* === in fe-cancel.c === */ + +extern int PQsendCancelRequest(PGconn *cancelConn); + /* === in fe-misc.c === */ /* |