diff options
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r-- | src/backend/replication/walreceiver.c | 89 |
1 files changed, 48 insertions, 41 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 911a66ba887..9261449d706 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -86,7 +86,7 @@ walrcv_disconnect_type walrcv_disconnect = NULL; * corresponding the filename of recvFile. */ static int recvFile = -1; -static TimeLineID recvFileTLI = 0; +static TimeLineID recvFileTLI = 0; static XLogSegNo recvSegNo = 0; static uint32 recvOff = 0; @@ -107,8 +107,8 @@ static struct XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ } LogstreamResult; -static StringInfoData reply_message; -static StringInfoData incoming_message; +static StringInfoData reply_message; +static StringInfoData incoming_message; /* * About SIGTERM handling: @@ -332,12 +332,13 @@ WalReceiverMain(void) /* * Get any missing history files. We do this always, even when we're - * not interested in that timeline, so that if we're promoted to become - * the master later on, we don't select the same timeline that was - * already used in the current master. This isn't bullet-proof - you'll - * need some external software to manage your cluster if you need to - * ensure that a unique timeline id is chosen in every case, but let's - * avoid the confusion of timeline id collisions where we can. + * not interested in that timeline, so that if we're promoted to + * become the master later on, we don't select the same timeline that + * was already used in the current master. This isn't bullet-proof - + * you'll need some external software to manage your cluster if you + * need to ensure that a unique timeline id is chosen in every case, + * but let's avoid the confusion of timeline id collisions where we + * can. */ WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI); @@ -356,18 +357,18 @@ WalReceiverMain(void) ThisTimeLineID = startpointTLI; if (walrcv_startstreaming(startpointTLI, startpoint)) { - bool endofwal = false; + bool endofwal = false; if (first_stream) ereport(LOG, (errmsg("started streaming WAL from primary at %X/%X on timeline %u", - (uint32) (startpoint >> 32), (uint32) startpoint, + (uint32) (startpoint >> 32), (uint32) startpoint, startpointTLI))); else ereport(LOG, - (errmsg("restarted WAL streaming at %X/%X on timeline %u", - (uint32) (startpoint >> 32), (uint32) startpoint, - startpointTLI))); + (errmsg("restarted WAL streaming at %X/%X on timeline %u", + (uint32) (startpoint >> 32), (uint32) startpoint, + startpointTLI))); first_stream = false; /* Initialize LogstreamResult and buffers for processing messages */ @@ -387,7 +388,8 @@ WalReceiverMain(void) /* * Emergency bailout if postmaster has died. This is to avoid - * the necessity for manual cleanup of all postmaster children. + * the necessity for manual cleanup of all postmaster + * children. */ if (!PostmasterIsAlive()) exit(1); @@ -422,7 +424,10 @@ WalReceiverMain(void) { if (len > 0) { - /* Something was received from master, so reset timeout */ + /* + * Something was received from master, so reset + * timeout + */ last_recv_timestamp = GetCurrentTimestamp(); ping_sent = false; XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1); @@ -457,12 +462,13 @@ WalReceiverMain(void) /* * We didn't receive anything new. If we haven't heard * anything from the server for more than - * wal_receiver_timeout / 2, ping the server. Also, if it's - * been longer than wal_receiver_status_interval since the - * last update we sent, send a status update to the master - * anyway, to report any progress in applying WAL. + * wal_receiver_timeout / 2, ping the server. Also, if + * it's been longer than wal_receiver_status_interval + * since the last update we sent, send a status update to + * the master anyway, to report any progress in applying + * WAL. */ - bool requestReply = false; + bool requestReply = false; /* * Check if time since last receive from standby has @@ -482,13 +488,13 @@ WalReceiverMain(void) (errmsg("terminating walreceiver due to timeout"))); /* - * We didn't receive anything new, for half of receiver - * replication timeout. Ping the server. + * We didn't receive anything new, for half of + * receiver replication timeout. Ping the server. */ if (!ping_sent) { timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, - (wal_receiver_timeout/2)); + (wal_receiver_timeout / 2)); if (now >= timeout) { requestReply = true; @@ -511,9 +517,9 @@ WalReceiverMain(void) DisableWalRcvImmediateExit(); /* - * If the server had switched to a new timeline that we didn't know - * about when we began streaming, fetch its timeline history file - * now. + * If the server had switched to a new timeline that we didn't + * know about when we began streaming, fetch its timeline history + * file now. */ WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI); } @@ -614,8 +620,8 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) if (walrcv->walRcvState == WALRCV_STOPPING) { /* - * We should've received SIGTERM if the startup process wants - * us to die, but might as well check it here too. + * We should've received SIGTERM if the startup process wants us + * to die, but might as well check it here too. */ SpinLockRelease(&walrcv->mutex); exit(1); @@ -643,7 +649,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last) { - TimeLineID tli; + TimeLineID tli; for (tli = first; tli <= last; tli++) { @@ -664,8 +670,9 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last) DisableWalRcvImmediateExit(); /* - * Check that the filename on the master matches what we calculated - * ourselves. This is just a sanity check, it should always match. + * Check that the filename on the master matches what we + * calculated ourselves. This is just a sanity check, it should + * always match. */ TLHistoryFileName(expectedfname, tli); if (strcmp(fname, expectedfname) != 0) @@ -791,7 +798,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) int hdrlen; XLogRecPtr dataStart; XLogRecPtr walEnd; - TimestampTz sendTime; + TimestampTz sendTime; bool replyRequested; resetStringInfo(&incoming_message); @@ -812,7 +819,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) dataStart = pq_getmsgint64(&incoming_message); walEnd = pq_getmsgint64(&incoming_message); sendTime = IntegerTimestampToTimestampTz( - pq_getmsgint64(&incoming_message)); + pq_getmsgint64(&incoming_message)); ProcessWalSndrMessage(walEnd, sendTime); buf += hdrlen; @@ -833,7 +840,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) /* read the fields */ walEnd = pq_getmsgint64(&incoming_message); sendTime = IntegerTimestampToTimestampTz( - pq_getmsgint64(&incoming_message)); + pq_getmsgint64(&incoming_message)); replyRequested = pq_getmsgbyte(&incoming_message); ProcessWalSndrMessage(walEnd, sendTime); @@ -890,8 +897,8 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) XLogFileNameP(recvFileTLI, recvSegNo)))); /* - * Create .done file forcibly to prevent the streamed segment from - * being archived later. + * Create .done file forcibly to prevent the streamed segment + * from being archived later. */ XLogFileName(xlogfname, recvFileTLI, recvSegNo); XLogArchiveForceDone(xlogfname); @@ -920,9 +927,9 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0) ereport(PANIC, (errcode_for_file_access(), - errmsg("could not seek in log segment %s, to offset %u: %m", - XLogFileNameP(recvFileTLI, recvSegNo), - startoff))); + errmsg("could not seek in log segment %s, to offset %u: %m", + XLogFileNameP(recvFileTLI, recvSegNo), + startoff))); recvOff = startoff; } @@ -1110,7 +1117,7 @@ XLogWalRcvSendHSFeedback(bool immed) * Send feedback at most once per wal_receiver_status_interval. */ if (!TimestampDifferenceExceeds(sendTime, now, - wal_receiver_status_interval * 1000)) + wal_receiver_status_interval * 1000)) return; sendTime = now; } |