aboutsummaryrefslogtreecommitdiff
path: root/src/fe_utils/parallel_slot.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fe_utils/parallel_slot.c')
-rw-r--r--src/fe_utils/parallel_slot.c407
1 files changed, 284 insertions, 123 deletions
diff --git a/src/fe_utils/parallel_slot.c b/src/fe_utils/parallel_slot.c
index b625deb2545..69581157c29 100644
--- a/src/fe_utils/parallel_slot.c
+++ b/src/fe_utils/parallel_slot.c
@@ -25,25 +25,16 @@
#include "common/logging.h"
#include "fe_utils/cancel.h"
#include "fe_utils/parallel_slot.h"
+#include "fe_utils/query_utils.h"
#define ERRCODE_UNDEFINED_TABLE "42P01"
-static void init_slot(ParallelSlot *slot, PGconn *conn);
static int select_loop(int maxFd, fd_set *workerset);
static bool processQueryResult(ParallelSlot *slot, PGresult *result);
-static void
-init_slot(ParallelSlot *slot, PGconn *conn)
-{
- slot->connection = conn;
- /* Initially assume connection is idle */
- slot->isFree = true;
- ParallelSlotClearHandler(slot);
-}
-
/*
* Process (and delete) a query result. Returns true if there's no problem,
- * false otherwise. It's up to the handler to decide what cosntitutes a
+ * false otherwise. It's up to the handler to decide what constitutes a
* problem.
*/
static bool
@@ -137,151 +128,316 @@ select_loop(int maxFd, fd_set *workerset)
}
/*
- * ParallelSlotsGetIdle
- * Return a connection slot that is ready to execute a command.
- *
- * This returns the first slot we find that is marked isFree, if one is;
- * otherwise, we loop on select() until one socket becomes available. When
- * this happens, we read the whole set and mark as free all sockets that
- * become available. If an error occurs, NULL is returned.
+ * Return the offset of a suitable idle slot, or -1 if none are available. If
+ * the given dbname is not null, only idle slots connected to the given
+ * database are considered suitable, otherwise all idle connected slots are
+ * considered suitable.
*/
-ParallelSlot *
-ParallelSlotsGetIdle(ParallelSlot *slots, int numslots)
+static int
+find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
{
int i;
- int firstFree = -1;
- /*
- * Look for any connection currently free. If there is one, mark it as
- * taken and let the caller know the slot to use.
- */
- for (i = 0; i < numslots; i++)
+ for (i = 0; i < sa->numslots; i++)
{
- if (slots[i].isFree)
- {
- slots[i].isFree = false;
- return slots + i;
- }
+ /* Slots currently in use are never candidates. */
+ if (sa->slots[i].inUse)
+ continue;
+
+ /* A slot without a connection cannot match any database. */
+ if (sa->slots[i].connection == NULL)
+ continue;
+
+ /*
+ * With no dbname requested, any connected idle slot will do; otherwise
+ * the connection's database name must equal dbname exactly.
+ */
+ if (dbname == NULL ||
+ strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
+ return i;
+ }
+ /* No idle, connected slot satisfied the request. */
+ return -1;
+}
+
+/*
+ * Return the offset of the first idle slot without a database connection, or
+ * -1 if every idle slot is already connected (or no slot is idle).
+ */
+static int
+find_unconnected_slot(const ParallelSlotArray *sa)
+{
+	int			slotno;
+
+	for (slotno = 0; slotno < sa->numslots; slotno++)
+	{
+		const ParallelSlot *slot = &sa->slots[slotno];
+
+		/* Only a slot that is neither in use nor connected qualifies. */
+		if (!slot->inUse && slot->connection == NULL)
+			return slotno;
+	}
+
+	return -1;
+}
+
+/*
+ * Return the offset of the first idle slot, or -1 if all slots are busy.
+ */
+static int
+find_any_idle_slot(const ParallelSlotArray *sa)
+{
+	int			slotno = 0;
+
+	/* Walk the array in order, stopping at the first slot not in use. */
+	while (slotno < sa->numslots && sa->slots[slotno].inUse)
+		slotno++;
+
+	return (slotno < sa->numslots) ? slotno : -1;
+}
+
+/*
+ * Wait for any slot's connection to have query results, consume the results,
+ * and update the slot's status as appropriate. Returns true on success,
+ * false on cancellation, on error, or if no slots are connected.
+ */
+static bool
+wait_on_slots(ParallelSlotArray *sa)
+{
+ int i;
+ fd_set slotset;
+ int maxFd = 0;
+ PGconn *cancelconn = NULL;
+
+ /* We must reconstruct the fd_set for each call to select_loop */
+ FD_ZERO(&slotset);
+
+ for (i = 0; i < sa->numslots; i++)
+ {
+ int sock;
+
+ /* We shouldn't get here if we still have slots without connections */
+ Assert(sa->slots[i].connection != NULL);
+
+ sock = PQsocket(sa->slots[i].connection);
+
+ /*
+ * We don't really expect any connections to lose their sockets after
+ * startup, but just in case, cope by ignoring them.
+ */
+ if (sock < 0)
+ continue;
+
+ /* Keep track of the first valid connection we see. */
+ if (cancelconn == NULL)
+ cancelconn = sa->slots[i].connection;
+
+ FD_SET(sock, &slotset);
+ if (sock > maxFd)
+ maxFd = sock;
}
/*
- * No free slot found, so wait until one of the connections has finished
- * its task and return the available slot.
+ * If we get this far with no valid connections, processing cannot
+ * continue.
*/
- while (firstFree < 0)
+ if (cancelconn == NULL)
+ return false;
+
+ /*
+ * Register the first connection found to have a valid socket as the
+ * cancel target. Slot zero is not necessarily that connection: it may
+ * be one of those skipped above for having lost its socket.
+ */
+ SetCancelConn(cancelconn);
+ i = select_loop(maxFd, &slotset);
+ ResetCancelConn();
+
+ /* failure? */
+ if (i < 0)
+ return false;
+
+ for (i = 0; i < sa->numslots; i++)
{
- fd_set slotset;
- int maxFd = 0;
+ int sock;
- /* We must reconstruct the fd_set for each call to select_loop */
- FD_ZERO(&slotset);
+ sock = PQsocket(sa->slots[i].connection);
- for (i = 0; i < numslots; i++)
+ if (sock >= 0 && FD_ISSET(sock, &slotset))
{
- int sock = PQsocket(slots[i].connection);
-
- /*
- * We don't really expect any connections to lose their sockets
- * after startup, but just in case, cope by ignoring them.
- */
- if (sock < 0)
- continue;
-
- FD_SET(sock, &slotset);
- if (sock > maxFd)
- maxFd = sock;
+ /* select() says input is available, so consume it */
+ PQconsumeInput(sa->slots[i].connection);
}
- SetCancelConn(slots->connection);
- i = select_loop(maxFd, &slotset);
- ResetCancelConn();
-
- /* failure? */
- if (i < 0)
- return NULL;
-
- for (i = 0; i < numslots; i++)
+ /* Collect result(s) as long as any are available */
+ while (!PQisBusy(sa->slots[i].connection))
{
- int sock = PQsocket(slots[i].connection);
+ PGresult *result = PQgetResult(sa->slots[i].connection);
- if (sock >= 0 && FD_ISSET(sock, &slotset))
+ if (result != NULL)
{
- /* select() says input is available, so consume it */
- PQconsumeInput(slots[i].connection);
+ /* Handle and discard the command result */
+ if (!processQueryResult(&sa->slots[i], result))
+ return false;
}
-
- /* Collect result(s) as long as any are available */
- while (!PQisBusy(slots[i].connection))
+ else
{
- PGresult *result = PQgetResult(slots[i].connection);
-
- if (result != NULL)
- {
- /* Handle and discard the command result */
- if (!processQueryResult(slots + i, result))
- return NULL;
- }
- else
- {
- /* This connection has become idle */
- slots[i].isFree = true;
- ParallelSlotClearHandler(slots + i);
- if (firstFree < 0)
- firstFree = i;
- break;
- }
+ /* This connection has become idle */
+ sa->slots[i].inUse = false;
+ ParallelSlotClearHandler(&sa->slots[i]);
+ break;
}
}
}
+ return true;
+}
- slots[firstFree].isFree = false;
- return slots + firstFree;
+/*
+ * Open a new database connection using the stored connection parameters and
+ * optionally a given dbname if not null, execute the stored initial command if
+ * any, and associate the new connection with the given slot.
+ */
+static void
+connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
+{
+	ConnParams *cparams = sa->cparams;
+	const char *saved_dbname = cparams->override_dbname;
+	PGconn	   *conn;
+
+	/* Temporarily apply the requested dbname, then restore the old value. */
+	if (dbname != NULL)
+		cparams->override_dbname = dbname;
+	conn = connectDatabase(cparams, sa->progname, sa->echo, false, true);
+	cparams->override_dbname = saved_dbname;
+	sa->slots[slotno].connection = conn;
+
+	/* A socket at or beyond FD_SETSIZE cannot be stored in an fd_set. */
+	if (PQsocket(conn) >= FD_SETSIZE)
+	{
+		pg_log_fatal("too many jobs for this platform");
+		exit(1);
+	}
+
+	/* Run the stored setup command, if one was supplied. */
+	if (sa->initcmd != NULL)
+		executeCommand(conn, sa->initcmd, sa->echo);
}
/*
- * ParallelSlotsSetup
- * Prepare a set of parallel slots to use on a given database.
+ * ParallelSlotsGetIdle
+ * Return a connection slot that is ready to execute a command.
+ *
+ * The slot returned is chosen as follows:
+ *
+ * If any idle slot already has an open connection, and if either dbname is
+ * null or the existing connection is to the given database, that slot will be
+ * returned allowing the connection to be reused.
+ *
+ * Otherwise, if any idle slot is not yet connected to any database, the slot
+ * will be returned with its connection opened using the stored cparams and
+ * optionally the given dbname if not null.
+ *
+ * Otherwise, if any idle slot exists, an idle slot will be chosen and returned
+ * after having its connection disconnected and reconnected using the stored
+ * cparams and optionally the given dbname if not null.
*
- * This creates and initializes a set of connections to the database
- * using the information given by the caller, marking all parallel slots
- * as free and ready to use. "conn" is an initial connection set up
- * by the caller and is associated with the first slot in the parallel
- * set.
+ * Otherwise, if any slots have connections that are busy, we loop on select()
+ * until one socket becomes available. When this happens, we read the whole
+ * set and mark as free all sockets that become available. We then select a
+ * slot using the same rules as above.
+ *
+ * Otherwise, we cannot return a slot, which is an error, and NULL is returned.
+ *
+ * For any connection created, if the stored initcmd is not null, it will be
+ * executed as a command on the newly formed connection before the slot is
+ * returned.
+ *
+ * If an error occurs, NULL is returned.
*/
ParallelSlot *
-ParallelSlotsSetup(const ConnParams *cparams,
- const char *progname, bool echo,
- PGconn *conn, int numslots)
+ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
{
- ParallelSlot *slots;
- int i;
+ int offset;
- Assert(conn != NULL);
+ Assert(sa);
+ Assert(sa->numslots > 0);
- slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * numslots);
- init_slot(slots, conn);
- if (numslots > 1)
+ /*
+ * Each pass tries the three non-blocking choices in order; if none
+ * yields a slot we wait for busy slots and then try again.
+ */
+ while (1)
{
- for (i = 1; i < numslots; i++)
+ /* First choice: a slot already connected to the desired database. */
+ offset = find_matching_idle_slot(sa, dbname);
+ if (offset >= 0)
{
- conn = connectDatabase(cparams, progname, echo, false, true);
-
- /*
- * Fail and exit immediately if trying to use a socket in an
- * unsupported range. POSIX requires open(2) to use the lowest
- * unused file descriptor and the hint given relies on that.
- */
- if (PQsocket(conn) >= FD_SETSIZE)
- {
- pg_log_fatal("too many jobs for this platform -- try %d", i);
- exit(1);
- }
+ sa->slots[offset].inUse = true;
+ return &sa->slots[offset];
+ }
+
+ /* Second choice: a slot not connected to any database. */
+ offset = find_unconnected_slot(sa);
+ if (offset >= 0)
+ {
+ connect_slot(sa, offset, dbname);
+ sa->slots[offset].inUse = true;
+ return &sa->slots[offset];
+ }
- init_slot(slots + i, conn);
+ /* Third choice: a slot connected to the wrong database. */
+ offset = find_any_idle_slot(sa);
+ if (offset >= 0)
+ {
+ /* Drop the existing connection before opening one for dbname. */
+ disconnectDatabase(sa->slots[offset].connection);
+ sa->slots[offset].connection = NULL;
+ connect_slot(sa, offset, dbname);
+ sa->slots[offset].inUse = true;
+ return &sa->slots[offset];
}
+
+ /*
+ * Fourth choice: block until one or more slots become available. If
+ * any slots hit a fatal error, we'll find out about that here and
+ * return NULL.
+ */
+ if (!wait_on_slots(sa))
+ return NULL;
}
+}
+
+/*
+ * ParallelSlotsSetup
+ * Prepare a set of parallel slots but do not connect to any database.
+ *
+ * This creates and initializes a set of slots, marking all parallel slots as
+ * free and ready to use. Establishing connections is delayed until requesting
+ * a free slot. The cparams, progname, echo, and initcmd are stored for later
+ * use and must remain valid for the lifetime of the returned array.
+ */
+ParallelSlotArray *
+ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname,
+ bool echo, const char *initcmd)
+{
+ ParallelSlotArray *sa;
- return slots;
+ /* initcmd is allowed to be NULL; the other pointer arguments are not. */
+ Assert(numslots > 0);
+ Assert(cparams != NULL);
+ Assert(progname != NULL);
+
+ /*
+ * A single allocation holds the array header followed by numslots slots.
+ * NOTE(review): palloc0 presumably zero-fills the memory, so every slot
+ * starts with connection == NULL and inUse == false -- confirm.
+ */
+ sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) +
+ numslots * sizeof(ParallelSlot));
+
+ /* The pointers are stored as given, not copied. */
+ sa->numslots = numslots;
+ sa->cparams = cparams;
+ sa->progname = progname;
+ sa->echo = echo;
+ sa->initcmd = initcmd;
+
+ return sa;
+}
+
+/*
+ * ParallelSlotsAdoptConn
+ * Assign an open connection to the slots array for reuse.
+ *
+ * This turns over ownership of an open connection to a slots array. The
+ * caller should not further use or close the connection. All the connection's
+ * parameters (user, host, port, etc.) except possibly dbname should match
+ * those of the slots array's cparams, as given in ParallelSlotsSetup. If
+ * these parameters differ, subsequent behavior is undefined.
+ */
+void
+ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
+{
+	int			slotno = find_unconnected_slot(sa);
+
+	/* With no unconnected idle slot to hold it, the connection is closed. */
+	if (slotno < 0)
+		disconnectDatabase(conn);
+	else
+		sa->slots[slotno].connection = conn;
}
/*
@@ -292,13 +448,13 @@ ParallelSlotsSetup(const ConnParams *cparams,
* terminate all connections.
*/
void
-ParallelSlotsTerminate(ParallelSlot *slots, int numslots)
+ParallelSlotsTerminate(ParallelSlotArray *sa)
{
int i;
- for (i = 0; i < numslots; i++)
+ for (i = 0; i < sa->numslots; i++)
{
- PGconn *conn = slots[i].connection;
+ PGconn *conn = sa->slots[i].connection;
if (conn == NULL)
continue;
@@ -314,13 +470,15 @@ ParallelSlotsTerminate(ParallelSlot *slots, int numslots)
* error has been found on the way.
*/
bool
-ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots)
+ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
{
int i;
- for (i = 0; i < numslots; i++)
+ for (i = 0; i < sa->numslots; i++)
{
- if (!consumeQueryResult(slots + i))
+ if (sa->slots[i].connection == NULL)
+ continue;
+ if (!consumeQueryResult(&sa->slots[i]))
return false;
}
@@ -350,6 +508,9 @@ ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots)
bool
TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
{
+ Assert(res != NULL);
+ Assert(conn != NULL);
+
/*
* If it's an error, report it. Errors about a missing table are harmless
* so we continue processing; but die for other errors.