aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/catalog/system_views.sql1
-rw-r--r--src/backend/replication/basebackup.c3
-rw-r--r--src/backend/replication/walsender.c49
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_proc.h2
-rw-r--r--src/include/replication/walsender.h11
6 files changed, 64 insertions, 4 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index aa89240e85f..718e996e6b8 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -501,6 +501,7 @@ CREATE VIEW pg_stat_replication AS
S.client_addr,
S.client_port,
S.backend_start,
+ W.state,
W.sent_location
FROM pg_stat_get_activity(NULL) AS S, pg_authid U,
pg_stat_get_wal_senders() AS W
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index c09700f7980..144b17c66b6 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -24,6 +24,7 @@
#include "libpq/pqformat.h"
#include "nodes/pg_list.h"
#include "replication/basebackup.h"
+#include "replication/walsender.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "utils/builtins.h"
@@ -115,6 +116,8 @@ SendBaseBackup(const char *options)
ALLOCSET_DEFAULT_MAXSIZE);
old_context = MemoryContextSwitchTo(backup_context);
+ WalSndSetState(WALSNDSTATE_BACKUP);
+
if (backup_label == NULL)
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 559e7349fc5..a0f20ab41f0 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -179,6 +179,7 @@ WalSndHandshake(void)
{
int firstchar;
+ WalSndSetState(WALSNDSTATE_STARTUP);
set_ps_display("idle", false);
/* Wait for a command to arrive */
@@ -482,6 +483,9 @@ WalSndLoop(void)
if (!XLogSend(output_message, &caughtup))
break;
}
+
+ /* Update our state to indicate if we're behind or not */
+ WalSndSetState(caughtup ? WALSNDSTATE_STREAMING : WALSNDSTATE_CATCHUP);
}
/*
@@ -533,6 +537,7 @@ InitWalSnd(void)
*/
walsnd->pid = MyProcPid;
MemSet(&walsnd->sentPtr, 0, sizeof(XLogRecPtr));
+ walsnd->state = WALSNDSTATE_STARTUP;
SpinLockRelease(&walsnd->mutex);
/* don't need the lock anymore */
OwnLatch((Latch *) &walsnd->latch);
@@ -960,6 +965,45 @@ WalSndWakeup(void)
SetLatch(&WalSndCtl->walsnds[i].latch);
}
+/* Set state for current walsender (only called in walsender) */
+void
+WalSndSetState(WalSndState state)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+
+ Assert(am_walsender);
+
+ if (walsnd->state == state)
+ return;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->state = state;
+ SpinLockRelease(&walsnd->mutex);
+}
+
+/*
+ * Return a string constant representing the state. This is used
+ * in system views, and should *not* be translated.
+ */
+static const char *
+WalSndGetStateString(WalSndState state)
+{
+ switch (state)
+ {
+ case WALSNDSTATE_STARTUP:
+ return "STARTUP";
+ case WALSNDSTATE_BACKUP:
+ return "BACKUP";
+ case WALSNDSTATE_CATCHUP:
+ return "CATCHUP";
+ case WALSNDSTATE_STREAMING:
+ return "STREAMING";
+ }
+ return "UNKNOWN";
+}
+
+
/*
* Returns activity of walsenders, including pids and xlog locations sent to
* standby servers.
@@ -967,7 +1011,7 @@ WalSndWakeup(void)
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_WAL_SENDERS_COLS 2
+#define PG_STAT_GET_WAL_SENDERS_COLS 3
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -1021,7 +1065,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
memset(nulls, 0, sizeof(nulls));
values[0] = Int32GetDatum(walsnd->pid);
- values[1] = CStringGetTextDatum(sent_location);
+ values[1] = CStringGetTextDatum(WalSndGetStateString(walsnd->state));
+ values[2] = CStringGetTextDatum(sent_location);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 7a03b1c1173..df3c95b5f91 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 201101081
+#define CATALOG_VERSION_NO 201101111
#endif
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 2eadd2ced8d..f8b5d4da3da 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -3075,7 +3075,7 @@ DATA(insert OID = 1936 ( pg_stat_get_backend_idset PGNSP PGUID 12 1 100 0 f f
DESCR("statistics: currently active backend IDs");
DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,23}" "{i,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ ));
DESCR("statistics: information about currently active backends");
-DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25}" "{o,o}" "{procpid,sent_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
+DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25}" "{o,o,o}" "{procpid,state,sent_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
DESCR("statistics: information about currently active replication");
DATA(insert OID = 2026 ( pg_backend_pid PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ ));
DESCR("statistics: current backend PID");
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index d6767b9dcf3..0b4a143f827 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -16,12 +16,22 @@
#include "storage/latch.h"
#include "storage/spin.h"
+
+typedef enum WalSndState
+{
+ WALSNDSTATE_STARTUP = 0,
+ WALSNDSTATE_BACKUP,
+ WALSNDSTATE_CATCHUP,
+ WALSNDSTATE_STREAMING
+} WalSndState;
+
/*
* Each walsender has a WalSnd struct in shared memory.
*/
typedef struct WalSnd
{
pid_t pid; /* this walsender's process id, or 0 */
+ WalSndState state; /* this walsender's state */
XLogRecPtr sentPtr; /* WAL has been sent up to this point */
slock_t mutex; /* locks shared variables shown above */
@@ -53,6 +63,7 @@ extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);
extern void WalSndWakeup(void);
+extern void WalSndSetState(WalSndState state);
extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS);