aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/bin/pg_basebackup/pg_basebackup.c18
-rw-r--r--src/bin/pg_basebackup/pg_receivexlog.c34
-rw-r--r--src/bin/pg_basebackup/receivelog.c187
-rw-r--r--src/bin/pg_basebackup/receivelog.h33
4 files changed, 136 insertions, 136 deletions
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index ab9692a6d56..94852877a2d 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -372,10 +372,20 @@ typedef struct
static int
LogStreamerMain(logstreamer_param *param)
{
- if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
- param->sysidentifier, param->xlogdir,
- reached_end_position, standby_message_timeout,
- NULL, false, true))
+ StreamCtl stream;
+
+ MemSet(&stream, sizeof(stream), 0);
+ stream.startpos = param->startptr;
+ stream.timeline = param->timeline;
+ stream.sysidentifier = param->sysidentifier;
+ stream.stream_stop = reached_end_position;
+ stream.standby_message_timeout = standby_message_timeout;
+ stream.synchronous = false;
+ stream.mark_done = true;
+ stream.basedir = param->xlogdir;
+ stream.partial_suffix = NULL;
+
+ if (!ReceiveXlogStream(param->bgconn, &stream))
/*
* Any errors will already have been reported in the function process,
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index f96b547b0f3..7f7ee9dc9ba 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -276,10 +276,11 @@ FindStreamingStart(uint32 *tli)
static void
StreamLog(void)
{
- XLogRecPtr startpos,
- serverpos;
- TimeLineID starttli,
- servertli;
+ XLogRecPtr serverpos;
+ TimeLineID servertli;
+ StreamCtl stream;
+
+ MemSet(&stream, 0, sizeof(stream));
/*
* Connect in replication mode to the server
@@ -311,17 +312,17 @@ StreamLog(void)
/*
* Figure out where to start streaming.
*/
- startpos = FindStreamingStart(&starttli);
- if (startpos == InvalidXLogRecPtr)
+ stream.startpos = FindStreamingStart(&stream.timeline);
+ if (stream.startpos == InvalidXLogRecPtr)
{
- startpos = serverpos;
- starttli = servertli;
+ stream.startpos = serverpos;
+ stream.timeline = servertli;
}
/*
* Always start streaming at the beginning of a segment
*/
- startpos -= startpos % XLOG_SEG_SIZE;
+ stream.startpos -= stream.startpos % XLOG_SEG_SIZE;
/*
* Start the replication
@@ -329,12 +330,17 @@ StreamLog(void)
if (verbose)
fprintf(stderr,
_("%s: starting log streaming at %X/%X (timeline %u)\n"),
- progname, (uint32) (startpos >> 32), (uint32) startpos,
- starttli);
+ progname, (uint32) (stream.startpos >> 32), (uint32) stream.startpos,
+ stream.timeline);
+
+ stream.stream_stop = stop_streaming;
+ stream.standby_message_timeout = standby_message_timeout;
+ stream.synchronous = synchronous;
+ stream.mark_done = false;
+ stream.basedir = basedir;
+ stream.partial_suffix = ".partial";
- ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
- stop_streaming, standby_message_timeout, ".partial",
- synchronous, false);
+ ReceiveXlogStream(conn, &stream);
PQfinish(conn);
conn = NULL;
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 01c42fc0639..595213f0420 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -33,27 +33,18 @@ static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
static bool still_sending = true; /* feedback still needs to be sent? */
-static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
- uint32 timeline, char *basedir,
- stream_stop_callback stream_stop, int standby_message_timeout,
- char *partial_suffix, XLogRecPtr *stoppos,
- bool synchronous, bool mark_done);
+static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
+ XLogRecPtr *stoppos);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
XLogRecPtr blockpos, int64 *last_status);
-static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
- XLogRecPtr *blockpos, uint32 timeline,
- char *basedir, stream_stop_callback stream_stop,
- char *partial_suffix, bool mark_done);
-static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
- XLogRecPtr blockpos, char *basedir, char *partial_suffix,
- XLogRecPtr *stoppos, bool mark_done);
-static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
- uint32 timeline, char *basedir,
- stream_stop_callback stream_stop,
- char *partial_suffix, XLogRecPtr *stoppos,
- bool mark_done);
+static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
+ XLogRecPtr *blockpos);
+static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
+ XLogRecPtr blockpos, XLogRecPtr *stoppos);
+static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
+ XLogRecPtr *stoppos);
static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
int64 last_status);
@@ -99,8 +90,7 @@ mark_file_as_archived(const char *basedir, const char *fname)
* partial_suffix) is stored in current_walfile_name.
*/
static bool
-open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
- char *partial_suffix)
+open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
{
int f;
char fn[MAXPGPATH];
@@ -110,10 +100,10 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
XLogSegNo segno;
XLByteToSeg(startpoint, segno);
- XLogFileName(current_walfile_name, timeline, segno);
+ XLogFileName(current_walfile_name, stream->timeline, segno);
- snprintf(fn, sizeof(fn), "%s/%s%s", basedir, current_walfile_name,
- partial_suffix ? partial_suffix : "");
+ snprintf(fn, sizeof(fn), "%s/%s%s", stream->basedir, current_walfile_name,
+ stream->partial_suffix ? stream->partial_suffix : "");
f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
if (f == -1)
{
@@ -185,7 +175,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
* and returns false, otherwise returns true.
*/
static bool
-close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_done)
+close_walfile(StreamCtl *stream, XLogRecPtr pos)
{
off_t currpos;
@@ -220,13 +210,13 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
/*
* If we finished writing a .partial file, rename it into place.
*/
- if (currpos == XLOG_SEG_SIZE && partial_suffix)
+ if (currpos == XLOG_SEG_SIZE && stream->partial_suffix)
{
char oldfn[MAXPGPATH];
char newfn[MAXPGPATH];
- snprintf(oldfn, sizeof(oldfn), "%s/%s%s", basedir, current_walfile_name, partial_suffix);
- snprintf(newfn, sizeof(newfn), "%s/%s", basedir, current_walfile_name);
+ snprintf(oldfn, sizeof(oldfn), "%s/%s%s", stream->basedir, current_walfile_name, stream->partial_suffix);
+ snprintf(newfn, sizeof(newfn), "%s/%s", stream->basedir, current_walfile_name);
if (rename(oldfn, newfn) != 0)
{
fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"),
@@ -234,10 +224,10 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
return false;
}
}
- else if (partial_suffix)
+ else if (stream->partial_suffix)
fprintf(stderr,
_("%s: not renaming \"%s%s\", segment is not complete\n"),
- progname, current_walfile_name, partial_suffix);
+ progname, current_walfile_name, stream->partial_suffix);
/*
* Mark file as archived if requested by the caller - pg_basebackup needs
@@ -245,10 +235,10 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
* new node. This is in line with walreceiver.c always doing a
* XLogArchiveForceDone() after a complete segment.
*/
- if (currpos == XLOG_SEG_SIZE && mark_done)
+ if (currpos == XLOG_SEG_SIZE && stream->mark_done)
{
/* writes error message if failed */
- if (!mark_file_as_archived(basedir, current_walfile_name))
+ if (!mark_file_as_archived(stream->basedir, current_walfile_name))
return false;
}
@@ -261,7 +251,7 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
* Check if a timeline history file exists.
*/
static bool
-existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
+existsTimeLineHistoryFile(StreamCtl *stream)
{
char path[MAXPGPATH];
char histfname[MAXFNAMELEN];
@@ -271,12 +261,12 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
* Timeline 1 never has a history file. We treat that as if it existed,
* since we never need to stream it.
*/
- if (tli == 1)
+ if (stream->timeline == 1)
return true;
- TLHistoryFileName(histfname, tli);
+ TLHistoryFileName(histfname, stream->timeline);
- snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
+ snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
fd = open(path, O_RDONLY | PG_BINARY, 0);
if (fd < 0)
@@ -294,8 +284,7 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
}
static bool
-writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
- char *content, bool mark_done)
+writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
{
int size = strlen(content);
char path[MAXPGPATH];
@@ -307,15 +296,15 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
* Check that the server's idea of how timeline history files should be
* named matches ours.
*/
- TLHistoryFileName(histfname, tli);
+ TLHistoryFileName(histfname, stream->timeline);
if (strcmp(histfname, filename) != 0)
{
fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"),
- progname, tli, filename);
+ progname, stream->timeline, filename);
return false;
}
- snprintf(path, sizeof(path), "%s/%s", basedir, histfname);
+ snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
/*
* Write into a temp file name.
@@ -375,10 +364,10 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
}
/* Maintain archive_status, check close_walfile() for details. */
- if (mark_done)
+ if (stream->mark_done)
{
/* writes error message if failed */
- if (!mark_file_as_archived(basedir, histfname))
+ if (!mark_file_as_archived(stream->basedir, histfname))
return false;
}
@@ -468,6 +457,8 @@ CheckServerVersionForStreaming(PGconn *conn)
/*
* Receive a log stream starting at the specified position.
*
+ * Individual parameters are passed through the StreamCtl structure.
+ *
* If sysidentifier is specified, validate that both the system
* identifier and the timeline matches the specified ones
* (by sending an extra IDENTIFY_SYSTEM command)
@@ -498,11 +489,7 @@ CheckServerVersionForStreaming(PGconn *conn)
* Note: The log position *must* be at a log segment start!
*/
bool
-ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
- char *sysidentifier, char *basedir,
- stream_stop_callback stream_stop,
- int standby_message_timeout, char *partial_suffix,
- bool synchronous, bool mark_done)
+ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
{
char query[128];
char slotcmd[128];
@@ -539,7 +526,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
slotcmd[0] = 0;
}
- if (sysidentifier != NULL)
+ if (stream->sysidentifier != NULL)
{
/* Validate system identifier hasn't changed */
res = PQexec(conn, "IDENTIFY_SYSTEM");
@@ -559,7 +546,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
PQclear(res);
return false;
}
- if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0)
+ if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
{
fprintf(stderr,
_("%s: system identifier does not match between base backup and streaming connection\n"),
@@ -567,11 +554,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
PQclear(res);
return false;
}
- if (timeline > atoi(PQgetvalue(res, 0, 1)))
+ if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
{
fprintf(stderr,
_("%s: starting timeline %u is not present in the server\n"),
- progname, timeline);
+ progname, stream->timeline);
PQclear(res);
return false;
}
@@ -582,7 +569,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* initialize flush position to starting point, it's the caller's
* responsibility that that's sane.
*/
- lastFlushPosition = startpos;
+ lastFlushPosition = stream->startpos;
while (1)
{
@@ -590,9 +577,9 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Fetch the timeline history file for this timeline, if we don't have
* it already.
*/
- if (!existsTimeLineHistoryFile(basedir, timeline))
+ if (!existsTimeLineHistoryFile(stream))
{
- snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", timeline);
+ snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
@@ -615,10 +602,9 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
}
/* Write the history file to disk */
- writeTimeLineHistoryFile(basedir, timeline,
+ writeTimeLineHistoryFile(stream,
PQgetvalue(res, 0, 0),
- PQgetvalue(res, 0, 1),
- mark_done);
+ PQgetvalue(res, 0, 1));
PQclear(res);
}
@@ -627,14 +613,14 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Before we start streaming from the requested location, check if the
* callback tells us to stop here.
*/
- if (stream_stop(startpos, timeline, false))
+ if (stream->stream_stop(stream->startpos, stream->timeline, false))
return true;
/* Initiate the replication stream at specified location */
snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
slotcmd,
- (uint32) (startpos >> 32), (uint32) startpos,
- timeline);
+ (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
+ stream->timeline);
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_COPY_BOTH)
{
@@ -646,9 +632,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
PQclear(res);
/* Stream the WAL */
- res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
- standby_message_timeout, partial_suffix,
- &stoppos, synchronous, mark_done);
+ res = HandleCopyStream(conn, stream, &stoppos);
if (res == NULL)
goto error;
@@ -676,26 +660,26 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
uint32 newtimeline;
bool parsed;
- parsed = ReadEndOfStreamingResult(res, &startpos, &newtimeline);
+ parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
PQclear(res);
if (!parsed)
goto error;
/* Sanity check the values the server gave us */
- if (newtimeline <= timeline)
+ if (newtimeline <= stream->timeline)
{
fprintf(stderr,
_("%s: server reported unexpected next timeline %u, following timeline %u\n"),
- progname, newtimeline, timeline);
+ progname, newtimeline, stream->timeline);
goto error;
}
- if (startpos > stoppos)
+ if (stream->startpos > stoppos)
{
fprintf(stderr,
_("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
progname,
- timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
- newtimeline, (uint32) (startpos >> 32), (uint32) startpos);
+ stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
+ newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
goto error;
}
@@ -715,8 +699,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Loop back to start streaming from the new timeline. Always
* start streaming at the beginning of a segment.
*/
- timeline = newtimeline;
- startpos = startpos - (startpos % XLOG_SEG_SIZE);
+ stream->timeline = newtimeline;
+ stream->startpos = stream->startpos - (stream->startpos % XLOG_SEG_SIZE);
continue;
}
else if (PQresultStatus(res) == PGRES_COMMAND_OK)
@@ -729,7 +713,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Check if the callback thinks it's OK to stop here. If not,
* complain.
*/
- if (stream_stop(stoppos, timeline, false))
+ if (stream->stream_stop(stoppos, stream->timeline, false))
return true;
else
{
@@ -810,14 +794,12 @@ ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
* On any other sort of error, returns NULL.
*/
static PGresult *
-HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
- char *basedir, stream_stop_callback stream_stop,
- int standby_message_timeout, char *partial_suffix,
- XLogRecPtr *stoppos, bool synchronous, bool mark_done)
+HandleCopyStream(PGconn *conn, StreamCtl *stream,
+ XLogRecPtr *stoppos)
{
char *copybuf = NULL;
int64 last_status = -1;
- XLogRecPtr blockpos = startpos;
+ XLogRecPtr blockpos = stream->startpos;
still_sending = true;
@@ -830,9 +812,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/*
* Check if we should continue streaming, or abort at this point.
*/
- if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
- stream_stop, partial_suffix, stoppos,
- mark_done))
+ if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
goto error;
now = feGetCurrentTimestamp();
@@ -841,7 +821,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* If synchronous option is true, issue sync command as soon as there
* are WAL data which has not been flushed yet.
*/
- if (synchronous && lastFlushPosition < blockpos && walfile != -1)
+ if (stream->synchronous && lastFlushPosition < blockpos && walfile != -1)
{
if (fsync(walfile) != 0)
{
@@ -863,9 +843,9 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/*
* Potentially send a status message to the master
*/
- if (still_sending && standby_message_timeout > 0 &&
+ if (still_sending && stream->standby_message_timeout > 0 &&
feTimestampDifferenceExceeds(last_status, now,
- standby_message_timeout))
+ stream->standby_message_timeout))
{
/* Time to send feedback! */
if (!sendFeedback(conn, blockpos, now, false))
@@ -876,7 +856,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/*
* Calculate how long send/receive loops should sleep
*/
- sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
+ sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
last_status);
r = CopyStreamReceive(conn, sleeptime, &copybuf);
@@ -886,9 +866,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
goto error;
if (r == -2)
{
- PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
- basedir, partial_suffix,
- stoppos, mark_done);
+ PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
if (res == NULL)
goto error;
@@ -905,18 +883,14 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
}
else if (copybuf[0] == 'w')
{
- if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
- timeline, basedir, stream_stop,
- partial_suffix, mark_done))
+ if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
goto error;
/*
* Check if we should continue streaming, or abort at this
* point.
*/
- if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
- stream_stop, partial_suffix, stoppos,
- mark_done))
+ if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
goto error;
}
else
@@ -1114,10 +1088,8 @@ ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
* Process XLogData message.
*/
static bool
-ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
- XLogRecPtr *blockpos, uint32 timeline,
- char *basedir, stream_stop_callback stream_stop,
- char *partial_suffix, bool mark_done)
+ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
+ XLogRecPtr *blockpos)
{
int xlogoff;
int bytes_left;
@@ -1197,8 +1169,7 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
if (walfile == -1)
{
- if (!open_walfile(*blockpos, timeline,
- basedir, partial_suffix))
+ if (!open_walfile(stream, *blockpos))
{
/* Error logged by open_walfile */
return false;
@@ -1225,13 +1196,13 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
/* Did we reach the end of a WAL segment? */
if (*blockpos % XLOG_SEG_SIZE == 0)
{
- if (!close_walfile(basedir, partial_suffix, *blockpos, mark_done))
+ if (!close_walfile(stream, *blockpos))
/* Error message written in close_walfile() */
return false;
xlogoff = 0;
- if (still_sending && stream_stop(*blockpos, timeline, true))
+ if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
{
if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
{
@@ -1253,9 +1224,8 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
* Handle end of the copy stream.
*/
static PGresult *
-HandleEndOfCopyStream(PGconn *conn, char *copybuf,
- XLogRecPtr blockpos, char *basedir, char *partial_suffix,
- XLogRecPtr *stoppos, bool mark_done)
+HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
+ XLogRecPtr blockpos, XLogRecPtr *stoppos)
{
PGresult *res = PQgetResult(conn);
@@ -1266,7 +1236,7 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
*/
if (still_sending)
{
- if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
+ if (!close_walfile(stream, blockpos))
{
/* Error message written in close_walfile() */
PQclear(res);
@@ -1296,13 +1266,12 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
* Check if we should continue streaming, or abort at this point.
*/
static bool
-CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
- char *basedir, stream_stop_callback stream_stop,
- char *partial_suffix, XLogRecPtr *stoppos, bool mark_done)
+CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
+ XLogRecPtr *stoppos)
{
- if (still_sending && stream_stop(blockpos, timeline, false))
+ if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
{
- if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
+ if (!close_walfile(stream, blockpos))
{
/* Potential error message is written by close_walfile */
return false;
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
index 8d4dbf285b4..554ff8b5b28 100644
--- a/src/bin/pg_basebackup/receivelog.h
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -22,16 +22,31 @@
*/
typedef bool (*stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished);
+/*
+ * Global parameters when receiving xlog stream. For details about the individual fields,
+ * see the function comment for ReceiveXlogStream().
+ */
+typedef struct StreamCtl
+{
+ XLogRecPtr startpos; /* Start position for streaming */
+ TimeLineID timeline; /* Timeline to stream data from */
+ char *sysidentifier; /* Validate this system identifier and
+ * timeline */
+ int standby_message_timeout; /* Send status messages this
+ * often */
+ bool synchronous; /* Flush data on write */
+ bool mark_done; /* Mark segment as done in generated archive */
+
+ stream_stop_callback stream_stop; /* Stop streaming when returns true */
+
+ char *basedir; /* Received segments written to this dir */
+ char *partial_suffix; /* Suffix appended to partially received files */
+} StreamCtl;
+
+
+
extern bool CheckServerVersionForStreaming(PGconn *conn);
extern bool ReceiveXlogStream(PGconn *conn,
- XLogRecPtr startpos,
- uint32 timeline,
- char *sysidentifier,
- char *basedir,
- stream_stop_callback stream_stop,
- int standby_message_timeout,
- char *partial_suffix,
- bool synchronous,
- bool mark_done);
+ StreamCtl *stream);
#endif /* RECEIVELOG_H */