aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access
diff options
context:
space:
mode:
authorAlvaro Herrera <alvherre@alvh.no-ip.org>2020-05-08 15:30:34 -0400
committerAlvaro Herrera <alvherre@alvh.no-ip.org>2020-05-08 15:40:11 -0400
commitb060dbe0001a1d6bf26cd294710f3cb203868d46 (patch)
tree6e9e980aa63ec1ec3655b93c92b9b5caa6689d38 /src/backend/access
parent871696ba20e0251e86041576373809d1c7ca161d (diff)
downloadpostgresql-b060dbe0001a1d6bf26cd294710f3cb203868d46.tar.gz
postgresql-b060dbe0001a1d6bf26cd294710f3cb203868d46.zip
Rework XLogReader callback system
Code review for 0dc8ead46363, prompted by a bug closed by 91c40548d5f7. XLogReader's system for opening and closing segments had gotten too complicated, with callbacks being passed at both the XLogReaderAllocate level (read_page) as well as at the WALRead level (segment_open). This was confusing and hard to follow, so restructure things so that these callbacks are passed together at XLogReaderAllocate time, and add another callback to the set (segment_close) to make it a coherent whole. Also, ensure XLogReaderState is an argument to all the callbacks, so that they can grab at the ->private data if necessary. Document the whole arrangement more clearly. Author: Álvaro Herrera <alvherre@alvh.no-ip.org> Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com> Discussion: https://postgr.es/m/20200422175754.GA19858@alvherre.pgsql
Diffstat (limited to 'src/backend/access')
-rw-r--r--src/backend/access/transam/twophase.c5
-rw-r--r--src/backend/access/transam/xlog.c10
-rw-r--r--src/backend/access/transam/xlogreader.c51
-rw-r--r--src/backend/access/transam/xlogutils.c24
4 files changed, 57 insertions, 33 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 2f7d4ed59a8..e1904877faa 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1331,7 +1331,10 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
char *errormsg;
xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
- &read_local_xlog_page, NULL);
+ XL_ROUTINE(.page_read = &read_local_xlog_page,
+ .segment_open = &wal_segment_open,
+ .segment_close = &wal_segment_close),
+ NULL);
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0d3d6709284..a53e6d96334 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1223,7 +1223,7 @@ XLogInsertRecord(XLogRecData *rdata,
if (!debug_reader)
debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
- NULL, NULL);
+ XL_ROUTINE(), NULL);
if (!debug_reader)
{
@@ -6478,8 +6478,12 @@ StartupXLOG(void)
/* Set up XLOG reader facility */
MemSet(&private, 0, sizeof(XLogPageReadPrivate));
- xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
- &XLogPageRead, &private);
+ xlogreader =
+ XLogReaderAllocate(wal_segment_size, NULL,
+ XL_ROUTINE(.page_read = &XLogPageRead,
+ .segment_open = NULL,
+ .segment_close = wal_segment_close),
+ &private);
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 79ff976474c..7cee8b92c90 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -71,7 +71,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
*/
XLogReaderState *
XLogReaderAllocate(int wal_segment_size, const char *waldir,
- XLogPageReadCB pagereadfunc, void *private_data)
+ XLogReaderRoutine *routine, void *private_data)
{
XLogReaderState *state;
@@ -81,6 +81,9 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
if (!state)
return NULL;
+ /* initialize caller-provided support functions */
+ state->routine = *routine;
+
state->max_block_id = -1;
/*
@@ -102,7 +105,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
waldir);
- state->read_page = pagereadfunc;
/* system_identifier initialized to zeroes above */
state->private_data = private_data;
/* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
@@ -137,7 +139,7 @@ XLogReaderFree(XLogReaderState *state)
int block_id;
if (state->seg.ws_file != -1)
- close(state->seg.ws_file);
+ state->routine.segment_close(state);
for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
{
@@ -250,7 +252,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
* XLogBeginRead() or XLogFindNextRecord() must be called before the first call
* to XLogReadRecord().
*
- * If the read_page callback fails to read the requested data, NULL is
+ * If the page_read callback fails to read the requested data, NULL is
* returned. The callback is expected to have reported the error; errormsg
* is set to NULL.
*
@@ -559,10 +561,10 @@ err:
/*
* Read a single xlog page including at least [pageptr, reqLen] of valid data
- * via the read_page() callback.
+ * via the page_read() callback.
*
* Returns -1 if the required page cannot be read for some reason; errormsg_buf
- * is set in that case (unless the error occurs in the read_page callback).
+ * is set in that case (unless the error occurs in the page_read callback).
*
* We fetch the page from a reader-local cache if we know we have the required
* data and if there hasn't been any error since caching the data.
@@ -589,7 +591,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
* Data is not in our buffer.
*
* Every time we actually read the segment, even if we looked at parts of
- * it before, we need to do verification as the read_page callback might
+ * it before, we need to do verification as the page_read callback might
* now be rereading data from a different source.
*
* Whenever switching to a new WAL segment, we read the first page of the
@@ -601,9 +603,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
{
XLogRecPtr targetSegmentPtr = pageptr - targetPageOff;
- readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
- state->currRecPtr,
- state->readBuf);
+ readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
+ state->currRecPtr,
+ state->readBuf);
if (readLen < 0)
goto err;
@@ -619,9 +621,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
* First, read the requested data length, but at least a short page header
* so that we can validate it.
*/
- readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
- state->currRecPtr,
- state->readBuf);
+ readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
+ state->currRecPtr,
+ state->readBuf);
if (readLen < 0)
goto err;
@@ -638,9 +640,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
/* still not enough */
if (readLen < XLogPageHeaderSize(hdr))
{
- readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
- state->currRecPtr,
- state->readBuf);
+ readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
+ state->currRecPtr,
+ state->readBuf);
if (readLen < 0)
goto err;
}
@@ -1041,11 +1043,14 @@ err:
#endif /* FRONTEND */
/*
+ * Helper function to ease writing of XLogRoutine->page_read callbacks.
+ * If this function is used, caller must supply an open_segment callback in
+ * 'state', as that is used here.
+ *
* Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
* fetched from timeline 'tli'.
*
- * 'seg/segcxt' identify the last segment used. 'openSegment' is a callback
- * to open the next segment, if necessary.
+ * 'seg/segcxt' identify the last segment used.
*
* Returns true if succeeded, false if an error occurs, in which case
* 'errinfo' receives error details.
@@ -1054,9 +1059,10 @@ err:
* WAL buffers when possible.
*/
bool
-WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
+WALRead(XLogReaderState *state,
+ char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
WALOpenSegment *seg, WALSegmentContext *segcxt,
- WALSegmentOpen openSegment, WALReadError *errinfo)
+ WALReadError *errinfo)
{
char *p;
XLogRecPtr recptr;
@@ -1086,10 +1092,11 @@ WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
XLogSegNo nextSegNo;
if (seg->ws_file >= 0)
- close(seg->ws_file);
+ state->routine.segment_close(state);
XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
- seg->ws_file = openSegment(nextSegNo, segcxt, &tli);
+ seg->ws_file = state->routine.segment_open(state, nextSegNo,
+ segcxt, &tli);
/* Update the current segment info. */
seg->ws_tli = tli;
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 6cb143e161d..bbd801513a8 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -783,10 +783,10 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
}
}
-/* openSegment callback for WALRead */
-static int
-wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt,
- TimeLineID *tli_p)
+/* XLogReaderRoutine->segment_open callback for local pg_wal files */
+int
+wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
+ WALSegmentContext *segcxt, TimeLineID *tli_p)
{
TimeLineID tli = *tli_p;
char path[MAXPGPATH];
@@ -811,8 +811,17 @@ wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt,
return -1; /* keep compiler quiet */
}
+/* stock XLogReaderRoutine->segment_close callback */
+void
+wal_segment_close(XLogReaderState *state)
+{
+ close(state->seg.ws_file);
+ /* need to check errno? */
+ state->seg.ws_file = -1;
+}
+
/*
- * read_page callback for reading local xlog files
+ * XLogReaderRoutine->page_read callback for reading local xlog files
*
* Public because it would likely be very helpful for someone writing another
* output method outside walsender, e.g. in a bgworker.
@@ -937,8 +946,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
* as 'count', read the whole page anyway. It's guaranteed to be
* zero-padded up to the page boundary if it's incomplete.
*/
- if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg,
- &state->segcxt, wal_segment_open, &errinfo))
+ if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
+ &state->seg, &state->segcxt,
+ &errinfo))
WALReadRaiseError(&errinfo);
/* number of valid bytes in the buffer */