diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/access/transam/xlog.c | 5 | ||||
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 7 | ||||
-rw-r--r-- | src/backend/replication/walreceiverfuncs.c | 7 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 60 |
4 files changed, 53 insertions, 26 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 959f4231873..f7dd61c4c75 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -9598,7 +9598,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, } else { - ptr = RecPtr; + ptr = tliRecPtr; tli = tliOfPointInHistory(tliRecPtr, expectedTLEs); if (curFileTLI > 0 && tli < curFileTLI) @@ -9607,7 +9607,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, tli, curFileTLI); } curFileTLI = tli; - RequestXLogStreaming(curFileTLI, ptr, PrimaryConnInfo); + RequestXLogStreaming(tli, ptr, PrimaryConnInfo); + receivedUpto = 0; } /* * Move to XLOG_FROM_STREAM state in either case. We'll get diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index e6e670e9e4b..f7cc6e3c2f5 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -224,8 +224,11 @@ libpqrcv_endstreaming(TimeLineID *next_tli) res = PQgetResult(streamConn); if (PQresultStatus(res) == PGRES_TUPLES_OK) { - /* Read the next timeline's ID */ - if (PQnfields(res) != 1 || PQntuples(res) != 1) + /* + * Read the next timeline's ID. The server also sends the timeline's + * starting point, but it is ignored. + */ + if (PQnfields(res) < 2 || PQntuples(res) != 1) ereport(ERROR, (errmsg("unexpected result set after end-of-streaming"))); *next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0); diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index d414808c9f2..e5ad84393fa 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -260,12 +260,13 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo) walrcv->startTime = now; /* - * If this is the first startup of walreceiver, we initialize receivedUpto - * and latestChunkStart to receiveStart. + * If this is the first startup of walreceiver (on this timeline), + * initialize receivedUpto and latestChunkStart to the starting point. */ - if (walrcv->receiveStart == 0) + if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli) { walrcv->receivedUpto = recptr; + walrcv->receivedTLI = tli; walrcv->latestChunkStart = recptr; } walrcv->receiveStart = recptr; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index c05bb1e0818..1dcb0f57f44 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -567,16 +567,21 @@ StartReplication(StartReplicationCmd *cmd) */ if (sendTimeLineIsHistoric) { - char str[11]; - snprintf(str, sizeof(str), "%u", sendTimeLineNextTLI); + char tli_str[11]; + char startpos_str[8+1+8+1]; - pq_beginmessage(&buf, 'T'); /* RowDescription */ - pq_sendint(&buf, 1, 2); /* 1 field */ + snprintf(tli_str, sizeof(tli_str), "%u", sendTimeLineNextTLI); + snprintf(startpos_str, sizeof(startpos_str), "%X/%X", + (uint32) (sendTimeLineValidUpto >> 32), + (uint32) sendTimeLineValidUpto); + + pq_beginmessage(&buf, 'T'); /* RowDescription */ + pq_sendint(&buf, 2, 2); /* 2 fields */ /* Field header */ pq_sendstring(&buf, "next_tli"); - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ /* * int8 may seem like a surprising data type for this, but in theory * int4 would not be wide enough for this, as TimeLineID is unsigned. @@ -585,13 +590,26 @@ StartReplication(StartReplicationCmd *cmd) pq_sendint(&buf, -1, 2); pq_sendint(&buf, 0, 4); pq_sendint(&buf, 0, 2); + + pq_sendstring(&buf, "next_tli_startpos"); + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, TEXTOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); + pq_sendint(&buf, 0, 4); + pq_sendint(&buf, 0, 2); pq_endmessage(&buf); /* Data row */ pq_beginmessage(&buf, 'D'); - pq_sendint(&buf, 1, 2); /* number of columns */ - pq_sendint(&buf, strlen(str), 4); /* length */ - pq_sendbytes(&buf, str, strlen(str)); + pq_sendint(&buf, 2, 2); /* number of columns */ + + pq_sendint(&buf, strlen(tli_str), 4); /* length */ + pq_sendbytes(&buf, tli_str, strlen(tli_str)); + + pq_sendint(&buf, strlen(startpos_str), 4); /* length */ + pq_sendbytes(&buf, startpos_str, strlen(startpos_str)); + pq_endmessage(&buf); } @@ -1462,19 +1480,10 @@ XLogSend(bool *caughtup) history = readTimeLineHistory(ThisTimeLineID); sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI); - Assert(sentPtr <= sendTimeLineValidUpto); + Assert(sendTimeLine < sendTimeLineNextTLI); list_free_deep(history); - /* the current send pointer should be <= the switchpoint */ - if (!(sentPtr <= sendTimeLineValidUpto)) - elog(ERROR, "server switched off timeline %u at %X/%X, but walsender already streamed up to %X/%X", - sendTimeLine, - (uint32) (sendTimeLineValidUpto >> 32), - (uint32) sendTimeLineValidUpto, - (uint32) (sentPtr >> 32), - (uint32) sentPtr); - sendTimeLineIsHistoric = true; SendRqstPtr = sendTimeLineValidUpto; @@ -1498,6 +1507,15 @@ XLogSend(bool *caughtup) /* * If this is a historic timeline and we've reached the point where we * forked to the next timeline, stop streaming. + * + * Note: We might already have sent WAL > sendTimeLineValidUpto. The + * startup process will normally replay all WAL that has been received from + * the master, before promoting, but if the WAL streaming is terminated at + * a WAL page boundary, the valid portion of the timeline might end in the + * middle of a WAL record. We might've already sent the first half of that + * partial WAL record to the cascading standby, so that sentPtr > + * sendTimeLineValidUpto. That's OK; the cascading standby can't replay the + * partial WAL record either, so it can still follow our timeline switch. */ if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr) { @@ -1511,6 +1529,10 @@ XLogSend(bool *caughtup) streamingDoneSending = true; *caughtup = true; + + elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)", + (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto, + (uint32) (sentPtr >> 32), (uint32) sentPtr); return; } |