diff options
Diffstat (limited to 'src/bin/scripts/scripts_parallel.c')
-rw-r--r-- | src/bin/scripts/scripts_parallel.c | 284 |
1 files changed, 0 insertions, 284 deletions
diff --git a/src/bin/scripts/scripts_parallel.c b/src/bin/scripts/scripts_parallel.c deleted file mode 100644 index 1f863a1bb46..00000000000 --- a/src/bin/scripts/scripts_parallel.c +++ /dev/null @@ -1,284 +0,0 @@ -/*------------------------------------------------------------------------- - * - * scripts_parallel.c - * Parallel support for bin/scripts/ - * - * - * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group - * Portions Copyright (c) 1994, Regents of the University of California - * - * src/bin/scripts/scripts_parallel.c - * - *------------------------------------------------------------------------- - */ - -#ifdef WIN32 -#define FD_SETSIZE 1024 /* must set before winsock2.h is included */ -#endif - -#include "postgres_fe.h" - -#ifdef HAVE_SYS_SELECT_H -#include <sys/select.h> -#endif - -#include "common.h" -#include "common/logging.h" -#include "fe_utils/cancel.h" -#include "scripts_parallel.h" - -static void init_slot(ParallelSlot *slot, PGconn *conn); -static int select_loop(int maxFd, fd_set *workerset); - -static void -init_slot(ParallelSlot *slot, PGconn *conn) -{ - slot->connection = conn; - /* Initially assume connection is idle */ - slot->isFree = true; -} - -/* - * Wait until a file descriptor from the given set becomes readable. - * - * Returns the number of ready descriptors, or -1 on failure (including - * getting a cancel request). - */ -static int -select_loop(int maxFd, fd_set *workerset) -{ - int i; - fd_set saveSet = *workerset; - - if (CancelRequested) - return -1; - - for (;;) - { - /* - * On Windows, we need to check once in a while for cancel requests; - * on other platforms we rely on select() returning when interrupted. - */ - struct timeval *tvp; -#ifdef WIN32 - struct timeval tv = {0, 1000000}; - - tvp = &tv; -#else - tvp = NULL; -#endif - - *workerset = saveSet; - i = select(maxFd + 1, workerset, NULL, NULL, tvp); - -#ifdef WIN32 - if (i == SOCKET_ERROR) - { - i = -1; - - if (WSAGetLastError() == WSAEINTR) - errno = EINTR; - } -#endif - - if (i < 0 && errno == EINTR) - continue; /* ignore this */ - if (i < 0 || CancelRequested) - return -1; /* but not this */ - if (i == 0) - continue; /* timeout (Win32 only) */ - break; - } - - return i; -} - -/* - * ParallelSlotsGetIdle - * Return a connection slot that is ready to execute a command. - * - * This returns the first slot we find that is marked isFree, if one is; - * otherwise, we loop on select() until one socket becomes available. When - * this happens, we read the whole set and mark as free all sockets that - * become available. If an error occurs, NULL is returned. - */ -ParallelSlot * -ParallelSlotsGetIdle(ParallelSlot *slots, int numslots) -{ - int i; - int firstFree = -1; - - /* - * Look for any connection currently free. If there is one, mark it as - * taken and let the caller know the slot to use. - */ - for (i = 0; i < numslots; i++) - { - if (slots[i].isFree) - { - slots[i].isFree = false; - return slots + i; - } - } - - /* - * No free slot found, so wait until one of the connections has finished - * its task and return the available slot. - */ - while (firstFree < 0) - { - fd_set slotset; - int maxFd = 0; - - /* We must reconstruct the fd_set for each call to select_loop */ - FD_ZERO(&slotset); - - for (i = 0; i < numslots; i++) - { - int sock = PQsocket(slots[i].connection); - - /* - * We don't really expect any connections to lose their sockets - * after startup, but just in case, cope by ignoring them. - */ - if (sock < 0) - continue; - - FD_SET(sock, &slotset); - if (sock > maxFd) - maxFd = sock; - } - - SetCancelConn(slots->connection); - i = select_loop(maxFd, &slotset); - ResetCancelConn(); - - /* failure? */ - if (i < 0) - return NULL; - - for (i = 0; i < numslots; i++) - { - int sock = PQsocket(slots[i].connection); - - if (sock >= 0 && FD_ISSET(sock, &slotset)) - { - /* select() says input is available, so consume it */ - PQconsumeInput(slots[i].connection); - } - - /* Collect result(s) as long as any are available */ - while (!PQisBusy(slots[i].connection)) - { - PGresult *result = PQgetResult(slots[i].connection); - - if (result != NULL) - { - /* Check and discard the command result */ - if (!processQueryResult(slots[i].connection, result)) - return NULL; - } - else - { - /* This connection has become idle */ - slots[i].isFree = true; - if (firstFree < 0) - firstFree = i; - break; - } - } - } - } - - slots[firstFree].isFree = false; - return slots + firstFree; -} - -/* - * ParallelSlotsSetup - * Prepare a set of parallel slots to use on a given database. - * - * This creates and initializes a set of connections to the database - * using the information given by the caller, marking all parallel slots - * as free and ready to use. "conn" is an initial connection set up - * by the caller and is associated with the first slot in the parallel - * set. - */ -ParallelSlot * -ParallelSlotsSetup(const ConnParams *cparams, - const char *progname, bool echo, - PGconn *conn, int numslots) -{ - ParallelSlot *slots; - int i; - - Assert(conn != NULL); - - slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * numslots); - init_slot(slots, conn); - if (numslots > 1) - { - for (i = 1; i < numslots; i++) - { - conn = connectDatabase(cparams, progname, echo, false, true); - - /* - * Fail and exit immediately if trying to use a socket in an - * unsupported range. POSIX requires open(2) to use the lowest - * unused file descriptor and the hint given relies on that. - */ - if (PQsocket(conn) >= FD_SETSIZE) - { - pg_log_fatal("too many jobs for this platform -- try %d", i); - exit(1); - } - - init_slot(slots + i, conn); - } - } - - return slots; -} - -/* - * ParallelSlotsTerminate - * Clean up a set of parallel slots - * - * Iterate through all connections in a given set of ParallelSlots and - * terminate all connections. - */ -void -ParallelSlotsTerminate(ParallelSlot *slots, int numslots) -{ - int i; - - for (i = 0; i < numslots; i++) - { - PGconn *conn = slots[i].connection; - - if (conn == NULL) - continue; - - disconnectDatabase(conn); - } -} - -/* - * ParallelSlotsWaitCompletion - * - * Wait for all connections to finish, returning false if at least one - * error has been found on the way. - */ -bool -ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots) -{ - int i; - - for (i = 0; i < numslots; i++) - { - if (!consumeQueryResult((slots + i)->connection)) - return false; - } - - return true; -} |