diff options
Diffstat (limited to 'src/backend/replication')
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 9 | ||||
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 14 | ||||
-rw-r--r-- | src/backend/replication/slot.c | 13 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 4 |
4 files changed, 40 insertions, 0 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 8aac670bd45..b437799c5fd 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -58,6 +58,7 @@ #include "catalog/catalog.h" #include "lib/binaryheap.h" #include "miscadmin.h" +#include "pgstat.h" #include "replication/logical.h" #include "replication/reorderbuffer.h" #include "replication/slot.h" @@ -2275,6 +2276,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, ondisk->size = sz; + pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE); if (write(fd, rb->outbuf, ondisk->size) != ondisk->size) { int save_errno = errno; @@ -2286,6 +2288,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, errmsg("could not write to data file for XID %u: %m", txn->xid))); } + pgstat_report_wait_end(); Assert(ondisk->change.action == change->action); } @@ -2366,7 +2369,9 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, * end of this file. */ ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange)); + pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ); readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange)); + pgstat_report_wait_end(); /* eof */ if (readBytes == 0) @@ -2393,8 +2398,10 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, sizeof(ReorderBufferDiskChange) + ondisk->size); ondisk = (ReorderBufferDiskChange *) rb->outbuf; + pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ); readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange), ondisk->size - sizeof(ReorderBufferDiskChange)); + pgstat_report_wait_end(); if (readBytes < 0) ereport(ERROR, @@ -3047,7 +3054,9 @@ ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname) memset(&key, 0, sizeof(ReorderBufferTupleCidKey)); /* read all mappings till the end of the file */ + pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ); readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData)); + pgstat_report_wait_end(); if (readBytes < 0) ereport(ERROR, diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index e129a6b8e40..3f242a8ed70 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -115,6 +115,8 @@ #include "access/transam.h" #include "access/xact.h" +#include "pgstat.h" + #include "replication/logical.h" #include "replication/reorderbuffer.h" #include "replication/snapbuild.h" @@ -1580,6 +1582,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) ereport(ERROR, (errmsg("could not open file \"%s\": %m", path))); + pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE); if ((write(fd, ondisk, needed_length)) != needed_length) { CloseTransientFile(fd); @@ -1587,6 +1590,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) (errcode_for_file_access(), errmsg("could not write to file \"%s\": %m", tmppath))); } + pgstat_report_wait_end(); /* * fsync the file before renaming so that even if we crash after this we @@ -1596,6 +1600,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) * some noticeable overhead since it's performed synchronously during * decoding? */ + pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC); if (pg_fsync(fd) != 0) { CloseTransientFile(fd); @@ -1603,6 +1608,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) (errcode_for_file_access(), errmsg("could not fsync file \"%s\": %m", tmppath))); } + pgstat_report_wait_end(); CloseTransientFile(fd); fsync_fname("pg_logical/snapshots", true); @@ -1677,7 +1683,9 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) /* read statically sized portion of snapshot */ + pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize); + pgstat_report_wait_end(); if (readBytes != SnapBuildOnDiskConstantSize) { CloseTransientFile(fd); @@ -1703,7 +1711,9 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize); /* read SnapBuild */ + pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild)); + pgstat_report_wait_end(); if (readBytes != sizeof(SnapBuild)) { CloseTransientFile(fd); @@ -1717,7 +1727,9 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) /* restore running xacts information */ sz = sizeof(TransactionId) * ondisk.builder.running.xcnt_space; ondisk.builder.running.xip = MemoryContextAllocZero(builder->context, sz); + pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); readBytes = read(fd, ondisk.builder.running.xip, sz); + pgstat_report_wait_end(); if (readBytes != sz) { CloseTransientFile(fd); @@ -1731,7 +1743,9 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) /* restore committed xacts information */ sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt; ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz); + pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); readBytes = read(fd, ondisk.builder.committed.xip, sz); + pgstat_report_wait_end(); if (readBytes != sz) { CloseTransientFile(fd); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 10d69d04272..5237a9fb078 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -43,6 +43,7 @@ #include "access/xlog_internal.h" #include "common/string.h" #include "miscadmin.h" +#include "pgstat.h" #include "replication/slot.h" #include "storage/fd.h" #include "storage/proc.h" @@ -1100,10 +1101,12 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) SnapBuildOnDiskChecksummedSize); FIN_CRC32C(cp.checksum); + pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE); if ((write(fd, &cp, sizeof(cp))) != sizeof(cp)) { int save_errno = errno; + pgstat_report_wait_end(); CloseTransientFile(fd); errno = save_errno; ereport(elevel, @@ -1112,12 +1115,15 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) tmppath))); return; } + pgstat_report_wait_end(); /* fsync the temporary file */ + pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC); if (pg_fsync(fd) != 0) { int save_errno = errno; + pgstat_report_wait_end(); CloseTransientFile(fd); errno = save_errno; ereport(elevel, @@ -1126,6 +1132,7 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) tmppath))); return; } + pgstat_report_wait_end(); CloseTransientFile(fd); @@ -1202,6 +1209,7 @@ RestoreSlotFromDisk(const char *name) * Sync state file before we're reading from it. We might have crashed * while it wasn't synced yet and we shouldn't continue on that basis. */ + pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC); if (pg_fsync(fd) != 0) { CloseTransientFile(fd); @@ -1210,6 +1218,7 @@ RestoreSlotFromDisk(const char *name) errmsg("could not fsync file \"%s\": %m", path))); } + pgstat_report_wait_end(); /* Also sync the parent directory */ START_CRIT_SECTION(); @@ -1217,7 +1226,9 @@ RestoreSlotFromDisk(const char *name) END_CRIT_SECTION(); /* read part of statefile that's guaranteed to be version independent */ + pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ); readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize); + pgstat_report_wait_end(); if (readBytes != ReplicationSlotOnDiskConstantSize) { int saved_errno = errno; @@ -1253,9 +1264,11 @@ RestoreSlotFromDisk(const char *name) path, cp.length))); /* Now that we know the size, read the entire file */ + pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ); readBytes = read(fd, (char *) &cp + ReplicationSlotOnDiskConstantSize, cp.length); + pgstat_report_wait_end(); if (readBytes != cp.length) { int saved_errno = errno; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 127efecb27d..0f6b828336f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -463,7 +463,9 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd) char rbuf[BLCKSZ]; int nread; + pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ); nread = read(fd, rbuf, sizeof(rbuf)); + pgstat_report_wait_end(); if (nread <= 0) ereport(ERROR, (errcode_for_file_access(), @@ -2126,7 +2128,9 @@ retry: else segbytes = nbytes; + pgstat_report_wait_start(WAIT_EVENT_WAL_READ); readbytes = read(sendFile, p, segbytes); + pgstat_report_wait_end(); if (readbytes <= 0) { ereport(ERROR, |