diff options
Diffstat (limited to 'src/backend/commands')
-rw-r--r-- | src/backend/commands/Makefile | 3 | ||||
-rw-r--r-- | src/backend/commands/wait.c | 295 |
2 files changed, 297 insertions, 1 deletions
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index d4815d3ce65..9b310926c12 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -57,6 +57,7 @@ OBJS = \ user.o \ vacuum.o \ variable.o \ - view.o + view.o \ + wait.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c new file mode 100644 index 00000000000..b7aee5b794b --- /dev/null +++ b/src/backend/commands/wait.c @@ -0,0 +1,295 @@ +/*------------------------------------------------------------------------- + * + * wait.c + * Implements WAIT FOR clause for BEGIN and START TRANSACTION commands. + * This clause allows waiting for given LSN to be replayed on standby. + * + * Copyright (c) 2020, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/commands/wait.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <math.h> + +#include "access/xlog.h" +#include "access/xlogdefs.h" +#include "commands/wait.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "storage/backendid.h" +#include "storage/pmsignal.h" +#include "storage/proc.h" +#include "storage/shmem.h" +#include "storage/sinvaladt.h" +#include "storage/spin.h" +#include "utils/builtins.h" +#include "utils/pg_lsn.h" +#include "utils/timestamp.h" + +/* + * Shared memory structure representing information about LSNs, which backends + * are waiting for replay. + */ +typedef struct +{ + slock_t mutex; /* mutex protecting the fields below */ + int max_backend_id; /* max backend_id present in lsns[] */ + pg_atomic_uint64 min_lsn; /* minimal waited LSN */ + /* per-backend array of waited LSNs */ + XLogRecPtr lsns[FLEXIBLE_ARRAY_MEMBER]; +} WaitLSNState; + +static WaitLSNState * state; + +/* + * Add the wait event of the current backend to shared memory array + */ +static void +WaitLSNAdd(XLogRecPtr lsn_to_wait) +{ + SpinLockAcquire(&state->mutex); + if (state->max_backend_id < MyBackendId) + state->max_backend_id = MyBackendId; + + state->lsns[MyBackendId] = lsn_to_wait; + + if (lsn_to_wait < state->min_lsn.value) + state->min_lsn.value = lsn_to_wait; + SpinLockRelease(&state->mutex); +} + +/* + * Delete wait event of the current backend from the shared memory array. + */ +void +WaitLSNDelete(void) +{ + int i; + XLogRecPtr deleted_lsn; + + SpinLockAcquire(&state->mutex); + + deleted_lsn = state->lsns[MyBackendId]; + state->lsns[MyBackendId] = InvalidXLogRecPtr; + + /* If we are deleting the minimal LSN, then choose the next min_lsn */ + if (!XLogRecPtrIsInvalid(deleted_lsn) && + deleted_lsn == state->min_lsn.value) + { + state->min_lsn.value = InvalidXLogRecPtr; + for (i = 2; i <= state->max_backend_id; i++) + { + if (!XLogRecPtrIsInvalid(state->lsns[i]) && + (state->lsns[i] < state->min_lsn.value || + XLogRecPtrIsInvalid(state->min_lsn.value))) + { + state->min_lsn.value = state->lsns[i]; + } + } + } + + /* If deleting from the end of the array, shorten the array's used part */ + if (state->max_backend_id == MyBackendId) + { + for (i = (MyBackendId); i >= 2; i--) + if (!XLogRecPtrIsInvalid(state->lsns[i])) + { + state->max_backend_id = i; + break; + } + } + + SpinLockRelease(&state->mutex); +} + +/* + * Report amount of shared memory space needed for WaitLSNState + */ +Size +WaitLSNShmemSize(void) +{ + Size size; + + size = offsetof(WaitLSNState, lsns); + size = add_size(size, mul_size(MaxBackends + 1, sizeof(XLogRecPtr))); + return size; +} + +/* + * Initialize an shared memory structure for waiting for LSN + */ +void +WaitLSNShmemInit(void) +{ + bool found; + uint32 i; + + state = (WaitLSNState *) ShmemInitStruct("pg_wait_lsn", + WaitLSNShmemSize(), + &found); + if (!found) + { + SpinLockInit(&state->mutex); + + for (i = 0; i < (MaxBackends + 1); i++) + state->lsns[i] = InvalidXLogRecPtr; + + state->max_backend_id = 0; + pg_atomic_init_u64(&state->min_lsn, InvalidXLogRecPtr); + } +} + +/* + * Set latches in shared memory to signal that new LSN has been replayed + */ +void +WaitLSNSetLatch(XLogRecPtr cur_lsn) +{ + uint32 i; + int max_backend_id; + PGPROC *backend; + + SpinLockAcquire(&state->mutex); + max_backend_id = state->max_backend_id; + + for (i = 2; i <= max_backend_id; i++) + { + backend = BackendIdGetProc(i); + + if (backend && state->lsns[i] != 0 && + state->lsns[i] <= cur_lsn) + { + SetLatch(&backend->procLatch); + } + } + SpinLockRelease(&state->mutex); +} + +/* + * Get minimal LSN that some backend is waiting for + */ +XLogRecPtr +WaitLSNGetMin(void) +{ + return state->min_lsn.value; +} + +/* + * On WAIT use a latch to wait till LSN is replayed, postmaster dies or timeout + * happens. Timeout is specified in milliseconds. Returns true if LSN was + * reached and false otherwise. + */ +bool +WaitLSNUtility(XLogRecPtr target_lsn, const int timeout_ms) +{ + XLogRecPtr cur_lsn; + int latch_events; + float8 endtime; + bool res = false; + bool wait_forever = (timeout_ms <= 0); + + endtime = GetNowFloat() + timeout_ms / 1000.0; + + latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH; + + /* Check if we already reached the needed LSN */ + cur_lsn = GetXLogReplayRecPtr(NULL); + if (cur_lsn >= target_lsn) + return true; + + WaitLSNAdd(target_lsn); + ResetLatch(MyLatch); + + /* Recheck if LSN was reached while WaitLSNAdd() and ResetLatch() */ + cur_lsn = GetXLogReplayRecPtr(NULL); + if (cur_lsn >= target_lsn) + return true; + + for (;;) + { + int rc; + float8 time_left = 0; + long time_left_ms = 0; + + time_left = endtime - GetNowFloat(); + + /* Use 1 second as the default timeout to check for interrupts */ + if (wait_forever || time_left < 0 || time_left > 1.0) + time_left_ms = 1000; + else + time_left_ms = (long) ceil(time_left * 1000.0); + + /* If interrupt, LockErrorCleanup() will do WaitLSNDelete() for us */ + CHECK_FOR_INTERRUPTS(); + + /* If postmaster dies, finish immediately */ + if (!PostmasterIsAlive()) + break; + + rc = WaitLatch(MyLatch, latch_events, time_left_ms, + WAIT_EVENT_CLIENT_READ); + + ResetLatch(MyLatch); + + if (rc & WL_LATCH_SET) + cur_lsn = GetXLogReplayRecPtr(NULL); + + if (rc & WL_TIMEOUT) + { + time_left = endtime - GetNowFloat(); + /* If the time specified by user has passed, stop waiting */ + if (!wait_forever && time_left <= 0.0) + break; + cur_lsn = GetXLogReplayRecPtr(NULL); + } + + /* If LSN has been replayed */ + if (target_lsn <= cur_lsn) + break; + } + + WaitLSNDelete(); + + if (cur_lsn < target_lsn) + ereport(WARNING, + (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION), + errmsg("didn't start transaction because LSN was not reached"), + errhint("Try to increase wait timeout."))); + else + res = true; + + return res; +} + +/* + * Implementation of WAIT FOR clause for BEGIN and START TRANSACTION commands + */ +int +WaitLSNMain(WaitClause *stmt, DestReceiver *dest) +{ + TupleDesc tupdesc; + TupOutputState *tstate; + XLogRecPtr target_lsn; + bool res = false; + + target_lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, + CStringGetDatum(stmt->lsn))); + res = WaitLSNUtility(target_lsn, stmt->timeout); + + /* Need a tuple descriptor representing a single TEXT column */ + tupdesc = CreateTemplateTupleDesc(1); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0); + + /* Prepare for projection of tuples */ + tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple); + + /* Send the result */ + do_text_output_oneline(tstate, res ? "t" : "f"); + end_tup_output(tstate); + return res; +} |