diff options
author | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2013-05-08 20:10:17 +0300 |
---|---|---|
committer | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2013-05-08 20:30:17 +0300 |
commit | 2ffa66f4975c99e52984f7ee81b47d137b5b4751 (patch) | |
tree | d48756b7959f7bb63d9ea8513ff68e0dfa88d4ed /src/backend | |
parent | cb953d8b1bf7386ff20300cd80b29b7e8657dcbd (diff) | |
download | postgresql-2ffa66f4975c99e52984f7ee81b47d137b5b4751.tar.gz postgresql-2ffa66f4975c99e52984f7ee81b47d137b5b4751.zip |
Fix walsender failure at promotion.
If a standby server has a cascading standby server connected to it, it's
possible that WAL has already been sent up to the next WAL page boundary,
splitting a WAL record in the middle, when the first standby server is
promoted. Don't throw an assertion failure or error in walsender if that
happens.
Also, fix a variant of the same bug in pg_receivexlog: if it had already
received WAL on previous timeline up to a segment boundary, when the
upstream standby server is promoted so that the timeline switch record falls
on the previous segment, pg_receivexlog would miss the segment containing
the timeline switch. To fix that, have walsender send the position of the
timeline switch at end-of-streaming, in addition to the next timeline's ID.
It was previously assumed that the switch happened exactly where the
streaming stopped.
Note: this is an incompatible change in the streaming protocol. You might
get an error if you try to stream over timeline switches, if the client is
running 9.3beta1 and the server is more recent. It should be fine after a
reconnect, however.
Reported by Fujii Masao.
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/access/transam/xlog.c | 5 | ||||
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 7 | ||||
-rw-r--r-- | src/backend/replication/walreceiverfuncs.c | 7 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 60 |
4 files changed, 53 insertions, 26 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 959f4231873..f7dd61c4c75 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -9598,7 +9598,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, } else { - ptr = RecPtr; + ptr = tliRecPtr; tli = tliOfPointInHistory(tliRecPtr, expectedTLEs); if (curFileTLI > 0 && tli < curFileTLI) @@ -9607,7 +9607,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, tli, curFileTLI); } curFileTLI = tli; - RequestXLogStreaming(curFileTLI, ptr, PrimaryConnInfo); + RequestXLogStreaming(tli, ptr, PrimaryConnInfo); + receivedUpto = 0; } /* * Move to XLOG_FROM_STREAM state in either case. We'll get diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index e6e670e9e4b..f7cc6e3c2f5 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -224,8 +224,11 @@ libpqrcv_endstreaming(TimeLineID *next_tli) res = PQgetResult(streamConn); if (PQresultStatus(res) == PGRES_TUPLES_OK) { - /* Read the next timeline's ID */ - if (PQnfields(res) != 1 || PQntuples(res) != 1) + /* + * Read the next timeline's ID. The server also sends the timeline's + * starting point, but it is ignored. + */ + if (PQnfields(res) < 2 || PQntuples(res) != 1) ereport(ERROR, (errmsg("unexpected result set after end-of-streaming"))); *next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0); diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index d414808c9f2..e5ad84393fa 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -260,12 +260,13 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo) walrcv->startTime = now; /* - * If this is the first startup of walreceiver, we initialize receivedUpto - * and latestChunkStart to receiveStart. + * If this is the first startup of walreceiver (on this timeline), + * initialize receivedUpto and latestChunkStart to the starting point. */ - if (walrcv->receiveStart == 0) + if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli) { walrcv->receivedUpto = recptr; + walrcv->receivedTLI = tli; walrcv->latestChunkStart = recptr; } walrcv->receiveStart = recptr; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index c05bb1e0818..1dcb0f57f44 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -567,16 +567,21 @@ StartReplication(StartReplicationCmd *cmd) */ if (sendTimeLineIsHistoric) { - char str[11]; - snprintf(str, sizeof(str), "%u", sendTimeLineNextTLI); + char tli_str[11]; + char startpos_str[8+1+8+1]; - pq_beginmessage(&buf, 'T'); /* RowDescription */ - pq_sendint(&buf, 1, 2); /* 1 field */ + snprintf(tli_str, sizeof(tli_str), "%u", sendTimeLineNextTLI); + snprintf(startpos_str, sizeof(startpos_str), "%X/%X", + (uint32) (sendTimeLineValidUpto >> 32), + (uint32) sendTimeLineValidUpto); + + pq_beginmessage(&buf, 'T'); /* RowDescription */ + pq_sendint(&buf, 2, 2); /* 2 fields */ /* Field header */ pq_sendstring(&buf, "next_tli"); - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ /* * int8 may seem like a surprising data type for this, but in theory * int4 would not be wide enough for this, as TimeLineID is unsigned. @@ -585,13 +590,26 @@ StartReplication(StartReplicationCmd *cmd) pq_sendint(&buf, -1, 2); pq_sendint(&buf, 0, 4); pq_sendint(&buf, 0, 2); + + pq_sendstring(&buf, "next_tli_startpos"); + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, TEXTOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); + pq_sendint(&buf, 0, 4); + pq_sendint(&buf, 0, 2); pq_endmessage(&buf); /* Data row */ pq_beginmessage(&buf, 'D'); - pq_sendint(&buf, 1, 2); /* number of columns */ - pq_sendint(&buf, strlen(str), 4); /* length */ - pq_sendbytes(&buf, str, strlen(str)); + pq_sendint(&buf, 2, 2); /* number of columns */ + + pq_sendint(&buf, strlen(tli_str), 4); /* length */ + pq_sendbytes(&buf, tli_str, strlen(tli_str)); + + pq_sendint(&buf, strlen(startpos_str), 4); /* length */ + pq_sendbytes(&buf, startpos_str, strlen(startpos_str)); + pq_endmessage(&buf); } @@ -1462,19 +1480,10 @@ XLogSend(bool *caughtup) history = readTimeLineHistory(ThisTimeLineID); sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI); - Assert(sentPtr <= sendTimeLineValidUpto); + Assert(sendTimeLine < sendTimeLineNextTLI); list_free_deep(history); - /* the current send pointer should be <= the switchpoint */ - if (!(sentPtr <= sendTimeLineValidUpto)) - elog(ERROR, "server switched off timeline %u at %X/%X, but walsender already streamed up to %X/%X", - sendTimeLine, - (uint32) (sendTimeLineValidUpto >> 32), - (uint32) sendTimeLineValidUpto, - (uint32) (sentPtr >> 32), - (uint32) sentPtr); - sendTimeLineIsHistoric = true; SendRqstPtr = sendTimeLineValidUpto; @@ -1498,6 +1507,15 @@ XLogSend(bool *caughtup) /* * If this is a historic timeline and we've reached the point where we * forked to the next timeline, stop streaming. + * + * Note: We might already have sent WAL > sendTimeLineValidUpto. The + * startup process will normally replay all WAL that has been received from + * the master, before promoting, but if the WAL streaming is terminated at + * a WAL page boundary, the valid portion of the timeline might end in the + * middle of a WAL record. We might've already sent the first half of that + * partial WAL record to the cascading standby, so that sentPtr > + * sendTimeLineValidUpto. That's OK; the cascading standby can't replay the + * partial WAL record either, so it can still follow our timeline switch. */ if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr) { @@ -1511,6 +1529,10 @@ XLogSend(bool *caughtup) streamingDoneSending = true; *caughtup = true; + + elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)", + (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto, + (uint32) (sentPtr >> 32), (uint32) sentPtr); return; } |