diff options
author | Alvaro Herrera <alvherre@alvh.no-ip.org> | 2020-05-08 15:30:34 -0400 |
---|---|---|
committer | Alvaro Herrera <alvherre@alvh.no-ip.org> | 2020-05-08 15:40:11 -0400 |
commit | b060dbe0001a1d6bf26cd294710f3cb203868d46 (patch) | |
tree | 6e9e980aa63ec1ec3655b93c92b9b5caa6689d38 /src/backend/access | |
parent | 871696ba20e0251e86041576373809d1c7ca161d (diff) | |
download | postgresql-b060dbe0001a1d6bf26cd294710f3cb203868d46.tar.gz postgresql-b060dbe0001a1d6bf26cd294710f3cb203868d46.zip |
Rework XLogReader callback system
Code review for 0dc8ead46363, prompted by a bug closed by 91c40548d5f7.
XLogReader's system for opening and closing segments had gotten too
complicated, with callbacks being passed at both the XLogReaderAllocate
level (read_page) as well as at the WALRead level (segment_open). This
was confusing and hard to follow, so restructure things so that these
callbacks are passed together at XLogReaderAllocate time, and add
another callback to the set (segment_close) to make it a coherent whole.
Also, ensure XLogReaderState is an argument to all the callbacks, so
that they can grab at the ->private data if necessary.
Document the whole arrangement more clearly.
Author: Álvaro Herrera <alvherre@alvh.no-ip.org>
Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Discussion: https://postgr.es/m/20200422175754.GA19858@alvherre.pgsql
Diffstat (limited to 'src/backend/access')
-rw-r--r-- | src/backend/access/transam/twophase.c | 5 | ||||
-rw-r--r-- | src/backend/access/transam/xlog.c | 10 | ||||
-rw-r--r-- | src/backend/access/transam/xlogreader.c | 51 | ||||
-rw-r--r-- | src/backend/access/transam/xlogutils.c | 24 |
4 files changed, 57 insertions, 33 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 2f7d4ed59a8..e1904877faa 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1331,7 +1331,10 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) char *errormsg; xlogreader = XLogReaderAllocate(wal_segment_size, NULL, - &read_local_xlog_page, NULL); + XL_ROUTINE(.page_read = &read_local_xlog_page, + .segment_open = &wal_segment_open, + .segment_close = &wal_segment_close), + NULL); if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 0d3d6709284..a53e6d96334 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -1223,7 +1223,7 @@ XLogInsertRecord(XLogRecData *rdata, if (!debug_reader) debug_reader = XLogReaderAllocate(wal_segment_size, NULL, - NULL, NULL); + XL_ROUTINE(), NULL); if (!debug_reader) { @@ -6478,8 +6478,12 @@ StartupXLOG(void) /* Set up XLOG reader facility */ MemSet(&private, 0, sizeof(XLogPageReadPrivate)); - xlogreader = XLogReaderAllocate(wal_segment_size, NULL, - &XLogPageRead, &private); + xlogreader = + XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(.page_read = &XLogPageRead, + .segment_open = NULL, + .segment_close = wal_segment_close), + &private); if (!xlogreader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 79ff976474c..7cee8b92c90 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -71,7 +71,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) */ XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, - XLogPageReadCB pagereadfunc, void *private_data) + XLogReaderRoutine *routine, void *private_data) { XLogReaderState *state; @@ -81,6 +81,9 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, if (!state) return NULL; + /* initialize caller-provided support functions */ + state->routine = *routine; + state->max_block_id = -1; /* @@ -102,7 +105,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size, waldir); - state->read_page = pagereadfunc; /* system_identifier initialized to zeroes above */ state->private_data = private_data; /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */ @@ -137,7 +139,7 @@ XLogReaderFree(XLogReaderState *state) int block_id; if (state->seg.ws_file != -1) - close(state->seg.ws_file); + state->routine.segment_close(state); for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++) { @@ -250,7 +252,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) * XLogBeginRead() or XLogFindNextRecord() must be called before the first call * to XLogReadRecord(). * - * If the read_page callback fails to read the requested data, NULL is + * If the page_read callback fails to read the requested data, NULL is * returned. The callback is expected to have reported the error; errormsg * is set to NULL. * @@ -559,10 +561,10 @@ err: /* * Read a single xlog page including at least [pageptr, reqLen] of valid data - * via the read_page() callback. + * via the page_read() callback. * * Returns -1 if the required page cannot be read for some reason; errormsg_buf - * is set in that case (unless the error occurs in the read_page callback). + * is set in that case (unless the error occurs in the page_read callback). * * We fetch the page from a reader-local cache if we know we have the required * data and if there hasn't been any error since caching the data. @@ -589,7 +591,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) * Data is not in our buffer. * * Every time we actually read the segment, even if we looked at parts of - * it before, we need to do verification as the read_page callback might + * it before, we need to do verification as the page_read callback might * now be rereading data from a different source. * * Whenever switching to a new WAL segment, we read the first page of the @@ -601,9 +603,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) { XLogRecPtr targetSegmentPtr = pageptr - targetPageOff; - readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ, - state->currRecPtr, - state->readBuf); + readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ, + state->currRecPtr, + state->readBuf); if (readLen < 0) goto err; @@ -619,9 +621,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) * First, read the requested data length, but at least a short page header * so that we can validate it. */ - readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), - state->currRecPtr, - state->readBuf); + readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), + state->currRecPtr, + state->readBuf); if (readLen < 0) goto err; @@ -638,9 +640,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) /* still not enough */ if (readLen < XLogPageHeaderSize(hdr)) { - readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr), - state->currRecPtr, - state->readBuf); + readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr), + state->currRecPtr, + state->readBuf); if (readLen < 0) goto err; } @@ -1041,11 +1043,14 @@ err: #endif /* FRONTEND */ /* + * Helper function to ease writing of XLogRoutine->page_read callbacks. + * If this function is used, caller must supply an open_segment callback in + * 'state', as that is used here. + * * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL * fetched from timeline 'tli'. * - * 'seg/segcxt' identify the last segment used. 'openSegment' is a callback - * to open the next segment, if necessary. + * 'seg/segcxt' identify the last segment used. * * Returns true if succeeded, false if an error occurs, in which case * 'errinfo' receives error details. @@ -1054,9 +1059,10 @@ err: * WAL buffers when possible. */ bool -WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, +WALRead(XLogReaderState *state, + char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALOpenSegment *seg, WALSegmentContext *segcxt, - WALSegmentOpen openSegment, WALReadError *errinfo) + WALReadError *errinfo) { char *p; XLogRecPtr recptr; @@ -1086,10 +1092,11 @@ WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, XLogSegNo nextSegNo; if (seg->ws_file >= 0) - close(seg->ws_file); + state->routine.segment_close(state); XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize); - seg->ws_file = openSegment(nextSegNo, segcxt, &tli); + seg->ws_file = state->routine.segment_open(state, nextSegNo, + segcxt, &tli); /* Update the current segment info. */ seg->ws_tli = tli; diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 6cb143e161d..bbd801513a8 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -783,10 +783,10 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa } } -/* openSegment callback for WALRead */ -static int -wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt, - TimeLineID *tli_p) +/* XLogReaderRoutine->segment_open callback for local pg_wal files */ +int +wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, + WALSegmentContext *segcxt, TimeLineID *tli_p) { TimeLineID tli = *tli_p; char path[MAXPGPATH]; @@ -811,8 +811,17 @@ wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt, return -1; /* keep compiler quiet */ } +/* stock XLogReaderRoutine->segment_close callback */ +void +wal_segment_close(XLogReaderState *state) +{ + close(state->seg.ws_file); + /* need to check errno? */ + state->seg.ws_file = -1; +} + /* - * read_page callback for reading local xlog files + * XLogReaderRoutine->page_read callback for reading local xlog files * * Public because it would likely be very helpful for someone writing another * output method outside walsender, e.g. in a bgworker. @@ -937,8 +946,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, * as 'count', read the whole page anyway. It's guaranteed to be * zero-padded up to the page boundary if it's incomplete. */ - if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg, - &state->segcxt, wal_segment_open, &errinfo)) + if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli, + &state->seg, &state->segcxt, + &errinfo)) WALReadRaiseError(&errinfo); /* number of valid bytes in the buffer */ |