aboutsummaryrefslogtreecommitdiff
path: root/src/backend/storage/aio/method_worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/aio/method_worker.c')
-rw-r--r--src/backend/storage/aio/method_worker.c445
1 files changed, 440 insertions, 5 deletions
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index 0ef9ef93e2b..b6fbcc68bb1 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -3,6 +3,21 @@
* method_worker.c
* AIO - perform AIO using worker processes
*
+ * IO workers consume IOs from a shared memory submission queue, run
+ * traditional synchronous system calls, and perform the shared completion
+ * handling immediately. Client code submits most requests by pushing IOs
+ * into the submission queue, and waits (if necessary) using condition
+ * variables. Some IOs cannot be performed in another process due to lack of
+ * infrastructure for reopening the file, and must be processed synchronously by
+ * the client code when submitted.
+ *
+ * So that the submitter can make just one system call when submitting a batch
+ * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
+ * could be improved by using futexes instead of latches to wake N waiters.
+ *
+ * This method of AIO is available in all builds on all operating systems, and
+ * is the default.
+ *
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
@@ -16,25 +31,339 @@
#include "libpq/pqsignal.h"
#include "miscadmin.h"
+#include "port/pg_bitutils.h"
#include "postmaster/auxprocess.h"
#include "postmaster/interrupt.h"
+#include "storage/aio.h"
+#include "storage/aio_internal.h"
#include "storage/aio_subsys.h"
#include "storage/io_worker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
+#include "utils/ps_status.h"
#include "utils/wait_event.h"
+/*
+ * How many workers should each worker wake up if needed?  This is the upper
+ * bound on the number of idle peers a worker wakes after it has dequeued an
+ * IO while further IOs remain in the submission queue.
+ */
+#define IO_WORKER_WAKEUP_FANOUT 2
+
+
+typedef struct AioWorkerSubmissionQueue	/* ring of IO ids waiting for a worker */
+{
+ uint32 size;	/* number of slots in ios[]; rounded up to a power of two when sized */
+ uint32 mask;	/* NOTE(review): not set or read by the code shown here; wrapping uses size - 1 */
+ uint32 head;	/* slot the next insert writes; queue is full when head + 1 wraps onto tail */
+ uint32 tail;	/* slot the next consume reads; queue is empty when tail == head */
+ uint32 ios[FLEXIBLE_ARRAY_MEMBER];	/* IO ids as returned by pgaio_io_get_id() */
+} AioWorkerSubmissionQueue;
+
+typedef struct AioWorkerSlot	/* registration slot for one IO worker */
+{
+ Latch *latch;	/* latch of the worker registered in this slot, NULL when unregistered */
+ bool in_use;	/* true while a worker has claimed this slot */
+} AioWorkerSlot;
+
+typedef struct AioWorkerControl	/* shared registry of IO workers plus idle tracking */
+{
+ uint64 idle_worker_mask;	/* bit i set means the worker in slot i is marked idle */
+ AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];	/* space for MAX_IO_WORKERS slots is allocated */
+} AioWorkerControl;
+
+
+static size_t pgaio_worker_shmem_size(void);	/* forward declarations of the callbacks used in pgaio_worker_ops */
+static void pgaio_worker_shmem_init(bool first_time);
+
+static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
+static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
+
+
+const IoMethodOps pgaio_worker_ops = {	/* callback table for the worker-based IO method */
+ .shmem_size = pgaio_worker_shmem_size,	/* bytes needed for queue plus control struct */
+ .shmem_init = pgaio_worker_shmem_init,	/* attaches to / initializes those structs */
+
+ .needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,	/* IOs a worker cannot take over */
+ .submit = pgaio_worker_submit,	/* queues staged IOs for the workers */
+};
+
+
/* GUCs */
int io_workers = 3;
+static int io_worker_queue_size = 64;	/* requested queue slots; rounded up to a power of two when the queue is sized */
+static int MyIoWorkerId;	/* index of this process's slot in io_worker_control->workers, assigned in pgaio_worker_register() */
+static AioWorkerSubmissionQueue *io_worker_submission_queue;	/* shared memory pointer, set in pgaio_worker_shmem_init() */
+static AioWorkerControl *io_worker_control;	/* shared memory pointer, set in pgaio_worker_shmem_init() */
+
+
+/*
+ * Compute the amount of shared memory needed for the submission queue.
+ *
+ * The slot count that will actually be used -- io_worker_queue_size rounded
+ * up to the next power of two -- is passed back through *queue_size.
+ */
+static size_t
+pgaio_worker_queue_shmem_size(int *queue_size)
+{
+	int			nslots;
+
+	/* A power-of-two slot count lets ring indexes wrap with a bit mask. */
+	nslots = pg_nextpower2_32(io_worker_queue_size);
+	*queue_size = nslots;
+
+	return offsetof(AioWorkerSubmissionQueue, ios) + sizeof(uint32) * nslots;
+}
+
+/*
+ * Compute the amount of shared memory needed for the worker control struct,
+ * including room for MAX_IO_WORKERS worker slots.
+ */
+static size_t
+pgaio_worker_control_shmem_size(void)
+{
+	size_t		header_sz = offsetof(AioWorkerControl, workers);
+	size_t		slots_sz = sizeof(AioWorkerSlot) * MAX_IO_WORKERS;
+
+	return header_sz + slots_sz;
+}
+
+/*
+ * Total shared memory needed by the worker IO method: the submission queue
+ * followed by the worker control struct.
+ */
+static size_t
+pgaio_worker_shmem_size(void)
+{
+	int			ignored_queue_size;
+	size_t		queue_sz;
+	size_t		control_sz;
+
+	/* The computed slot count is not needed here, only the byte size. */
+	queue_sz = pgaio_worker_queue_shmem_size(&ignored_queue_size);
+	control_sz = pgaio_worker_control_shmem_size();
+
+	return add_size(queue_sz, control_sz);
+}
+
+/*
+ * Attach to the submission queue and worker control structs in shared memory,
+ * initializing each one if ShmemInitStruct() reports it did not exist yet.
+ *
+ * first_time is not consulted; the "found" result of ShmemInitStruct()
+ * decides whether initialization is needed.
+ */
+static void
+pgaio_worker_shmem_init(bool first_time)
+{
+	int			nslots;
+	size_t		queue_sz;
+	bool		queue_found;
+	bool		control_found;
+
+	queue_sz = pgaio_worker_queue_shmem_size(&nslots);
+	io_worker_submission_queue =
+		ShmemInitStruct("AioWorkerSubmissionQueue", queue_sz, &queue_found);
+	if (!queue_found)
+	{
+		AioWorkerSubmissionQueue *queue = io_worker_submission_queue;
+
+		/* Start with an empty ring. */
+		queue->size = nslots;
+		queue->head = 0;
+		queue->tail = 0;
+	}
+
+	io_worker_control =
+		ShmemInitStruct("AioWorkerControl",
+						pgaio_worker_control_shmem_size(),
+						&control_found);
+	if (!control_found)
+	{
+		/* No worker is registered, and hence none is idle, yet. */
+		io_worker_control->idle_worker_mask = 0;
+		for (int slot = 0; slot < MAX_IO_WORKERS; slot++)
+		{
+			AioWorkerSlot *w = &io_worker_control->workers[slot];
+
+			w->latch = NULL;
+			w->in_use = false;
+		}
+	}
+}
+
+/*
+ * Pick the idle worker with the lowest slot number and mark it as no longer
+ * idle.  Returns the worker's slot index, or -1 if no worker is marked idle.
+ *
+ * Both callers in this file invoke this with AioWorkerSubmissionQueueLock
+ * held.
+ */
+static int
+pgaio_choose_idle_worker(void)
+{
+	uint64	   *idle_mask = &io_worker_control->idle_worker_mask;
+	int			chosen;
+
+	if (*idle_mask == 0)
+		return -1;
+
+	/* Lowest set bit identifies the worker; clear it to claim the worker. */
+	chosen = pg_rightmost_one_pos64(*idle_mask);
+	*idle_mask &= ~(UINT64_C(1) << chosen);
+
+	return chosen;
+}
+
+/*
+ * Append the id of the given IO to the submission queue.
+ *
+ * Returns true on success, false if the queue is full.  One slot is always
+ * left unused so that a full queue can be told apart from an empty one.
+ */
+static bool
+pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
+{
+	AioWorkerSubmissionQueue *sq = io_worker_submission_queue;
+	uint32		slot = sq->head;
+	uint32		next = (slot + 1) & (sq->size - 1);
+
+	if (next == sq->tail)
+	{
+		pgaio_debug(DEBUG3, "io queue is full, at %u elements",
+					io_worker_submission_queue->size);
+		return false;			/* full */
+	}
+
+	sq->ios[slot] = pgaio_io_get_id(ioh);
+	sq->head = next;
+
+	return true;
+}
+
+/*
+ * Remove and return the oldest IO id in the submission queue, or UINT32_MAX
+ * if the queue is empty.
+ */
+static uint32
+pgaio_worker_submission_queue_consume(void)
+{
+	AioWorkerSubmissionQueue *sq = io_worker_submission_queue;
+	uint32		slot = sq->tail;
+	uint32		io_id;
+
+	if (slot == sq->head)
+		return UINT32_MAX;		/* empty */
+
+	io_id = sq->ios[slot];
+	sq->tail = (slot + 1) & (sq->size - 1);
+
+	return io_id;
+}
+
+/*
+ * Return the number of IO ids currently waiting in the submission queue.
+ */
+static uint32
+pgaio_worker_submission_queue_depth(void)
+{
+	AioWorkerSubmissionQueue *sq = io_worker_submission_queue;
+	uint32		tail = sq->tail;
+	uint32		unwrapped_head = sq->head;
+
+	/* If head has wrapped around behind tail, undo the wrap. */
+	if (tail > unwrapped_head)
+		unwrapped_head += sq->size;
+
+	Assert(unwrapped_head >= tail);
+
+	return unwrapped_head - tail;
+}
+
+/*
+ * Decide whether the IO has to be executed by the submitting process itself
+ * rather than being handed to an IO worker.
+ */
+static bool
+pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
+{
+	/* Not running under a postmaster. */
+	if (!IsUnderPostmaster)
+		return true;
+
+	/* The IO is flagged PGAIO_HF_REFERENCES_LOCAL. */
+	if (ioh->flags & PGAIO_HF_REFERENCES_LOCAL)
+		return true;
+
+	/* A worker can only take over IOs whose file it is able to reopen. */
+	return !pgaio_io_can_reopen(ioh);
+}
+
+/*
+ * Hand a batch of IOs to the IO workers.
+ *
+ * Each IO is pushed onto the shared submission queue while holding
+ * AioWorkerSubmissionQueueLock.  At most one idle worker is chosen and woken
+ * after the lock has been released.  IOs that did not fit into the queue are
+ * executed synchronously by the caller afterwards.
+ */
+static void
+pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])
+{
+	PgAioHandle *overflow[PGAIO_SUBMIT_BATCH_SIZE];
+	int			noverflow = 0;
+	Latch	   *to_wake = NULL;
+	int			worker;
+
+	Assert(nios <= PGAIO_SUBMIT_BATCH_SIZE);
+
+	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+	for (int i = 0; i < nios; ++i)
+	{
+		PgAioHandle *ioh = ios[i];
+
+		Assert(!pgaio_worker_needs_synchronous_execution(ioh));
+
+		if (pgaio_worker_submission_queue_insert(ioh))
+		{
+			/* Nothing more to do once a worker to wake has been found. */
+			if (to_wake != NULL)
+				continue;
+
+			/* Choose an idle worker to wake up if we haven't already. */
+			worker = pgaio_choose_idle_worker();
+			if (worker >= 0)
+				to_wake = io_worker_control->workers[worker].latch;
+
+			pgaio_debug_io(DEBUG4, ioh,
+						   "choosing worker %d",
+						   worker);
+		}
+		else
+		{
+			/*
+			 * Queue is full.  Remember the IO and run it ourselves, but only
+			 * after as many IOs as possible have been sent to workers, to
+			 * maximize concurrency.
+			 */
+			overflow[noverflow++] = ioh;
+		}
+	}
+	LWLockRelease(AioWorkerSubmissionQueueLock);
+
+	if (to_wake != NULL)
+		SetLatch(to_wake);
+
+	/* Run whatever is left synchronously. */
+	for (int i = 0; i < noverflow; ++i)
+		pgaio_io_perform_synchronously(overflow[i]);
+}
+
+/*
+ * IoMethodOps.submit callback: prepare every staged IO for submission, then
+ * pass the whole batch on to the workers.  Returns the number of IOs
+ * submitted, which is always num_staged_ios.
+ */
+static int
+pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
+{
+	int			i = 0;
+
+	while (i < num_staged_ios)
+	{
+		pgaio_io_prepare_submit(staged_ios[i]);
+		i++;
+	}
+
+	pgaio_worker_submit_internal(num_staged_ios, staged_ios);
+
+	return num_staged_ios;
+}
+
+/*
+ * on_shmem_exit() callback that releases the worker's slot in
+ * io_worker_control.
+ *
+ * The worker's bit in idle_worker_mask is cleared together with the slot.
+ * Otherwise a worker that exits while marked idle would stay selectable by
+ * pgaio_choose_idle_worker(), and whoever picked it would end up with the
+ * slot's now-NULL latch: passing that to SetLatch() is invalid, and skipping
+ * it loses a wakeup.
+ *
+ * code and arg are the standard on_shmem_exit() callback arguments and are
+ * not used.
+ */
+static void
+pgaio_worker_die(int code, Datum arg)
+{
+	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+	Assert(io_worker_control->workers[MyIoWorkerId].in_use);
+	Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
+
+	/* Must happen under the same lock that protects latch and in_use. */
+	io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
+	io_worker_control->workers[MyIoWorkerId].in_use = false;
+	io_worker_control->workers[MyIoWorkerId].latch = NULL;
+	LWLockRelease(AioWorkerSubmissionQueueLock);
+}
+
+/*
+ * Register this IO worker in shared memory: claim the first unused slot in
+ * io_worker_control, store its index in MyIoWorkerId, mark the worker idle
+ * and publish its latch.  Also registers pgaio_worker_die() as shutdown
+ * callback to release the registration.
+ *
+ * Raises an ERROR if every slot is already in use.
+ */
+static void
+pgaio_worker_register(void)
+{
+	int			slot;
+
+	MyIoWorkerId = -1;
+
+	/*
+	 * XXX: This could do with more fine-grained locking. But it's also not
+	 * very common for the number of workers to change at the moment...
+	 */
+	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+
+	for (slot = 0; slot < MAX_IO_WORKERS; slot++)
+	{
+		AioWorkerSlot *candidate = &io_worker_control->workers[slot];
+
+		if (candidate->in_use)
+		{
+			/* Occupied slots always carry their owner's latch. */
+			Assert(candidate->latch != NULL);
+			continue;
+		}
+
+		Assert(candidate->latch == NULL);
+		candidate->in_use = true;
+		MyIoWorkerId = slot;
+		break;
+	}
+
+	if (MyIoWorkerId == -1)
+		elog(ERROR, "couldn't find a free worker slot");
+
+	io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
+	io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
+	LWLockRelease(AioWorkerSubmissionQueueLock);
+
+	on_shmem_exit(pgaio_worker_die, 0);
+}
+
void
IoWorkerMain(const void *startup_data, size_t startup_data_len)
{
sigjmp_buf local_sigjmp_buf;
+ PgAioHandle *volatile error_ioh = NULL;
+ volatile int error_errno = 0;
+ char cmd[128];
MyBackendType = B_IO_WORKER;
AuxiliaryProcessMainCommon();
@@ -53,6 +382,12 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
+ /* also registers a shutdown callback to unregister */
+ pgaio_worker_register();
+
+ sprintf(cmd, "io worker: %d", MyIoWorkerId);
+ set_ps_display(cmd);
+
/* see PostgresMain() */
if (sigsetjmp(local_sigjmp_buf, 1) != 0)
{
@@ -61,6 +396,27 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
EmitErrorReport();
+ /*
+ * In the - very unlikely - case that the IO failed in a way that
+ * raises an error we need to mark the IO as failed.
+ *
+ * Need to do just enough error recovery so that we can mark the IO as
+ * failed and then exit (postmaster will start a new worker).
+ */
+ LWLockReleaseAll();
+
+ if (error_ioh != NULL)
+ {
+ /* should never fail without setting error_errno */
+ Assert(error_errno != 0);
+
+ errno = error_errno;
+
+ START_CRIT_SECTION();
+ pgaio_io_process_completion(error_ioh, -error_errno);
+ END_CRIT_SECTION();
+ }
+
proc_exit(1);
}
@@ -71,9 +427,89 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
while (!ShutdownRequestPending)
{
- WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
- WAIT_EVENT_IO_WORKER_MAIN);
- ResetLatch(MyLatch);
+ uint32 io_index;
+ Latch *latches[IO_WORKER_WAKEUP_FANOUT];
+ int nlatches = 0;
+ int nwakeups = 0;
+ int worker;
+
+ /* Try to get a job to do. */
+ LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+ if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
+ {
+ /*
+ * Nothing to do. Mark self idle.
+ *
+ * XXX: Invent some kind of back pressure to reduce useless
+ * wakeups?
+ */
+ io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
+ }
+ else
+ {
+ /* Got one. Clear idle flag. */
+ io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
+
+ /* See if we can wake up some peers. */
+ nwakeups = Min(pgaio_worker_submission_queue_depth(),
+ IO_WORKER_WAKEUP_FANOUT);
+ for (int i = 0; i < nwakeups; ++i)
+ {
+ if ((worker = pgaio_choose_idle_worker()) < 0)
+ break;
+ latches[nlatches++] = io_worker_control->workers[worker].latch;
+ }
+ }
+ LWLockRelease(AioWorkerSubmissionQueueLock);
+
+ for (int i = 0; i < nlatches; ++i)
+ SetLatch(latches[i]);
+
+ if (io_index != UINT32_MAX)
+ {
+ PgAioHandle *ioh = NULL;
+
+ ioh = &pgaio_ctl->io_handles[io_index];
+ error_ioh = ioh;
+
+ pgaio_debug_io(DEBUG4, ioh,
+ "worker %d processing IO",
+ MyIoWorkerId);
+
+ /*
+ * It's very unlikely, but possible, that reopen fails. E.g. due
+ * to memory allocations failing or file permissions changing or
+ * such. In that case we need to fail the IO.
+ *
+ * There's not really a good errno we can report here.
+ */
+ error_errno = ENOENT;
+ pgaio_io_reopen(ioh);
+
+ /*
+ * To be able to exercise the reopen-fails path, allow injection
+ * points to trigger a failure at this point.
+ */
+ pgaio_io_call_inj(ioh, "AIO_WORKER_AFTER_REOPEN");
+
+ error_errno = 0;
+ error_ioh = NULL;
+
+ /*
+ * We don't expect this to ever fail with ERROR or FATAL, no need
+ * to keep error_ioh set to the IO.
+ * pgaio_io_perform_synchronously() contains a critical section to
+ * ensure we don't accidentally fail.
+ */
+ pgaio_io_perform_synchronously(ioh);
+ }
+ else
+ {
+ WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
+ WAIT_EVENT_IO_WORKER_MAIN);
+ ResetLatch(MyLatch);
+ }
+
CHECK_FOR_INTERRUPTS();
}
@@ -83,6 +519,5 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
bool
pgaio_workers_enabled(void)
{
- /* placeholder for future commit */
- return false;
+ return io_method == IOMETHOD_WORKER;	/* true exactly when io_method selects the worker method */
}