aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/twophase.c21
-rw-r--r--src/backend/access/transam/xact.c7
-rw-r--r--src/backend/access/transam/xlog.c25
-rw-r--r--src/backend/port/unix_latch.c3
-rw-r--r--src/backend/port/win32_latch.c4
-rw-r--r--src/backend/replication/walsender.c11
-rw-r--r--src/include/replication/walsender.h24
7 files changed, 59 insertions, 36 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index e8fb78b3311..7f198c2e3e0 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1042,13 +1042,6 @@ EndPrepare(GlobalTransaction gxact)
/* If we crash now, we have prepared: WAL replay will fix things */
- /*
- * Wake up all walsenders to send WAL up to the PREPARE record immediately
- * if replication is enabled
- */
- if (max_wal_senders > 0)
- WalSndWakeup();
-
/* write correct CRC and close file */
if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
{
@@ -2045,13 +2038,6 @@ RecordTransactionCommitPrepared(TransactionId xid,
/* Flush XLOG to disk */
XLogFlush(recptr);
- /*
- * Wake up all walsenders to send WAL up to the COMMIT PREPARED record
- * immediately if replication is enabled
- */
- if (max_wal_senders > 0)
- WalSndWakeup();
-
/* Mark the transaction committed in pg_clog */
TransactionIdCommitTree(xid, nchildren, children);
@@ -2133,13 +2119,6 @@ RecordTransactionAbortPrepared(TransactionId xid,
XLogFlush(recptr);
/*
- * Wake up all walsenders to send WAL up to the ABORT PREPARED record
- * immediately if replication is enabled
- */
- if (max_wal_senders > 0)
- WalSndWakeup();
-
- /*
* Mark the transaction aborted in clog. This is not absolutely necessary
* but we may as well do it while we are here.
*/
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 4755ee6ee40..86b1afa80d9 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1142,13 +1142,6 @@ RecordTransactionCommit(void)
XLogFlush(XactLastRecEnd);
/*
- * Wake up all walsenders to send WAL up to the COMMIT record
- * immediately if replication is enabled
- */
- if (max_wal_senders > 0)
- WalSndWakeup();
-
- /*
* Now we may update the CLOG, if we wrote a COMMIT record above
*/
if (markXidCommitted)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index cbfa68a4e7b..a43e2eeaf30 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1025,6 +1025,8 @@ begin:;
END_CRIT_SECTION();
+ /* wakeup the WalSnd now that we released the WALWriteLock */
+ WalSndWakeupProcessRequests();
return RecPtr;
}
@@ -1208,6 +1210,9 @@ begin:;
END_CRIT_SECTION();
+ /* wakeup the WalSnd now that we outside contented locks */
+ WalSndWakeupProcessRequests();
+
return RecPtr;
}
@@ -1792,6 +1797,10 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
if (finishing_seg || (xlog_switch && last_iteration))
{
issue_xlog_fsync(openLogFile, openLogSegNo);
+
+ /* signal that we need to wakeup WalSnd later */
+ WalSndWakeupRequest();
+
LogwrtResult.Flush = LogwrtResult.Write; /* end of page */
if (XLogArchivingActive())
@@ -1854,7 +1863,11 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
openLogFile = XLogFileOpen(openLogSegNo);
openLogOff = 0;
}
+
issue_xlog_fsync(openLogFile, openLogSegNo);
+
+ /* signal that we need to wakeup WalSnd later */
+ WalSndWakeupRequest();
}
LogwrtResult.Flush = LogwrtResult.Write;
}
@@ -2120,6 +2133,9 @@ XLogFlush(XLogRecPtr record)
END_CRIT_SECTION();
+ /* wakeup the WalSnd now that we released the WALWriteLock */
+ WalSndWakeupProcessRequests();
+
/*
* If we still haven't flushed to the request point then we have a
* problem; most likely, the requested flush point is past end of XLOG.
@@ -2245,13 +2261,8 @@ XLogBackgroundFlush(void)
END_CRIT_SECTION();
- /*
- * If we wrote something then we have something to send to standbys also,
- * otherwise the replication delay become around 7s with just async
- * commit.
- */
- if (wrote_something)
- WalSndWakeup();
+ /* wakeup the WalSnd now that we released the WALWriteLock */
+ WalSndWakeupProcessRequests();
return wrote_something;
}
diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c
index 65b2fc56e03..335e9f66afb 100644
--- a/src/backend/port/unix_latch.c
+++ b/src/backend/port/unix_latch.c
@@ -418,6 +418,9 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
* NB: when calling this in a signal handler, be sure to save and restore
* errno around it. (That's standard practice in most signal handlers, of
* course, but we used to omit it in handlers that only set a flag.)
+ *
+ * NB: this function is called from critical sections and signal handlers so
+ * throwing an error is not a good idea.
*/
void
SetLatch(volatile Latch *latch)
diff --git a/src/backend/port/win32_latch.c b/src/backend/port/win32_latch.c
index eb46dcad1ba..1f1ed33dc2d 100644
--- a/src/backend/port/win32_latch.c
+++ b/src/backend/port/win32_latch.c
@@ -247,6 +247,10 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
return result;
}
+/*
+ * The comments above the unix implementation (unix_latch.c) of this function
+ * apply here as well.
+ */
void
SetLatch(volatile Latch *latch)
{
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 616d4e73e3b..912ce9d4503 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -81,6 +81,10 @@ bool am_cascading_walsender = false; /* Am I cascading WAL to
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int replication_timeout = 60 * 1000; /* maximum time to send one
* WAL data message */
+/*
+ * State for WalSndWakeupRequest
+ */
+bool wake_wal_senders = false;
/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
@@ -1395,7 +1399,12 @@ WalSndShmemInit(void)
}
}
-/* Wake up all walsenders */
+/*
+ * Wake up all walsenders
+ *
+ * This will be called inside critical sections, so throwing an error is not
+ * adviseable.
+ */
void
WalSndWakeup(void)
{
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 65536016c28..bb85ccf7b22 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -21,6 +21,7 @@ extern bool am_walsender;
extern bool am_cascading_walsender;
extern volatile sig_atomic_t walsender_shutdown_requested;
extern volatile sig_atomic_t walsender_ready_to_stop;
+extern bool wake_wal_senders;
/* user-settable parameters */
extern int max_wal_senders;
@@ -35,4 +36,27 @@ extern void WalSndRqstFileReload(void);
extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS);
+/*
+ * Remember that we want to wakeup walsenders later
+ *
+ * This is separated from doing the actual wakeup because the writeout is done
+ * while holding contended locks.
+ */
+#define WalSndWakeupRequest() \
+ do { wake_wal_senders = true; } while (0)
+
+/*
+ * wakeup walsenders if there is work to be done
+ */
+#define WalSndWakeupProcessRequests() \
+ do \
+ { \
+ if (wake_wal_senders) \
+ { \
+ wake_wal_senders = false; \
+ if (max_wal_senders > 0) \
+ WalSndWakeup(); \
+ } \
+ } while (0)
+
#endif /* _WALSENDER_H */