diff options
Diffstat (limited to 'src/backend/libpq/pqcomm.c')
-rw-r--r-- | src/backend/libpq/pqcomm.c | 98 |
1 files changed, 62 insertions, 36 deletions
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 605d8913b16..dcbb704c6a5 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -102,7 +102,6 @@ int Unix_socket_permissions; char *Unix_socket_group; - /* Where the Unix socket files are (list of palloc'd strings) */ static List *sock_paths = NIL; @@ -134,16 +133,38 @@ static bool DoingCopyOut; /* Internal functions */ -static void pq_close(int code, Datum arg); +static void socket_comm_reset(void); +static void socket_close(int code, Datum arg); +static void socket_set_nonblocking(bool nonblocking); +static int socket_flush(void); +static int socket_flush_if_writable(void); +static bool socket_is_send_pending(void); +static int socket_putmessage(char msgtype, const char *s, size_t len); +static void socket_putmessage_noblock(char msgtype, const char *s, size_t len); +static void socket_startcopyout(void); +static void socket_endcopyout(bool errorAbort); static int internal_putbytes(const char *s, size_t len); static int internal_flush(void); -static void pq_set_nonblocking(bool nonblocking); +static void socket_set_nonblocking(bool nonblocking); #ifdef HAVE_UNIX_SOCKETS static int Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath); static int Setup_AF_UNIX(char *sock_path); #endif /* HAVE_UNIX_SOCKETS */ +PQcommMethods PQcommSocketMethods; + +static PQcommMethods PqCommSocketMethods = { + socket_comm_reset, + socket_flush, + socket_flush_if_writable, + socket_is_send_pending, + socket_putmessage, + socket_putmessage_noblock, + socket_startcopyout, + socket_endcopyout +}; + /* -------------------------------- * pq_init - initialize libpq at backend startup @@ -152,24 +173,25 @@ static int Setup_AF_UNIX(char *sock_path); void pq_init(void) { + PqCommMethods = &PqCommSocketMethods; PqSendBufferSize = PQ_SEND_BUFFER_SIZE; PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize); PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0; PqCommBusy = false; DoingCopyOut = false; - on_proc_exit(pq_close, 0); + on_proc_exit(socket_close, 0); } /* -------------------------------- - * pq_comm_reset - reset libpq during error recovery + * socket_comm_reset - reset libpq during error recovery * * This is called from error recovery at the outer idle loop. It's * just to get us out of trouble if we somehow manage to elog() from * inside a pqcomm.c routine (which ideally will never happen, but...) * -------------------------------- */ -void -pq_comm_reset(void) +static void +socket_comm_reset(void) { /* Do not throw away pending data, but do reset the busy flag */ PqCommBusy = false; @@ -178,14 +200,14 @@ pq_comm_reset(void) } /* -------------------------------- - * pq_close - shutdown libpq at backend exit + * socket_close - shutdown libpq at backend exit * * Note: in a standalone backend MyProcPort will be null, * don't crash during exit... * -------------------------------- */ static void -pq_close(int code, Datum arg) +socket_close(int code, Datum arg) { if (MyProcPort != NULL) { @@ -783,15 +805,20 @@ TouchSocketFiles(void) */ /* -------------------------------- - * pq_set_nonblocking - set socket blocking/non-blocking + * socket_set_nonblocking - set socket blocking/non-blocking * * Sets the socket non-blocking if nonblocking is TRUE, or sets it * blocking otherwise. * -------------------------------- */ static void -pq_set_nonblocking(bool nonblocking) +socket_set_nonblocking(bool nonblocking) { + if (MyProcPort == NULL) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), + errmsg("there is no client connection"))); + if (MyProcPort->noblock == nonblocking) return; @@ -844,7 +871,7 @@ pq_recvbuf(void) } /* Ensure that we're in blocking mode */ - pq_set_nonblocking(false); + socket_set_nonblocking(false); /* Can fill buffer from PqRecvLength and upwards */ for (;;) @@ -935,7 +962,7 @@ pq_getbyte_if_available(unsigned char *c) } /* Put the socket into non-blocking mode */ - pq_set_nonblocking(true); + socket_set_nonblocking(true); r = secure_read(MyProcPort, c, 1); if (r < 0) @@ -1194,7 +1221,7 @@ internal_putbytes(const char *s, size_t len) /* If buffer is full, then flush it out */ if (PqSendPointer >= PqSendBufferSize) { - pq_set_nonblocking(false); + socket_set_nonblocking(false); if (internal_flush()) return EOF; } @@ -1210,13 +1237,13 @@ internal_putbytes(const char *s, size_t len) } /* -------------------------------- - * pq_flush - flush pending output + * socket_flush - flush pending output * * returns 0 if OK, EOF if trouble * -------------------------------- */ -int -pq_flush(void) +static int +socket_flush(void) { int res; @@ -1224,7 +1251,7 @@ pq_flush(void) if (PqCommBusy) return 0; PqCommBusy = true; - pq_set_nonblocking(false); + socket_set_nonblocking(false); res = internal_flush(); PqCommBusy = false; return res; @@ -1310,8 +1337,8 @@ internal_flush(void) * Returns 0 if OK, or EOF if trouble. * -------------------------------- */ -int -pq_flush_if_writable(void) +static int +socket_flush_if_writable(void) { int res; @@ -1324,7 +1351,7 @@ pq_flush_if_writable(void) return 0; /* Temporarily put the socket into non-blocking mode */ - pq_set_nonblocking(true); + socket_set_nonblocking(true); PqCommBusy = true; res = internal_flush(); @@ -1333,11 +1360,11 @@ pq_flush_if_writable(void) } /* -------------------------------- - * pq_is_send_pending - is there any pending data in the output buffer? + * socket_is_send_pending - is there any pending data in the output buffer? * -------------------------------- */ -bool -pq_is_send_pending(void) +static bool +socket_is_send_pending(void) { return (PqSendStart < PqSendPointer); } @@ -1351,7 +1378,7 @@ pq_is_send_pending(void) /* -------------------------------- - * pq_putmessage - send a normal message (suppressed in COPY OUT mode) + * socket_putmessage - send a normal message (suppressed in COPY OUT mode) * * If msgtype is not '\0', it is a message type code to place before * the message body. If msgtype is '\0', then the message has no type @@ -1375,8 +1402,8 @@ pq_is_send_pending(void) * returns 0 if OK, EOF if trouble * -------------------------------- */ -int -pq_putmessage(char msgtype, const char *s, size_t len) +static int +socket_putmessage(char msgtype, const char *s, size_t len) { if (DoingCopyOut || PqCommBusy) return 0; @@ -1408,8 +1435,8 @@ fail: * If the output buffer is too small to hold the message, the buffer * is enlarged. */ -void -pq_putmessage_noblock(char msgtype, const char *s, size_t len) +static void +socket_putmessage_noblock(char msgtype, const char *s, size_t len) { int res PG_USED_FOR_ASSERTS_ONLY; int required; @@ -1431,18 +1458,18 @@ pq_putmessage_noblock(char msgtype, const char *s, size_t len) /* -------------------------------- - * pq_startcopyout - inform libpq that an old-style COPY OUT transfer + * socket_startcopyout - inform libpq that an old-style COPY OUT transfer * is beginning * -------------------------------- */ -void -pq_startcopyout(void) +static void +socket_startcopyout(void) { DoingCopyOut = true; } /* -------------------------------- - * pq_endcopyout - end an old-style COPY OUT transfer + * socket_endcopyout - end an old-style COPY OUT transfer * * If errorAbort is indicated, we are aborting a COPY OUT due to an error, * and must send a terminator line. Since a partial data line might have @@ -1451,8 +1478,8 @@ pq_startcopyout(void) * not allow binary transfers, so a textual terminator is always correct. * -------------------------------- */ -void -pq_endcopyout(bool errorAbort) +static void +socket_endcopyout(bool errorAbort) { if (!DoingCopyOut) return; @@ -1462,7 +1489,6 @@ pq_endcopyout(bool errorAbort) DoingCopyOut = false; } - /* * Support for TCP Keepalive parameters */ |