aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c9
-rw-r--r--src/backend/replication/logical/snapbuild.c14
-rw-r--r--src/backend/replication/slot.c13
-rw-r--r--src/backend/replication/walsender.c4
4 files changed, 40 insertions, 0 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 8aac670bd45..b437799c5fd 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -58,6 +58,7 @@
#include "catalog/catalog.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
+#include "pgstat.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/slot.h"
@@ -2275,6 +2276,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
ondisk->size = sz;
+ pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
{
int save_errno = errno;
@@ -2286,6 +2288,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
errmsg("could not write to data file for XID %u: %m",
txn->xid)));
}
+ pgstat_report_wait_end();
Assert(ondisk->change.action == change->action);
}
@@ -2366,7 +2369,9 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
* end of this file.
*/
ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
+ pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
+ pgstat_report_wait_end();
/* eof */
if (readBytes == 0)
@@ -2393,8 +2398,10 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
sizeof(ReorderBufferDiskChange) + ondisk->size);
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+ pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
ondisk->size - sizeof(ReorderBufferDiskChange));
+ pgstat_report_wait_end();
if (readBytes < 0)
ereport(ERROR,
@@ -3047,7 +3054,9 @@ ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
memset(&key, 0, sizeof(ReorderBufferTupleCidKey));
/* read all mappings till the end of the file */
+ pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
+ pgstat_report_wait_end();
if (readBytes < 0)
ereport(ERROR,
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index e129a6b8e40..3f242a8ed70 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -115,6 +115,8 @@
#include "access/transam.h"
#include "access/xact.h"
+#include "pgstat.h"
+
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
@@ -1580,6 +1582,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
ereport(ERROR,
(errmsg("could not open file \"%s\": %m", path)));
+ pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
if ((write(fd, ondisk, needed_length)) != needed_length)
{
CloseTransientFile(fd);
@@ -1587,6 +1590,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
(errcode_for_file_access(),
errmsg("could not write to file \"%s\": %m", tmppath)));
}
+ pgstat_report_wait_end();
/*
* fsync the file before renaming so that even if we crash after this we
@@ -1596,6 +1600,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
* some noticeable overhead since it's performed synchronously during
* decoding?
*/
+ pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
if (pg_fsync(fd) != 0)
{
CloseTransientFile(fd);
@@ -1603,6 +1608,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
(errcode_for_file_access(),
errmsg("could not fsync file \"%s\": %m", tmppath)));
}
+ pgstat_report_wait_end();
CloseTransientFile(fd);
fsync_fname("pg_logical/snapshots", true);
@@ -1677,7 +1683,9 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
/* read statically sized portion of snapshot */
+ pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize);
+ pgstat_report_wait_end();
if (readBytes != SnapBuildOnDiskConstantSize)
{
CloseTransientFile(fd);
@@ -1703,7 +1711,9 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
/* read SnapBuild */
+ pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
+ pgstat_report_wait_end();
if (readBytes != sizeof(SnapBuild))
{
CloseTransientFile(fd);
@@ -1717,7 +1727,9 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
/* restore running xacts information */
sz = sizeof(TransactionId) * ondisk.builder.running.xcnt_space;
ondisk.builder.running.xip = MemoryContextAllocZero(builder->context, sz);
+ pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
readBytes = read(fd, ondisk.builder.running.xip, sz);
+ pgstat_report_wait_end();
if (readBytes != sz)
{
CloseTransientFile(fd);
@@ -1731,7 +1743,9 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
/* restore committed xacts information */
sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
+ pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
readBytes = read(fd, ondisk.builder.committed.xip, sz);
+ pgstat_report_wait_end();
if (readBytes != sz)
{
CloseTransientFile(fd);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 10d69d04272..5237a9fb078 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -43,6 +43,7 @@
#include "access/xlog_internal.h"
#include "common/string.h"
#include "miscadmin.h"
+#include "pgstat.h"
#include "replication/slot.h"
#include "storage/fd.h"
#include "storage/proc.h"
@@ -1100,10 +1101,12 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
SnapBuildOnDiskChecksummedSize);
FIN_CRC32C(cp.checksum);
+ pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
{
int save_errno = errno;
+ pgstat_report_wait_end();
CloseTransientFile(fd);
errno = save_errno;
ereport(elevel,
@@ -1112,12 +1115,15 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
tmppath)));
return;
}
+ pgstat_report_wait_end();
/* fsync the temporary file */
+ pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
if (pg_fsync(fd) != 0)
{
int save_errno = errno;
+ pgstat_report_wait_end();
CloseTransientFile(fd);
errno = save_errno;
ereport(elevel,
@@ -1126,6 +1132,7 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
tmppath)));
return;
}
+ pgstat_report_wait_end();
CloseTransientFile(fd);
@@ -1202,6 +1209,7 @@ RestoreSlotFromDisk(const char *name)
* Sync state file before we're reading from it. We might have crashed
* while it wasn't synced yet and we shouldn't continue on that basis.
*/
+ pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
if (pg_fsync(fd) != 0)
{
CloseTransientFile(fd);
@@ -1210,6 +1218,7 @@ RestoreSlotFromDisk(const char *name)
errmsg("could not fsync file \"%s\": %m",
path)));
}
+ pgstat_report_wait_end();
/* Also sync the parent directory */
START_CRIT_SECTION();
@@ -1217,7 +1226,9 @@ RestoreSlotFromDisk(const char *name)
END_CRIT_SECTION();
/* read part of statefile that's guaranteed to be version independent */
+ pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
+ pgstat_report_wait_end();
if (readBytes != ReplicationSlotOnDiskConstantSize)
{
int saved_errno = errno;
@@ -1253,9 +1264,11 @@ RestoreSlotFromDisk(const char *name)
path, cp.length)));
/* Now that we know the size, read the entire file */
+ pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
readBytes = read(fd,
(char *) &cp + ReplicationSlotOnDiskConstantSize,
cp.length);
+ pgstat_report_wait_end();
if (readBytes != cp.length)
{
int saved_errno = errno;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 127efecb27d..0f6b828336f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -463,7 +463,9 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd)
char rbuf[BLCKSZ];
int nread;
+ pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
nread = read(fd, rbuf, sizeof(rbuf));
+ pgstat_report_wait_end();
if (nread <= 0)
ereport(ERROR,
(errcode_for_file_access(),
@@ -2126,7 +2128,9 @@ retry:
else
segbytes = nbytes;
+ pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
readbytes = read(sendFile, p, segbytes);
+ pgstat_report_wait_end();
if (readbytes <= 0)
{
ereport(ERROR,