diff options
Diffstat (limited to 'src/fe_utils/parallel_slot.c')
-rw-r--r-- | src/fe_utils/parallel_slot.c | 407 |
1 files changed, 284 insertions, 123 deletions
diff --git a/src/fe_utils/parallel_slot.c b/src/fe_utils/parallel_slot.c index b625deb2545..69581157c29 100644 --- a/src/fe_utils/parallel_slot.c +++ b/src/fe_utils/parallel_slot.c @@ -25,25 +25,16 @@ #include "common/logging.h" #include "fe_utils/cancel.h" #include "fe_utils/parallel_slot.h" +#include "fe_utils/query_utils.h" #define ERRCODE_UNDEFINED_TABLE "42P01" -static void init_slot(ParallelSlot *slot, PGconn *conn); static int select_loop(int maxFd, fd_set *workerset); static bool processQueryResult(ParallelSlot *slot, PGresult *result); -static void -init_slot(ParallelSlot *slot, PGconn *conn) -{ - slot->connection = conn; - /* Initially assume connection is idle */ - slot->isFree = true; - ParallelSlotClearHandler(slot); -} - /* * Process (and delete) a query result. Returns true if there's no problem, - * false otherwise. It's up to the handler to decide what cosntitutes a + * false otherwise. It's up to the handler to decide what constitutes a * problem. */ static bool @@ -137,151 +128,316 @@ select_loop(int maxFd, fd_set *workerset) } /* - * ParallelSlotsGetIdle - * Return a connection slot that is ready to execute a command. - * - * This returns the first slot we find that is marked isFree, if one is; - * otherwise, we loop on select() until one socket becomes available. When - * this happens, we read the whole set and mark as free all sockets that - * become available. If an error occurs, NULL is returned. + * Return the offset of a suitable idle slot, or -1 if none are available. If + * the given dbname is not null, only idle slots connected to the given + * database are considered suitable, otherwise all idle connected slots are + * considered suitable. */ -ParallelSlot * -ParallelSlotsGetIdle(ParallelSlot *slots, int numslots) +static int +find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname) { int i; - int firstFree = -1; - /* - * Look for any connection currently free. If there is one, mark it as - * taken and let the caller know the slot to use. - */ - for (i = 0; i < numslots; i++) + for (i = 0; i < sa->numslots; i++) { - if (slots[i].isFree) - { - slots[i].isFree = false; - return slots + i; - } + if (sa->slots[i].inUse) + continue; + + if (sa->slots[i].connection == NULL) + continue; + + if (dbname == NULL || + strcmp(PQdb(sa->slots[i].connection), dbname) == 0) + return i; + } + return -1; +} + +/* + * Return the offset of the first slot without a database connection, or -1 if + * all slots are connected. + */ +static int +find_unconnected_slot(const ParallelSlotArray *sa) +{ + int i; + + for (i = 0; i < sa->numslots; i++) + { + if (sa->slots[i].inUse) + continue; + + if (sa->slots[i].connection == NULL) + return i; + } + + return -1; +} + +/* + * Return the offset of the first idle slot, or -1 if all slots are busy. + */ +static int +find_any_idle_slot(const ParallelSlotArray *sa) +{ + int i; + + for (i = 0; i < sa->numslots; i++) + if (!sa->slots[i].inUse) + return i; + + return -1; +} + +/* + * Wait for any slot's connection to have query results, consume the results, + * and update the slot's status as appropriate. Returns true on success, + * false on cancellation, on error, or if no slots are connected. + */ +static bool +wait_on_slots(ParallelSlotArray *sa) +{ + int i; + fd_set slotset; + int maxFd = 0; + PGconn *cancelconn = NULL; + + /* We must reconstruct the fd_set for each call to select_loop */ + FD_ZERO(&slotset); + + for (i = 0; i < sa->numslots; i++) + { + int sock; + + /* We shouldn't get here if we still have slots without connections */ + Assert(sa->slots[i].connection != NULL); + + sock = PQsocket(sa->slots[i].connection); + + /* + * We don't really expect any connections to lose their sockets after + * startup, but just in case, cope by ignoring them. + */ + if (sock < 0) + continue; + + /* Keep track of the first valid connection we see. */ + if (cancelconn == NULL) + cancelconn = sa->slots[i].connection; + + FD_SET(sock, &slotset); + if (sock > maxFd) + maxFd = sock; } /* - * No free slot found, so wait until one of the connections has finished - * its task and return the available slot. + * If we get this far with no valid connections, processing cannot + * continue. */ - while (firstFree < 0) + if (cancelconn == NULL) + return false; + + SetCancelConn(sa->slots->connection); + i = select_loop(maxFd, &slotset); + ResetCancelConn(); + + /* failure? */ + if (i < 0) + return false; + + for (i = 0; i < sa->numslots; i++) { - fd_set slotset; - int maxFd = 0; + int sock; - /* We must reconstruct the fd_set for each call to select_loop */ - FD_ZERO(&slotset); + sock = PQsocket(sa->slots[i].connection); - for (i = 0; i < numslots; i++) + if (sock >= 0 && FD_ISSET(sock, &slotset)) { - int sock = PQsocket(slots[i].connection); - - /* - * We don't really expect any connections to lose their sockets - * after startup, but just in case, cope by ignoring them. - */ - if (sock < 0) - continue; - - FD_SET(sock, &slotset); - if (sock > maxFd) - maxFd = sock; + /* select() says input is available, so consume it */ + PQconsumeInput(sa->slots[i].connection); } - SetCancelConn(slots->connection); - i = select_loop(maxFd, &slotset); - ResetCancelConn(); - - /* failure? */ - if (i < 0) - return NULL; - - for (i = 0; i < numslots; i++) + /* Collect result(s) as long as any are available */ + while (!PQisBusy(sa->slots[i].connection)) { - int sock = PQsocket(slots[i].connection); + PGresult *result = PQgetResult(sa->slots[i].connection); - if (sock >= 0 && FD_ISSET(sock, &slotset)) + if (result != NULL) { - /* select() says input is available, so consume it */ - PQconsumeInput(slots[i].connection); + /* Handle and discard the command result */ + if (!processQueryResult(&sa->slots[i], result)) + return false; } - - /* Collect result(s) as long as any are available */ - while (!PQisBusy(slots[i].connection)) + else { - PGresult *result = PQgetResult(slots[i].connection); - - if (result != NULL) - { - /* Handle and discard the command result */ - if (!processQueryResult(slots + i, result)) - return NULL; - } - else - { - /* This connection has become idle */ - slots[i].isFree = true; - ParallelSlotClearHandler(slots + i); - if (firstFree < 0) - firstFree = i; - break; - } + /* This connection has become idle */ + sa->slots[i].inUse = false; + ParallelSlotClearHandler(&sa->slots[i]); + break; } } } + return true; +} - slots[firstFree].isFree = false; - return slots + firstFree; +/* + * Open a new database connection using the stored connection parameters and + * optionally a given dbname if not null, execute the stored initial command if + * any, and associate the new connection with the given slot. + */ +static void +connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname) +{ + const char *old_override; + ParallelSlot *slot = &sa->slots[slotno]; + + old_override = sa->cparams->override_dbname; + if (dbname) + sa->cparams->override_dbname = dbname; + slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true); + sa->cparams->override_dbname = old_override; + + if (PQsocket(slot->connection) >= FD_SETSIZE) + { + pg_log_fatal("too many jobs for this platform"); + exit(1); + } + + /* Setup the connection using the supplied command, if any. */ + if (sa->initcmd) + executeCommand(slot->connection, sa->initcmd, sa->echo); } /* - * ParallelSlotsSetup - * Prepare a set of parallel slots to use on a given database. + * ParallelSlotsGetIdle + * Return a connection slot that is ready to execute a command. + * + * The slot returned is chosen as follows: + * + * If any idle slot already has an open connection, and if either dbname is + * null or the existing connection is to the given database, that slot will be + * returned allowing the connection to be reused. + * + * Otherwise, if any idle slot is not yet connected to any database, the slot + * will be returned with it's connection opened using the stored cparams and + * optionally the given dbname if not null. + * + * Otherwise, if any idle slot exists, an idle slot will be chosen and returned + * after having it's connection disconnected and reconnected using the stored + * cparams and optionally the given dbname if not null. * - * This creates and initializes a set of connections to the database - * using the information given by the caller, marking all parallel slots - * as free and ready to use. "conn" is an initial connection set up - * by the caller and is associated with the first slot in the parallel - * set. + * Otherwise, if any slots have connections that are busy, we loop on select() + * until one socket becomes available. When this happens, we read the whole + * set and mark as free all sockets that become available. We then select a + * slot using the same rules as above. + * + * Otherwise, we cannot return a slot, which is an error, and NULL is returned. + * + * For any connection created, if the stored initcmd is not null, it will be + * executed as a command on the newly formed connection before the slot is + * returned. + * + * If an error occurs, NULL is returned. */ ParallelSlot * -ParallelSlotsSetup(const ConnParams *cparams, - const char *progname, bool echo, - PGconn *conn, int numslots) +ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname) { - ParallelSlot *slots; - int i; + int offset; - Assert(conn != NULL); + Assert(sa); + Assert(sa->numslots > 0); - slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * numslots); - init_slot(slots, conn); - if (numslots > 1) + while (1) { - for (i = 1; i < numslots; i++) + /* First choice: a slot already connected to the desired database. */ + offset = find_matching_idle_slot(sa, dbname); + if (offset >= 0) { - conn = connectDatabase(cparams, progname, echo, false, true); - - /* - * Fail and exit immediately if trying to use a socket in an - * unsupported range. POSIX requires open(2) to use the lowest - * unused file descriptor and the hint given relies on that. - */ - if (PQsocket(conn) >= FD_SETSIZE) - { - pg_log_fatal("too many jobs for this platform -- try %d", i); - exit(1); - } + sa->slots[offset].inUse = true; + return &sa->slots[offset]; + } + + /* Second choice: a slot not connected to any database. */ + offset = find_unconnected_slot(sa); + if (offset >= 0) + { + connect_slot(sa, offset, dbname); + sa->slots[offset].inUse = true; + return &sa->slots[offset]; + } - init_slot(slots + i, conn); + /* Third choice: a slot connected to the wrong database. */ + offset = find_any_idle_slot(sa); + if (offset >= 0) + { + disconnectDatabase(sa->slots[offset].connection); + sa->slots[offset].connection = NULL; + connect_slot(sa, offset, dbname); + sa->slots[offset].inUse = true; + return &sa->slots[offset]; } + + /* + * Fourth choice: block until one or more slots become available. If + * any slots hit a fatal error, we'll find out about that here and + * return NULL. + */ + if (!wait_on_slots(sa)) + return NULL; } +} + +/* + * ParallelSlotsSetup + * Prepare a set of parallel slots but do not connect to any database. + * + * This creates and initializes a set of slots, marking all parallel slots as + * free and ready to use. Establishing connections is delayed until requesting + * a free slot. The cparams, progname, echo, and initcmd are stored for later + * use and must remain valid for the lifetime of the returned array. + */ +ParallelSlotArray * +ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname, + bool echo, const char *initcmd) +{ + ParallelSlotArray *sa; - return slots; + Assert(numslots > 0); + Assert(cparams != NULL); + Assert(progname != NULL); + + sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) + + numslots * sizeof(ParallelSlot)); + + sa->numslots = numslots; + sa->cparams = cparams; + sa->progname = progname; + sa->echo = echo; + sa->initcmd = initcmd; + + return sa; +} + +/* + * ParallelSlotsAdoptConn + * Assign an open connection to the slots array for reuse. + * + * This turns over ownership of an open connection to a slots array. The + * caller should not further use or close the connection. All the connection's + * parameters (user, host, port, etc.) except possibly dbname should match + * those of the slots array's cparams, as given in ParallelSlotsSetup. If + * these parameters differ, subsequent behavior is undefined. + */ +void +ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn) +{ + int offset; + + offset = find_unconnected_slot(sa); + if (offset >= 0) + sa->slots[offset].connection = conn; + else + disconnectDatabase(conn); } /* @@ -292,13 +448,13 @@ ParallelSlotsSetup(const ConnParams *cparams, * terminate all connections. */ void -ParallelSlotsTerminate(ParallelSlot *slots, int numslots) +ParallelSlotsTerminate(ParallelSlotArray *sa) { int i; - for (i = 0; i < numslots; i++) + for (i = 0; i < sa->numslots; i++) { - PGconn *conn = slots[i].connection; + PGconn *conn = sa->slots[i].connection; if (conn == NULL) continue; @@ -314,13 +470,15 @@ ParallelSlotsTerminate(ParallelSlot *slots, int numslots) * error has been found on the way. */ bool -ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots) +ParallelSlotsWaitCompletion(ParallelSlotArray *sa) { int i; - for (i = 0; i < numslots; i++) + for (i = 0; i < sa->numslots; i++) { - if (!consumeQueryResult(slots + i)) + if (sa->slots[i].connection == NULL) + continue; + if (!consumeQueryResult(&sa->slots[i])) return false; } @@ -350,6 +508,9 @@ ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots) bool TableCommandResultHandler(PGresult *res, PGconn *conn, void *context) { + Assert(res != NULL); + Assert(conn != NULL); + /* * If it's an error, report it. Errors about a missing table are harmless * so we continue processing; but die for other errors. |