aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/wait.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/wait.c')
-rw-r--r--src/backend/commands/wait.c295
1 files changed, 295 insertions, 0 deletions
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 00000000000..b7aee5b794b
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,295 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ * Implements WAIT FOR clause for BEGIN and START TRANSACTION commands.
+ * This clause allows waiting for given LSN to be replayed on standby.
+ *
+ * Copyright (c) 2020, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <math.h>
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/backendid.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/sinvaladt.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+
+/*
+ * Shared memory structure representing information about LSNs, which backends
+ * are waiting for replay.
+ */
+typedef struct
+{
+ slock_t mutex; /* mutex protecting the fields below */
+ int max_backend_id; /* max backend_id present in lsns[] */
+ pg_atomic_uint64 min_lsn; /* minimal waited LSN */
+ /* per-backend array of waited LSNs */
+ XLogRecPtr lsns[FLEXIBLE_ARRAY_MEMBER];
+} WaitLSNState;
+
+static WaitLSNState * state;
+
+/*
+ * Add the wait event of the current backend to shared memory array
+ */
+static void
+WaitLSNAdd(XLogRecPtr lsn_to_wait)
+{
+ SpinLockAcquire(&state->mutex);
+ if (state->max_backend_id < MyBackendId)
+ state->max_backend_id = MyBackendId;
+
+ state->lsns[MyBackendId] = lsn_to_wait;
+
+ if (lsn_to_wait < state->min_lsn.value)
+ state->min_lsn.value = lsn_to_wait;
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete wait event of the current backend from the shared memory array.
+ */
+void
+WaitLSNDelete(void)
+{
+ int i;
+ XLogRecPtr deleted_lsn;
+
+ SpinLockAcquire(&state->mutex);
+
+ deleted_lsn = state->lsns[MyBackendId];
+ state->lsns[MyBackendId] = InvalidXLogRecPtr;
+
+ /* If we are deleting the minimal LSN, then choose the next min_lsn */
+ if (!XLogRecPtrIsInvalid(deleted_lsn) &&
+ deleted_lsn == state->min_lsn.value)
+ {
+ state->min_lsn.value = InvalidXLogRecPtr;
+ for (i = 2; i <= state->max_backend_id; i++)
+ {
+ if (!XLogRecPtrIsInvalid(state->lsns[i]) &&
+ (state->lsns[i] < state->min_lsn.value ||
+ XLogRecPtrIsInvalid(state->min_lsn.value)))
+ {
+ state->min_lsn.value = state->lsns[i];
+ }
+ }
+ }
+
+ /* If deleting from the end of the array, shorten the array's used part */
+ if (state->max_backend_id == MyBackendId)
+ {
+ for (i = (MyBackendId); i >= 2; i--)
+ if (!XLogRecPtrIsInvalid(state->lsns[i]))
+ {
+ state->max_backend_id = i;
+ break;
+ }
+ }
+
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Report amount of shared memory space needed for WaitLSNState
+ */
+Size
+WaitLSNShmemSize(void)
+{
+ Size size;
+
+ size = offsetof(WaitLSNState, lsns);
+ size = add_size(size, mul_size(MaxBackends + 1, sizeof(XLogRecPtr)));
+ return size;
+}
+
+/*
+ * Initialize an shared memory structure for waiting for LSN
+ */
+void
+WaitLSNShmemInit(void)
+{
+ bool found;
+ uint32 i;
+
+ state = (WaitLSNState *) ShmemInitStruct("pg_wait_lsn",
+ WaitLSNShmemSize(),
+ &found);
+ if (!found)
+ {
+ SpinLockInit(&state->mutex);
+
+ for (i = 0; i < (MaxBackends + 1); i++)
+ state->lsns[i] = InvalidXLogRecPtr;
+
+ state->max_backend_id = 0;
+ pg_atomic_init_u64(&state->min_lsn, InvalidXLogRecPtr);
+ }
+}
+
+/*
+ * Set latches in shared memory to signal that new LSN has been replayed
+ */
+void
+WaitLSNSetLatch(XLogRecPtr cur_lsn)
+{
+ uint32 i;
+ int max_backend_id;
+ PGPROC *backend;
+
+ SpinLockAcquire(&state->mutex);
+ max_backend_id = state->max_backend_id;
+
+ for (i = 2; i <= max_backend_id; i++)
+ {
+ backend = BackendIdGetProc(i);
+
+ if (backend && state->lsns[i] != 0 &&
+ state->lsns[i] <= cur_lsn)
+ {
+ SetLatch(&backend->procLatch);
+ }
+ }
+ SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Get minimal LSN that some backend is waiting for
+ */
+XLogRecPtr
+WaitLSNGetMin(void)
+{
+ return state->min_lsn.value;
+}
+
+/*
+ * On WAIT use a latch to wait till LSN is replayed, postmaster dies or timeout
+ * happens. Timeout is specified in milliseconds. Returns true if LSN was
+ * reached and false otherwise.
+ */
+bool
+WaitLSNUtility(XLogRecPtr target_lsn, const int timeout_ms)
+{
+ XLogRecPtr cur_lsn;
+ int latch_events;
+ float8 endtime;
+ bool res = false;
+ bool wait_forever = (timeout_ms <= 0);
+
+ endtime = GetNowFloat() + timeout_ms / 1000.0;
+
+ latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+ /* Check if we already reached the needed LSN */
+ cur_lsn = GetXLogReplayRecPtr(NULL);
+ if (cur_lsn >= target_lsn)
+ return true;
+
+ WaitLSNAdd(target_lsn);
+ ResetLatch(MyLatch);
+
+ /* Recheck if LSN was reached while WaitLSNAdd() and ResetLatch() */
+ cur_lsn = GetXLogReplayRecPtr(NULL);
+ if (cur_lsn >= target_lsn)
+ return true;
+
+ for (;;)
+ {
+ int rc;
+ float8 time_left = 0;
+ long time_left_ms = 0;
+
+ time_left = endtime - GetNowFloat();
+
+ /* Use 1 second as the default timeout to check for interrupts */
+ if (wait_forever || time_left < 0 || time_left > 1.0)
+ time_left_ms = 1000;
+ else
+ time_left_ms = (long) ceil(time_left * 1000.0);
+
+ /* If interrupt, LockErrorCleanup() will do WaitLSNDelete() for us */
+ CHECK_FOR_INTERRUPTS();
+
+ /* If postmaster dies, finish immediately */
+ if (!PostmasterIsAlive())
+ break;
+
+ rc = WaitLatch(MyLatch, latch_events, time_left_ms,
+ WAIT_EVENT_CLIENT_READ);
+
+ ResetLatch(MyLatch);
+
+ if (rc & WL_LATCH_SET)
+ cur_lsn = GetXLogReplayRecPtr(NULL);
+
+ if (rc & WL_TIMEOUT)
+ {
+ time_left = endtime - GetNowFloat();
+ /* If the time specified by user has passed, stop waiting */
+ if (!wait_forever && time_left <= 0.0)
+ break;
+ cur_lsn = GetXLogReplayRecPtr(NULL);
+ }
+
+ /* If LSN has been replayed */
+ if (target_lsn <= cur_lsn)
+ break;
+ }
+
+ WaitLSNDelete();
+
+ if (cur_lsn < target_lsn)
+ ereport(WARNING,
+ (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
+ errmsg("didn't start transaction because LSN was not reached"),
+ errhint("Try to increase wait timeout.")));
+ else
+ res = true;
+
+ return res;
+}
+
+/*
+ * Implementation of WAIT FOR clause for BEGIN and START TRANSACTION commands
+ */
+int
+WaitLSNMain(WaitClause *stmt, DestReceiver *dest)
+{
+ TupleDesc tupdesc;
+ TupOutputState *tstate;
+ XLogRecPtr target_lsn;
+ bool res = false;
+
+ target_lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
+ CStringGetDatum(stmt->lsn)));
+ res = WaitLSNUtility(target_lsn, stmt->timeout);
+
+ /* Need a tuple descriptor representing a single TEXT column */
+ tupdesc = CreateTemplateTupleDesc(1);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+
+ /* Prepare for projection of tuples */
+ tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple);
+
+ /* Send the result */
+ do_text_output_oneline(tstate, res ? "t" : "f");
+ end_tup_output(tstate);
+ return res;
+}