aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/storage/ipc/latch.c79
-rw-r--r--src/include/storage/latch.h6
2 files changed, 74 insertions, 11 deletions
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 5bb609b368d..c3aaa8bff03 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -840,6 +840,7 @@ FreeWaitEventSet(WaitEventSet *set)
* - WL_SOCKET_CONNECTED: Wait for socket connection to be established,
* can be combined with other WL_SOCKET_* events (on non-Windows
* platforms, this is the same as WL_SOCKET_WRITEABLE)
+ * - WL_SOCKET_CLOSED: Wait for socket to be closed by remote peer.
* - WL_EXIT_ON_PM_DEATH: Exit immediately if the postmaster dies
*
* Returns the offset in WaitEventSet->events (starting from 0), which can be
@@ -1042,12 +1043,16 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
else
{
Assert(event->fd != PGINVALID_SOCKET);
- Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
+ Assert(event->events & (WL_SOCKET_READABLE |
+ WL_SOCKET_WRITEABLE |
+ WL_SOCKET_CLOSED));
if (event->events & WL_SOCKET_READABLE)
epoll_ev.events |= EPOLLIN;
if (event->events & WL_SOCKET_WRITEABLE)
epoll_ev.events |= EPOLLOUT;
+ if (event->events & WL_SOCKET_CLOSED)
+ epoll_ev.events |= EPOLLRDHUP;
}
/*
@@ -1086,12 +1091,18 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
}
else
{
- Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
+ Assert(event->events & (WL_SOCKET_READABLE |
+ WL_SOCKET_WRITEABLE |
+ WL_SOCKET_CLOSED));
pollfd->events = 0;
if (event->events & WL_SOCKET_READABLE)
pollfd->events |= POLLIN;
if (event->events & WL_SOCKET_WRITEABLE)
pollfd->events |= POLLOUT;
+#ifdef POLLRDHUP
+ if (event->events & WL_SOCKET_CLOSED)
+ pollfd->events |= POLLRDHUP;
+#endif
}
Assert(event->fd != PGINVALID_SOCKET);
@@ -1164,7 +1175,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
Assert(event->events != WL_LATCH_SET || set->latch != NULL);
Assert(event->events == WL_LATCH_SET ||
event->events == WL_POSTMASTER_DEATH ||
- (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)));
+ (event->events & (WL_SOCKET_READABLE |
+ WL_SOCKET_WRITEABLE |
+ WL_SOCKET_CLOSED)));
if (event->events == WL_POSTMASTER_DEATH)
{
@@ -1187,9 +1200,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
* old event mask to the new event mask, since kevent treats readable
* and writable as separate events.
*/
- if (old_events & WL_SOCKET_READABLE)
+ if (old_events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED))
old_filt_read = true;
- if (event->events & WL_SOCKET_READABLE)
+ if (event->events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED))
new_filt_read = true;
if (old_events & WL_SOCKET_WRITEABLE)
old_filt_write = true;
@@ -1209,7 +1222,10 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
event);
}
- Assert(count > 0);
+ /* For WL_SOCKET_READ -> WL_SOCKET_CLOSED, no change needed. */
+ if (count == 0)
+ return;
+
Assert(count <= 2);
rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL);
@@ -1524,7 +1540,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
returned_events++;
}
}
- else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+ else if (cur_event->events & (WL_SOCKET_READABLE |
+ WL_SOCKET_WRITEABLE |
+ WL_SOCKET_CLOSED))
{
Assert(cur_event->fd != PGINVALID_SOCKET);
@@ -1542,6 +1560,13 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
occurred_events->events |= WL_SOCKET_WRITEABLE;
}
+ if ((cur_event->events & WL_SOCKET_CLOSED) &&
+ (cur_epoll_event->events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)))
+ {
+ /* remote peer shut down, or error */
+ occurred_events->events |= WL_SOCKET_CLOSED;
+ }
+
if (occurred_events->events != 0)
{
occurred_events->fd = cur_event->fd;
@@ -1667,7 +1692,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
occurred_events++;
returned_events++;
}
- else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+ else if (cur_event->events & (WL_SOCKET_READABLE |
+ WL_SOCKET_WRITEABLE |
+ WL_SOCKET_CLOSED))
{
Assert(cur_event->fd >= 0);
@@ -1678,6 +1705,14 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
occurred_events->events |= WL_SOCKET_READABLE;
}
+ if ((cur_event->events & WL_SOCKET_CLOSED) &&
+ (cur_kqueue_event->filter == EVFILT_READ) &&
+ (cur_kqueue_event->flags & EV_EOF))
+ {
+ /* the remote peer has shut down */
+ occurred_events->events |= WL_SOCKET_CLOSED;
+ }
+
if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
(cur_kqueue_event->filter == EVFILT_WRITE))
{
@@ -1788,7 +1823,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
returned_events++;
}
}
- else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+ else if (cur_event->events & (WL_SOCKET_READABLE |
+ WL_SOCKET_WRITEABLE |
+ WL_SOCKET_CLOSED))
{
int errflags = POLLHUP | POLLERR | POLLNVAL;
@@ -1808,6 +1845,15 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
occurred_events->events |= WL_SOCKET_WRITEABLE;
}
+#ifdef POLLRDHUP
+ if ((cur_event->events & WL_SOCKET_CLOSED) &&
+ (cur_pollfd->revents & (POLLRDHUP | errflags)))
+ {
+ /* remote peer closed, or error */
+ occurred_events->events |= WL_SOCKET_CLOSED;
+ }
+#endif
+
if (occurred_events->events != 0)
{
occurred_events->fd = cur_event->fd;
@@ -2015,6 +2061,21 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
#endif
/*
+ * Return whether the current build options can report WL_SOCKET_CLOSED.
+ */
+bool
+WaitEventSetCanReportClosed(void)
+{
+#if (defined(WAIT_USE_POLL) && defined(POLLRDHUP)) || \
+ defined(WAIT_USE_EPOLL) || \
+ defined(WAIT_USE_KQUEUE)
+ return true;
+#else
+ return false;
+#endif
+}
+
+/*
* Get the number of wait events registered in a given WaitEventSet.
*/
int
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 3aa7b338343..0dd79d73fa2 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -134,10 +134,11 @@ typedef struct Latch
/* avoid having to deal with case on platforms not requiring it */
#define WL_SOCKET_CONNECTED WL_SOCKET_WRITEABLE
#endif
-
+#define WL_SOCKET_CLOSED (1 << 7)
#define WL_SOCKET_MASK (WL_SOCKET_READABLE | \
WL_SOCKET_WRITEABLE | \
- WL_SOCKET_CONNECTED)
+ WL_SOCKET_CONNECTED | \
+ WL_SOCKET_CLOSED)
typedef struct WaitEvent
{
@@ -180,5 +181,6 @@ extern int WaitLatchOrSocket(Latch *latch, int wakeEvents,
pgsocket sock, long timeout, uint32 wait_event_info);
extern void InitializeLatchWaitSet(void);
extern int GetNumRegisteredWaitEvents(WaitEventSet *set);
+extern bool WaitEventSetCanReportClosed(void);
#endif /* LATCH_H */