diff options
Diffstat (limited to 'src/backend/storage/aio/method_worker.c')
-rw-r--r-- | src/backend/storage/aio/method_worker.c | 445 |
1 files changed, 440 insertions, 5 deletions
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c index 0ef9ef93e2b..b6fbcc68bb1 100644 --- a/src/backend/storage/aio/method_worker.c +++ b/src/backend/storage/aio/method_worker.c @@ -3,6 +3,21 @@ * method_worker.c * AIO - perform AIO using worker processes * + * IO workers consume IOs from a shared memory submission queue, run + * traditional synchronous system calls, and perform the shared completion + * handling immediately. Client code submits most requests by pushing IOs + * into the submission queue, and waits (if necessary) using condition + * variables. Some IOs cannot be performed in another process due to lack of + * infrastructure for reopening the file, and must be processed synchronously by + * the client code when submitted. + * + * So that the submitter can make just one system call when submitting a batch + * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This + * could be improved by using futexes instead of latches to wake N waiters. + * + * This method of AIO is available in all builds on all operating systems, and + * is the default. + * * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * @@ -16,25 +31,339 @@ #include "libpq/pqsignal.h" #include "miscadmin.h" +#include "port/pg_bitutils.h" #include "postmaster/auxprocess.h" #include "postmaster/interrupt.h" +#include "storage/aio.h" +#include "storage/aio_internal.h" #include "storage/aio_subsys.h" #include "storage/io_worker.h" #include "storage/ipc.h" #include "storage/latch.h" #include "storage/proc.h" #include "tcop/tcopprot.h" +#include "utils/ps_status.h" #include "utils/wait_event.h" +/* How many workers should each worker wake up if needed? 
*/ +#define IO_WORKER_WAKEUP_FANOUT 2 + + +typedef struct AioWorkerSubmissionQueue +{ + uint32 size; + uint32 mask; + uint32 head; + uint32 tail; + uint32 ios[FLEXIBLE_ARRAY_MEMBER]; +} AioWorkerSubmissionQueue; + +typedef struct AioWorkerSlot +{ + Latch *latch; + bool in_use; +} AioWorkerSlot; + +typedef struct AioWorkerControl +{ + uint64 idle_worker_mask; + AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]; +} AioWorkerControl; + + +static size_t pgaio_worker_shmem_size(void); +static void pgaio_worker_shmem_init(bool first_time); + +static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh); +static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios); + + +const IoMethodOps pgaio_worker_ops = { + .shmem_size = pgaio_worker_shmem_size, + .shmem_init = pgaio_worker_shmem_init, + + .needs_synchronous_execution = pgaio_worker_needs_synchronous_execution, + .submit = pgaio_worker_submit, +}; + + /* GUCs */ int io_workers = 3; +static int io_worker_queue_size = 64; +static int MyIoWorkerId; +static AioWorkerSubmissionQueue *io_worker_submission_queue; +static AioWorkerControl *io_worker_control; + + +static size_t +pgaio_worker_queue_shmem_size(int *queue_size) +{ + /* Round size up to next power of two so we can make a mask. 
*/ + *queue_size = pg_nextpower2_32(io_worker_queue_size); + + return offsetof(AioWorkerSubmissionQueue, ios) + + sizeof(uint32) * *queue_size; +} + +static size_t +pgaio_worker_control_shmem_size(void) +{ + return offsetof(AioWorkerControl, workers) + + sizeof(AioWorkerSlot) * MAX_IO_WORKERS; +} + +static size_t +pgaio_worker_shmem_size(void) +{ + size_t sz; + int queue_size; + + sz = pgaio_worker_queue_shmem_size(&queue_size); + sz = add_size(sz, pgaio_worker_control_shmem_size()); + + return sz; +} + +static void +pgaio_worker_shmem_init(bool first_time) +{ + bool found; + int queue_size; + + io_worker_submission_queue = + ShmemInitStruct("AioWorkerSubmissionQueue", + pgaio_worker_queue_shmem_size(&queue_size), + &found); + if (!found) + { + io_worker_submission_queue->size = queue_size; + io_worker_submission_queue->head = 0; + io_worker_submission_queue->tail = 0; + } + + io_worker_control = + ShmemInitStruct("AioWorkerControl", + pgaio_worker_control_shmem_size(), + &found); + if (!found) + { + io_worker_control->idle_worker_mask = 0; + for (int i = 0; i < MAX_IO_WORKERS; ++i) + { + io_worker_control->workers[i].latch = NULL; + io_worker_control->workers[i].in_use = false; + } + } +} + +static int +pgaio_choose_idle_worker(void) +{ + int worker; + + if (io_worker_control->idle_worker_mask == 0) + return -1; + + /* Find the lowest bit position, and clear it. 
*/ + worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask); + io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker); + + return worker; +} + +static bool +pgaio_worker_submission_queue_insert(PgAioHandle *ioh) +{ + AioWorkerSubmissionQueue *queue; + uint32 new_head; + + queue = io_worker_submission_queue; + new_head = (queue->head + 1) & (queue->size - 1); + if (new_head == queue->tail) + { + pgaio_debug(DEBUG3, "io queue is full, at %u elements", + io_worker_submission_queue->size); + return false; /* full */ + } + + queue->ios[queue->head] = pgaio_io_get_id(ioh); + queue->head = new_head; + + return true; +} + +static uint32 +pgaio_worker_submission_queue_consume(void) +{ + AioWorkerSubmissionQueue *queue; + uint32 result; + + queue = io_worker_submission_queue; + if (queue->tail == queue->head) + return UINT32_MAX; /* empty */ + + result = queue->ios[queue->tail]; + queue->tail = (queue->tail + 1) & (queue->size - 1); + + return result; +} + +static uint32 +pgaio_worker_submission_queue_depth(void) +{ + uint32 head; + uint32 tail; + + head = io_worker_submission_queue->head; + tail = io_worker_submission_queue->tail; + + if (tail > head) + head += io_worker_submission_queue->size; + + Assert(head >= tail); + + return head - tail; +} + +static bool +pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh) +{ + return + !IsUnderPostmaster + || ioh->flags & PGAIO_HF_REFERENCES_LOCAL + || !pgaio_io_can_reopen(ioh); +} + +static void +pgaio_worker_submit_internal(int nios, PgAioHandle *ios[]) +{ + PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE]; + int nsync = 0; + Latch *wakeup = NULL; + int worker; + + Assert(nios <= PGAIO_SUBMIT_BATCH_SIZE); + + LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE); + for (int i = 0; i < nios; ++i) + { + Assert(!pgaio_worker_needs_synchronous_execution(ios[i])); + if (!pgaio_worker_submission_queue_insert(ios[i])) + { + /* + * We'll do it synchronously, but only after we've sent as many as + * 
 we can to workers, to maximize concurrency. + */ + synchronous_ios[nsync++] = ios[i]; + continue; + } + + if (wakeup == NULL) + { + /* Choose an idle worker to wake up if we haven't already. */ + worker = pgaio_choose_idle_worker(); + if (worker >= 0) + wakeup = io_worker_control->workers[worker].latch; + + pgaio_debug_io(DEBUG4, ios[i], + "choosing worker %d", + worker); + } + } + LWLockRelease(AioWorkerSubmissionQueueLock); + + if (wakeup) + SetLatch(wakeup); + + /* Run whatever is left synchronously. */ + if (nsync > 0) + { + for (int i = 0; i < nsync; ++i) + { + pgaio_io_perform_synchronously(synchronous_ios[i]); + } + } +} + +static int +pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios) +{ + for (int i = 0; i < num_staged_ios; i++) + { + PgAioHandle *ioh = staged_ios[i]; + + pgaio_io_prepare_submit(ioh); + } + + pgaio_worker_submit_internal(num_staged_ios, staged_ios); + + return num_staged_ios; +} + +/* + * on_shmem_exit() callback that releases the worker's slot in + * io_worker_control. + */ +static void +pgaio_worker_die(int code, Datum arg) +{ + LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE); + Assert(io_worker_control->workers[MyIoWorkerId].in_use); + Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch); + + io_worker_control->workers[MyIoWorkerId].in_use = false; + io_worker_control->workers[MyIoWorkerId].latch = NULL; + LWLockRelease(AioWorkerSubmissionQueueLock); +} + +/* + * Register the worker in shared memory, assign MyIoWorkerId and register a + * shutdown callback to release registration. + */ +static void +pgaio_worker_register(void) +{ + MyIoWorkerId = -1; + + /* + * XXX: This could do with more fine-grained locking. But it's also not + * very common for the number of workers to change at the moment... 
+ */ + LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE); + + for (int i = 0; i < MAX_IO_WORKERS; ++i) + { + if (!io_worker_control->workers[i].in_use) + { + Assert(io_worker_control->workers[i].latch == NULL); + io_worker_control->workers[i].in_use = true; + MyIoWorkerId = i; + break; + } + else + Assert(io_worker_control->workers[i].latch != NULL); + } + + if (MyIoWorkerId == -1) + elog(ERROR, "couldn't find a free worker slot"); + + io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId); + io_worker_control->workers[MyIoWorkerId].latch = MyLatch; + LWLockRelease(AioWorkerSubmissionQueueLock); + + on_shmem_exit(pgaio_worker_die, 0); +} + void IoWorkerMain(const void *startup_data, size_t startup_data_len) { sigjmp_buf local_sigjmp_buf; + PgAioHandle *volatile error_ioh = NULL; + volatile int error_errno = 0; + char cmd[128]; MyBackendType = B_IO_WORKER; AuxiliaryProcessMainCommon(); @@ -53,6 +382,12 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len) pqsignal(SIGUSR1, procsignal_sigusr1_handler); pqsignal(SIGUSR2, SignalHandlerForShutdownRequest); + /* also registers a shutdown callback to unregister */ + pgaio_worker_register(); + + sprintf(cmd, "io worker: %d", MyIoWorkerId); + set_ps_display(cmd); + /* see PostgresMain() */ if (sigsetjmp(local_sigjmp_buf, 1) != 0) { @@ -61,6 +396,27 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len) EmitErrorReport(); + /* + * In the - very unlikely - case that the IO failed in a way that + * raises an error we need to mark the IO as failed. + * + * Need to do just enough error recovery so that we can mark the IO as + * failed and then exit (postmaster will start a new worker). 
+ */ + LWLockReleaseAll(); + + if (error_ioh != NULL) + { + /* should never fail without setting error_errno */ + Assert(error_errno != 0); + + errno = error_errno; + + START_CRIT_SECTION(); + pgaio_io_process_completion(error_ioh, -error_errno); + END_CRIT_SECTION(); + } + proc_exit(1); } @@ -71,9 +427,89 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len) while (!ShutdownRequestPending) { - WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1, - WAIT_EVENT_IO_WORKER_MAIN); - ResetLatch(MyLatch); + uint32 io_index; + Latch *latches[IO_WORKER_WAKEUP_FANOUT]; + int nlatches = 0; + int nwakeups = 0; + int worker; + + /* Try to get a job to do. */ + LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE); + if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX) + { + /* + * Nothing to do. Mark self idle. + * + * XXX: Invent some kind of back pressure to reduce useless + * wakeups? + */ + io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId); + } + else + { + /* Got one. Clear idle flag. */ + io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId); + + /* See if we can wake up some peers. */ + nwakeups = Min(pgaio_worker_submission_queue_depth(), + IO_WORKER_WAKEUP_FANOUT); + for (int i = 0; i < nwakeups; ++i) + { + if ((worker = pgaio_choose_idle_worker()) < 0) + break; + latches[nlatches++] = io_worker_control->workers[worker].latch; + } + } + LWLockRelease(AioWorkerSubmissionQueueLock); + + for (int i = 0; i < nlatches; ++i) + SetLatch(latches[i]); + + if (io_index != UINT32_MAX) + { + PgAioHandle *ioh = NULL; + + ioh = &pgaio_ctl->io_handles[io_index]; + error_ioh = ioh; + + pgaio_debug_io(DEBUG4, ioh, + "worker %d processing IO", + MyIoWorkerId); + + /* + * It's very unlikely, but possible, that reopen fails. E.g. due + * to memory allocations failing or file permissions changing or + * such. In that case we need to fail the IO. + * + * There's not really a good errno we can report here. 
+ */ + error_errno = ENOENT; + pgaio_io_reopen(ioh); + + /* + * To be able to exercise the reopen-fails path, allow injection + * points to trigger a failure at this point. + */ + pgaio_io_call_inj(ioh, "AIO_WORKER_AFTER_REOPEN"); + + error_errno = 0; + error_ioh = NULL; + + /* + * We don't expect this to ever fail with ERROR or FATAL, no need + * to keep error_ioh set to the IO. + * pgaio_io_perform_synchronously() contains a critical section to + * ensure we don't accidentally fail. + */ + pgaio_io_perform_synchronously(ioh); + } + else + { + WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1, + WAIT_EVENT_IO_WORKER_MAIN); + ResetLatch(MyLatch); + } + CHECK_FOR_INTERRUPTS(); } @@ -83,6 +519,5 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len) bool pgaio_workers_enabled(void) { - /* placeholder for future commit */ - return false; + return io_method == IOMETHOD_WORKER; } |