diff options
Diffstat (limited to 'src/backend/storage/ipc/latch.c')
-rw-r--r-- | src/backend/storage/ipc/latch.c | 79 |
1 files changed, 70 insertions, 9 deletions
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c index 5bb609b368d..c3aaa8bff03 100644 --- a/src/backend/storage/ipc/latch.c +++ b/src/backend/storage/ipc/latch.c @@ -840,6 +840,7 @@ FreeWaitEventSet(WaitEventSet *set) * - WL_SOCKET_CONNECTED: Wait for socket connection to be established, * can be combined with other WL_SOCKET_* events (on non-Windows * platforms, this is the same as WL_SOCKET_WRITEABLE) + * - WL_SOCKET_CLOSED: Wait for socket to be closed by remote peer. * - WL_EXIT_ON_PM_DEATH: Exit immediately if the postmaster dies * * Returns the offset in WaitEventSet->events (starting from 0), which can be @@ -1042,12 +1043,16 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action) else { Assert(event->fd != PGINVALID_SOCKET); - Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)); + Assert(event->events & (WL_SOCKET_READABLE | + WL_SOCKET_WRITEABLE | + WL_SOCKET_CLOSED)); if (event->events & WL_SOCKET_READABLE) epoll_ev.events |= EPOLLIN; if (event->events & WL_SOCKET_WRITEABLE) epoll_ev.events |= EPOLLOUT; + if (event->events & WL_SOCKET_CLOSED) + epoll_ev.events |= EPOLLRDHUP; } /* @@ -1086,12 +1091,18 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event) } else { - Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)); + Assert(event->events & (WL_SOCKET_READABLE | + WL_SOCKET_WRITEABLE | + WL_SOCKET_CLOSED)); pollfd->events = 0; if (event->events & WL_SOCKET_READABLE) pollfd->events |= POLLIN; if (event->events & WL_SOCKET_WRITEABLE) pollfd->events |= POLLOUT; +#ifdef POLLRDHUP + if (event->events & WL_SOCKET_CLOSED) + pollfd->events |= POLLRDHUP; +#endif } Assert(event->fd != PGINVALID_SOCKET); @@ -1164,7 +1175,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events) Assert(event->events != WL_LATCH_SET || set->latch != NULL); Assert(event->events == WL_LATCH_SET || event->events == WL_POSTMASTER_DEATH || - (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))); + (event->events & (WL_SOCKET_READABLE | + WL_SOCKET_WRITEABLE | + WL_SOCKET_CLOSED))); if (event->events == WL_POSTMASTER_DEATH) { @@ -1187,9 +1200,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events) * old event mask to the new event mask, since kevent treats readable * and writable as separate events. */ - if (old_events & WL_SOCKET_READABLE) + if (old_events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED)) old_filt_read = true; - if (event->events & WL_SOCKET_READABLE) + if (event->events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED)) new_filt_read = true; if (old_events & WL_SOCKET_WRITEABLE) old_filt_write = true; @@ -1209,7 +1222,10 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events) event); } - Assert(count > 0); + /* For WL_SOCKET_READ -> WL_SOCKET_CLOSED, no change needed. */ + if (count == 0) + return; + Assert(count <= 2); rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL); @@ -1524,7 +1540,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, returned_events++; } } - else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) + else if (cur_event->events & (WL_SOCKET_READABLE | + WL_SOCKET_WRITEABLE | + WL_SOCKET_CLOSED)) { Assert(cur_event->fd != PGINVALID_SOCKET); @@ -1542,6 +1560,13 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, occurred_events->events |= WL_SOCKET_WRITEABLE; } + if ((cur_event->events & WL_SOCKET_CLOSED) && + (cur_epoll_event->events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP))) + { + /* remote peer shut down, or error */ + occurred_events->events |= WL_SOCKET_CLOSED; + } + if (occurred_events->events != 0) { occurred_events->fd = cur_event->fd; @@ -1667,7 +1692,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, occurred_events++; returned_events++; } - else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) + else if (cur_event->events & (WL_SOCKET_READABLE | + WL_SOCKET_WRITEABLE | + WL_SOCKET_CLOSED)) { Assert(cur_event->fd >= 0); @@ -1678,6 +1705,14 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, occurred_events->events |= WL_SOCKET_READABLE; } + if ((cur_event->events & WL_SOCKET_CLOSED) && + (cur_kqueue_event->filter == EVFILT_READ) && + (cur_kqueue_event->flags & EV_EOF)) + { + /* the remote peer has shut down */ + occurred_events->events |= WL_SOCKET_CLOSED; + } + if ((cur_event->events & WL_SOCKET_WRITEABLE) && (cur_kqueue_event->filter == EVFILT_WRITE)) { @@ -1788,7 +1823,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, returned_events++; } } - else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) + else if (cur_event->events & (WL_SOCKET_READABLE | + WL_SOCKET_WRITEABLE | + WL_SOCKET_CLOSED)) { int errflags = POLLHUP | POLLERR | POLLNVAL; @@ -1808,6 +1845,15 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, occurred_events->events |= WL_SOCKET_WRITEABLE; } +#ifdef POLLRDHUP + if ((cur_event->events & WL_SOCKET_CLOSED) && + (cur_pollfd->revents & (POLLRDHUP | errflags))) + { + /* remote peer closed, or error */ + occurred_events->events |= WL_SOCKET_CLOSED; + } +#endif + if (occurred_events->events != 0) { occurred_events->fd = cur_event->fd; @@ -2015,6 +2061,21 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, #endif /* + * Return whether the current build options can report WL_SOCKET_CLOSED. + */ +bool +WaitEventSetCanReportClosed(void) +{ +#if (defined(WAIT_USE_POLL) && defined(POLLRDHUP)) || \ + defined(WAIT_USE_EPOLL) || \ + defined(WAIT_USE_KQUEUE) + return true; +#else + return false; +#endif +} + +/* * Get the number of wait events registered in a given WaitEventSet. */ int |