aboutsummaryrefslogtreecommitdiff
path: root/src/backend/storage/aio/aio_funcs.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/aio/aio_funcs.c')
-rw-r--r--src/backend/storage/aio/aio_funcs.c230
1 files changed, 230 insertions, 0 deletions
diff --git a/src/backend/storage/aio/aio_funcs.c b/src/backend/storage/aio/aio_funcs.c
new file mode 100644
index 00000000000..584e683371a
--- /dev/null
+++ b/src/backend/storage/aio/aio_funcs.c
@@ -0,0 +1,230 @@
+/*-------------------------------------------------------------------------
+ *
+ * aio_funcs.c
+ * AIO - SQL interface for AIO
+ *
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/storage/aio/aio_funcs.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "fmgr.h"
+#include "funcapi.h"
+#include "nodes/execnodes.h"
+#include "port/atomics.h"
+#include "storage/aio_internal.h"
+#include "storage/lock.h"
+#include "storage/proc.h"
+#include "storage/procnumber.h"
+#include "utils/builtins.h"
+#include "utils/fmgrprotos.h"
+#include "utils/tuplestore.h"
+
+
+/*
+ * Byte length of an iovec.
+ */
+static size_t
+iov_byte_length(const struct iovec *iov, int cnt)
+{
+ size_t len = 0;
+
+ for (int i = 0; i < cnt; i++)
+ {
+ len += iov[i].iov_len;
+ }
+
+ return len;
+}
+
+Datum
+pg_get_aios(PG_FUNCTION_ARGS)
+{
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+
+ InitMaterializedSRF(fcinfo, 0);
+
+#define PG_GET_AIOS_COLS 15
+
+ for (uint64 i = 0; i < pgaio_ctl->io_handle_count; i++)
+ {
+ PgAioHandle *live_ioh = &pgaio_ctl->io_handles[i];
+ uint32 ioh_id = pgaio_io_get_id(live_ioh);
+ Datum values[PG_GET_AIOS_COLS] = {0};
+ bool nulls[PG_GET_AIOS_COLS] = {0};
+ ProcNumber owner;
+ PGPROC *owner_proc;
+ int32 owner_pid;
+ PgAioHandleState start_state;
+ uint64 start_generation;
+ PgAioHandle ioh_copy;
+ struct iovec iov_copy[PG_IOV_MAX];
+
+
+ /*
+ * There is no lock that could prevent the state of the IO to advance
+ * concurrently - and we don't want to introduce one, as that would
+ * introduce atomics into a very common path. Instead we
+ *
+ * 1) Determine the state + generation of the IO.
+ *
+ * 2) Copy the IO to local memory.
+ *
+ * 3) Check if state or generation of the IO changed. If the state
+ * changed, retry, if the generation changed don't display the IO.
+ */
+
+ /* 1) from above */
+ start_generation = live_ioh->generation;
+
+ /*
+ * Retry at this point, so we can accept changing states, but not
+ * changing generations.
+ */
+retry:
+ pg_read_barrier();
+ start_state = live_ioh->state;
+
+ if (start_state == PGAIO_HS_IDLE)
+ continue;
+
+ /* 2) from above */
+ memcpy(&ioh_copy, live_ioh, sizeof(PgAioHandle));
+
+ /*
+ * Safe to copy even if no iovec is used - we always reserve the
+ * required space.
+ */
+ memcpy(&iov_copy, &pgaio_ctl->iovecs[ioh_copy.iovec_off],
+ PG_IOV_MAX * sizeof(struct iovec));
+
+ /*
+ * Copy information about owner before 3) below, if the process exited
+ * it'd have to wait for the IO to finish first, which we would detect
+ * in 3).
+ */
+ owner = ioh_copy.owner_procno;
+ owner_proc = GetPGProcByNumber(owner);
+ owner_pid = owner_proc->pid;
+
+ /* 3) from above */
+ pg_read_barrier();
+
+ /*
+ * The IO completed and a new one was started with the same ID. Don't
+ * display it - it really started after this function was called.
+ * There be a risk of a livelock if we just retried endlessly, if IOs
+ * complete very quickly.
+ */
+ if (live_ioh->generation != start_generation)
+ continue;
+
+ /*
+ * The IO's state changed while we were "rendering" it. Just start
+ * from scratch. There's no risk of a livelock here, as an IO has a
+ * limited sets of states it can be in, and state changes go only in a
+ * single direction.
+ */
+ if (live_ioh->state != start_state)
+ goto retry;
+
+ /*
+ * Now that we have copied the IO into local memory and checked that
+ * it's still in the same state, we are not allowed to access "live"
+ * memory anymore. To make it slightly easier to catch such cases, set
+ * the "live" pointers to NULL.
+ */
+ live_ioh = NULL;
+ owner_proc = NULL;
+
+
+ /* column: owning pid */
+ if (owner_pid != 0)
+ values[0] = Int32GetDatum(owner_pid);
+ else
+ nulls[0] = false;
+
+ /* column: IO's id */
+ values[1] = ioh_id;
+
+ /* column: IO's generation */
+ values[2] = Int64GetDatum(start_generation);
+
+ /* column: IO's state */
+ values[3] = CStringGetTextDatum(pgaio_io_get_state_name(&ioh_copy));
+
+ /*
+ * If the IO is in PGAIO_HS_HANDED_OUT state, none of the following
+ * fields are valid yet (or are in the process of being set).
+ * Therefore we don't want to display any other columns.
+ */
+ if (start_state == PGAIO_HS_HANDED_OUT)
+ {
+ memset(nulls + 4, 1, (lengthof(nulls) - 4) * sizeof(bool));
+ goto display;
+ }
+
+ /* column: IO's operation */
+ values[4] = CStringGetTextDatum(pgaio_io_get_op_name(&ioh_copy));
+
+ /* columns: details about the IO's operation (offset, length) */
+ switch (ioh_copy.op)
+ {
+ case PGAIO_OP_INVALID:
+ nulls[5] = true;
+ nulls[6] = true;
+ break;
+ case PGAIO_OP_READV:
+ values[5] = Int64GetDatum(ioh_copy.op_data.read.offset);
+ values[6] =
+ Int64GetDatum(iov_byte_length(iov_copy, ioh_copy.op_data.read.iov_length));
+ break;
+ case PGAIO_OP_WRITEV:
+ values[5] = Int64GetDatum(ioh_copy.op_data.write.offset);
+ values[6] =
+ Int64GetDatum(iov_byte_length(iov_copy, ioh_copy.op_data.write.iov_length));
+ break;
+ }
+
+ /* column: IO's target */
+ values[7] = CStringGetTextDatum(pgaio_io_get_target_name(&ioh_copy));
+
+ /* column: length of IO's data array */
+ values[8] = Int16GetDatum(ioh_copy.handle_data_len);
+
+ /* column: raw result (i.e. some form of syscall return value) */
+ if (start_state == PGAIO_HS_COMPLETED_IO
+ || start_state == PGAIO_HS_COMPLETED_SHARED
+ || start_state == PGAIO_HS_COMPLETED_LOCAL)
+ values[9] = Int32GetDatum(ioh_copy.result);
+ else
+ nulls[9] = true;
+
+ /*
+ * column: result in the higher level representation (unknown if not
+ * finished)
+ */
+ values[10] =
+ CStringGetTextDatum(pgaio_result_status_string(ioh_copy.distilled_result.status));
+
+ /* column: target description */
+ values[11] = CStringGetTextDatum(pgaio_io_get_target_description(&ioh_copy));
+
+ /* columns: one for each flag */
+ values[12] = BoolGetDatum(ioh_copy.flags & PGAIO_HF_SYNCHRONOUS);
+ values[13] = BoolGetDatum(ioh_copy.flags & PGAIO_HF_REFERENCES_LOCAL);
+ values[14] = BoolGetDatum(ioh_copy.flags & PGAIO_HF_BUFFERED);
+
+display:
+ tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+ }
+
+ return (Datum) 0;
+}