aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>2013-05-08 20:10:17 +0300
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>2013-05-08 20:30:17 +0300
commit2ffa66f4975c99e52984f7ee81b47d137b5b4751 (patch)
treed48756b7959f7bb63d9ea8513ff68e0dfa88d4ed /src/backend
parentcb953d8b1bf7386ff20300cd80b29b7e8657dcbd (diff)
downloadpostgresql-2ffa66f4975c99e52984f7ee81b47d137b5b4751.tar.gz
postgresql-2ffa66f4975c99e52984f7ee81b47d137b5b4751.zip
Fix walsender failure at promotion.
If a standby server has a cascading standby server connected to it, it's possible that WAL has already been sent up to the next WAL page boundary, splitting a WAL record in the middle, when the first standby server is promoted. Don't throw an assertion failure or error in walsender if that happens. Also, fix a variant of the same bug in pg_receivexlog: if it had already received WAL on previous timeline up to a segment boundary, when the upstream standby server is promoted so that the timeline switch record falls on the previous segment, pg_receivexlog would miss the segment containing the timeline switch. To fix that, have walsender send the position of the timeline switch at end-of-streaming, in addition to the next timeline's ID. It was previously assumed that the switch happened exactly where the streaming stopped. Note: this is an incompatible change in the streaming protocol. You might get an error if you try to stream over timeline switches, if the client is running 9.3beta1 and the server is more recent. It should be fine after a reconnect, however. Reported by Fujii Masao.
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/access/transam/xlog.c5
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c7
-rw-r--r--src/backend/replication/walreceiverfuncs.c7
-rw-r--r--src/backend/replication/walsender.c60
4 files changed, 53 insertions, 26 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 959f4231873..f7dd61c4c75 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9598,7 +9598,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
}
else
{
- ptr = RecPtr;
+ ptr = tliRecPtr;
tli = tliOfPointInHistory(tliRecPtr, expectedTLEs);
if (curFileTLI > 0 && tli < curFileTLI)
@@ -9607,7 +9607,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
tli, curFileTLI);
}
curFileTLI = tli;
- RequestXLogStreaming(curFileTLI, ptr, PrimaryConnInfo);
+ RequestXLogStreaming(tli, ptr, PrimaryConnInfo);
+ receivedUpto = 0;
}
/*
* Move to XLOG_FROM_STREAM state in either case. We'll get
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index e6e670e9e4b..f7cc6e3c2f5 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -224,8 +224,11 @@ libpqrcv_endstreaming(TimeLineID *next_tli)
res = PQgetResult(streamConn);
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
- /* Read the next timeline's ID */
- if (PQnfields(res) != 1 || PQntuples(res) != 1)
+ /*
+ * Read the next timeline's ID. The server also sends the timeline's
+ * starting point, but it is ignored.
+ */
+ if (PQnfields(res) < 2 || PQntuples(res) != 1)
ereport(ERROR,
(errmsg("unexpected result set after end-of-streaming")));
*next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0);
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index d414808c9f2..e5ad84393fa 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -260,12 +260,13 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
walrcv->startTime = now;
/*
- * If this is the first startup of walreceiver, we initialize receivedUpto
- * and latestChunkStart to receiveStart.
+ * If this is the first startup of walreceiver (on this timeline),
+ * initialize receivedUpto and latestChunkStart to the starting point.
*/
- if (walrcv->receiveStart == 0)
+ if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
{
walrcv->receivedUpto = recptr;
+ walrcv->receivedTLI = tli;
walrcv->latestChunkStart = recptr;
}
walrcv->receiveStart = recptr;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index c05bb1e0818..1dcb0f57f44 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -567,16 +567,21 @@ StartReplication(StartReplicationCmd *cmd)
*/
if (sendTimeLineIsHistoric)
{
- char str[11];
- snprintf(str, sizeof(str), "%u", sendTimeLineNextTLI);
+ char tli_str[11];
+ char startpos_str[8+1+8+1];
- pq_beginmessage(&buf, 'T'); /* RowDescription */
- pq_sendint(&buf, 1, 2); /* 1 field */
+ snprintf(tli_str, sizeof(tli_str), "%u", sendTimeLineNextTLI);
+ snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
+ (uint32) (sendTimeLineValidUpto >> 32),
+ (uint32) sendTimeLineValidUpto);
+
+ pq_beginmessage(&buf, 'T'); /* RowDescription */
+ pq_sendint(&buf, 2, 2); /* 2 fields */
/* Field header */
pq_sendstring(&buf, "next_tli");
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
/*
* int8 may seem like a surprising data type for this, but in theory
* int4 would not be wide enough for this, as TimeLineID is unsigned.
@@ -585,13 +590,26 @@ StartReplication(StartReplicationCmd *cmd)
pq_sendint(&buf, -1, 2);
pq_sendint(&buf, 0, 4);
pq_sendint(&buf, 0, 2);
+
+ pq_sendstring(&buf, "next_tli_startpos");
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2);
+ pq_sendint(&buf, 0, 4);
+ pq_sendint(&buf, 0, 2);
pq_endmessage(&buf);
/* Data row */
pq_beginmessage(&buf, 'D');
- pq_sendint(&buf, 1, 2); /* number of columns */
- pq_sendint(&buf, strlen(str), 4); /* length */
- pq_sendbytes(&buf, str, strlen(str));
+ pq_sendint(&buf, 2, 2); /* number of columns */
+
+ pq_sendint(&buf, strlen(tli_str), 4); /* length */
+ pq_sendbytes(&buf, tli_str, strlen(tli_str));
+
+ pq_sendint(&buf, strlen(startpos_str), 4); /* length */
+ pq_sendbytes(&buf, startpos_str, strlen(startpos_str));
+
pq_endmessage(&buf);
}
@@ -1462,19 +1480,10 @@ XLogSend(bool *caughtup)
history = readTimeLineHistory(ThisTimeLineID);
sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
- Assert(sentPtr <= sendTimeLineValidUpto);
+
Assert(sendTimeLine < sendTimeLineNextTLI);
list_free_deep(history);
- /* the current send pointer should be <= the switchpoint */
- if (!(sentPtr <= sendTimeLineValidUpto))
- elog(ERROR, "server switched off timeline %u at %X/%X, but walsender already streamed up to %X/%X",
- sendTimeLine,
- (uint32) (sendTimeLineValidUpto >> 32),
- (uint32) sendTimeLineValidUpto,
- (uint32) (sentPtr >> 32),
- (uint32) sentPtr);
-
sendTimeLineIsHistoric = true;
SendRqstPtr = sendTimeLineValidUpto;
@@ -1498,6 +1507,15 @@ XLogSend(bool *caughtup)
/*
* If this is a historic timeline and we've reached the point where we
* forked to the next timeline, stop streaming.
+ *
+ * Note: We might already have sent WAL > sendTimeLineValidUpto. The
+ * startup process will normally replay all WAL that has been received from
+ * the master, before promoting, but if the WAL streaming is terminated at
+ * a WAL page boundary, the valid portion of the timeline might end in the
+ * middle of a WAL record. We might've already sent the first half of that
+ * partial WAL record to the cascading standby, so that sentPtr >
+ * sendTimeLineValidUpto. That's OK; the cascading standby can't replay the
+ * partial WAL record either, so it can still follow our timeline switch.
*/
if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
{
@@ -1511,6 +1529,10 @@ XLogSend(bool *caughtup)
streamingDoneSending = true;
*caughtup = true;
+
+ elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
+ (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
+ (uint32) (sentPtr >> 32), (uint32) sentPtr);
return;
}