about | summary | refs | log | tree | commit | diff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r-- src/backend/access/transam/xlog.c | 5
-rw-r--r-- src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 7
-rw-r--r-- src/backend/replication/walreceiverfuncs.c | 7
-rw-r--r-- src/backend/replication/walsender.c | 60
4 files changed, 53 insertions, 26 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 959f4231873..f7dd61c4c75 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9598,7 +9598,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
}
else
{
- ptr = RecPtr;
+ ptr = tliRecPtr;
tli = tliOfPointInHistory(tliRecPtr, expectedTLEs);
if (curFileTLI > 0 && tli < curFileTLI)
@@ -9607,7 +9607,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
tli, curFileTLI);
}
curFileTLI = tli;
- RequestXLogStreaming(curFileTLI, ptr, PrimaryConnInfo);
+ RequestXLogStreaming(tli, ptr, PrimaryConnInfo);
+ receivedUpto = 0;
}
/*
* Move to XLOG_FROM_STREAM state in either case. We'll get
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index e6e670e9e4b..f7cc6e3c2f5 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -224,8 +224,11 @@ libpqrcv_endstreaming(TimeLineID *next_tli)
res = PQgetResult(streamConn);
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
- /* Read the next timeline's ID */
- if (PQnfields(res) != 1 || PQntuples(res) != 1)
+ /*
+ * Read the next timeline's ID. The server also sends the timeline's
+ * starting point, but it is ignored.
+ */
+ if (PQnfields(res) < 2 || PQntuples(res) != 1)
ereport(ERROR,
(errmsg("unexpected result set after end-of-streaming")));
*next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0);
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index d414808c9f2..e5ad84393fa 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -260,12 +260,13 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
walrcv->startTime = now;
/*
- * If this is the first startup of walreceiver, we initialize receivedUpto
- * and latestChunkStart to receiveStart.
+ * If this is the first startup of walreceiver (on this timeline),
+ * initialize receivedUpto and latestChunkStart to the starting point.
*/
- if (walrcv->receiveStart == 0)
+ if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
{
walrcv->receivedUpto = recptr;
+ walrcv->receivedTLI = tli;
walrcv->latestChunkStart = recptr;
}
walrcv->receiveStart = recptr;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index c05bb1e0818..1dcb0f57f44 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -567,16 +567,21 @@ StartReplication(StartReplicationCmd *cmd)
*/
if (sendTimeLineIsHistoric)
{
- char str[11];
- snprintf(str, sizeof(str), "%u", sendTimeLineNextTLI);
+ char tli_str[11];
+ char startpos_str[8+1+8+1];
- pq_beginmessage(&buf, 'T'); /* RowDescription */
- pq_sendint(&buf, 1, 2); /* 1 field */
+ snprintf(tli_str, sizeof(tli_str), "%u", sendTimeLineNextTLI);
+ snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
+ (uint32) (sendTimeLineValidUpto >> 32),
+ (uint32) sendTimeLineValidUpto);
+
+ pq_beginmessage(&buf, 'T'); /* RowDescription */
+ pq_sendint(&buf, 2, 2); /* 2 fields */
/* Field header */
pq_sendstring(&buf, "next_tli");
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
/*
* int8 may seem like a surprising data type for this, but in theory
* int4 would not be wide enough for this, as TimeLineID is unsigned.
@@ -585,13 +590,26 @@ StartReplication(StartReplicationCmd *cmd)
pq_sendint(&buf, -1, 2);
pq_sendint(&buf, 0, 4);
pq_sendint(&buf, 0, 2);
+
+ pq_sendstring(&buf, "next_tli_startpos");
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2);
+ pq_sendint(&buf, 0, 4);
+ pq_sendint(&buf, 0, 2);
pq_endmessage(&buf);
/* Data row */
pq_beginmessage(&buf, 'D');
- pq_sendint(&buf, 1, 2); /* number of columns */
- pq_sendint(&buf, strlen(str), 4); /* length */
- pq_sendbytes(&buf, str, strlen(str));
+ pq_sendint(&buf, 2, 2); /* number of columns */
+
+ pq_sendint(&buf, strlen(tli_str), 4); /* length */
+ pq_sendbytes(&buf, tli_str, strlen(tli_str));
+
+ pq_sendint(&buf, strlen(startpos_str), 4); /* length */
+ pq_sendbytes(&buf, startpos_str, strlen(startpos_str));
+
pq_endmessage(&buf);
}
@@ -1462,19 +1480,10 @@ XLogSend(bool *caughtup)
history = readTimeLineHistory(ThisTimeLineID);
sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
- Assert(sentPtr <= sendTimeLineValidUpto);
+
Assert(sendTimeLine < sendTimeLineNextTLI);
list_free_deep(history);
- /* the current send pointer should be <= the switchpoint */
- if (!(sentPtr <= sendTimeLineValidUpto))
- elog(ERROR, "server switched off timeline %u at %X/%X, but walsender already streamed up to %X/%X",
- sendTimeLine,
- (uint32) (sendTimeLineValidUpto >> 32),
- (uint32) sendTimeLineValidUpto,
- (uint32) (sentPtr >> 32),
- (uint32) sentPtr);
-
sendTimeLineIsHistoric = true;
SendRqstPtr = sendTimeLineValidUpto;
@@ -1498,6 +1507,15 @@ XLogSend(bool *caughtup)
/*
* If this is a historic timeline and we've reached the point where we
* forked to the next timeline, stop streaming.
+ *
+ * Note: We might already have sent WAL > sendTimeLineValidUpto. The
+ * startup process will normally replay all WAL that has been received from
+ * the master, before promoting, but if the WAL streaming is terminated at
+ * a WAL page boundary, the valid portion of the timeline might end in the
+ * middle of a WAL record. We might've already sent the first half of that
+ * partial WAL record to the cascading standby, so that sentPtr >
+ * sendTimeLineValidUpto. That's OK; the cascading standby can't replay the
+ * partial WAL record either, so it can still follow our timeline switch.
*/
if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
{
@@ -1511,6 +1529,10 @@ XLogSend(bool *caughtup)
streamingDoneSending = true;
*caughtup = true;
+
+ elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
+ (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
+ (uint32) (sentPtr >> 32), (uint32) sentPtr);
return;
}