diff options
Diffstat (limited to 'src/backend/replication')
-rw-r--r-- | src/backend/replication/basebackup.c | 64 | ||||
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 11 | ||||
-rw-r--r-- | src/backend/replication/syncrep.c | 2 | ||||
-rw-r--r-- | src/backend/replication/walreceiver.c | 89 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 182 |
5 files changed, 184 insertions, 164 deletions
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index ab5262adfbf..12b5e24cac5 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -58,7 +58,7 @@ static void base_backup_cleanup(int code, Datum arg); static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir); static void parse_basebackup_options(List *options, basebackup_options *opt); static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli); -static int compareWalFileNames(const void *a, const void *b); +static int compareWalFileNames(const void *a, const void *b); /* Was the backup currently in-progress initiated in recovery mode? */ static bool backup_started_in_recovery = false; @@ -249,8 +249,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) * I'd rather not worry about timelines here, so scan pg_xlog and * include all WAL files in the range between 'startptr' and 'endptr', * regardless of the timeline the file is stamped with. If there are - * some spurious WAL files belonging to timelines that don't belong - * in this server's history, they will be included too. Normally there + * some spurious WAL files belonging to timelines that don't belong in + * this server's history, they will be included too. Normally there * shouldn't be such files, but if there are, there's little harm in * including them. */ @@ -262,7 +262,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) dir = AllocateDir("pg_xlog"); if (!dir) ereport(ERROR, - (errmsg("could not open directory \"%s\": %m", "pg_xlog"))); + (errmsg("could not open directory \"%s\": %m", "pg_xlog"))); while ((de = ReadDir(dir, "pg_xlog")) != NULL) { /* Does it look like a WAL segment, and is it in the range? */ @@ -290,9 +290,9 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) CheckXLogRemoved(startsegno, ThisTimeLineID); /* - * Put the WAL filenames into an array, and sort. We send the files - * in order from oldest to newest, to reduce the chance that a file - * is recycled before we get a chance to send it over. + * Put the WAL filenames into an array, and sort. We send the files in + * order from oldest to newest, to reduce the chance that a file is + * recycled before we get a chance to send it over. */ nWalFiles = list_length(walFileList); walFiles = palloc(nWalFiles * sizeof(char *)); @@ -310,28 +310,31 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) XLogFromFileName(walFiles[0], &tli, &segno); if (segno != startsegno) { - char startfname[MAXFNAMELEN]; + char startfname[MAXFNAMELEN]; + XLogFileName(startfname, ThisTimeLineID, startsegno); ereport(ERROR, (errmsg("could not find WAL file \"%s\"", startfname))); } for (i = 0; i < nWalFiles; i++) { - XLogSegNo currsegno = segno; - XLogSegNo nextsegno = segno + 1; + XLogSegNo currsegno = segno; + XLogSegNo nextsegno = segno + 1; XLogFromFileName(walFiles[i], &tli, &segno); if (!(nextsegno == segno || currsegno == segno)) { - char nextfname[MAXFNAMELEN]; + char nextfname[MAXFNAMELEN]; + XLogFileName(nextfname, ThisTimeLineID, nextsegno); ereport(ERROR, - (errmsg("could not find WAL file \"%s\"", nextfname))); + (errmsg("could not find WAL file \"%s\"", nextfname))); } } if (segno != endsegno) { - char endfname[MAXFNAMELEN]; + char endfname[MAXFNAMELEN]; + XLogFileName(endfname, ThisTimeLineID, endsegno); ereport(ERROR, (errmsg("could not find WAL file \"%s\"", endfname))); @@ -373,7 +376,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) CheckXLogRemoved(segno, tli); ereport(ERROR, (errcode_for_file_access(), - errmsg("unexpected WAL file size \"%s\"", walFiles[i]))); + errmsg("unexpected WAL file size \"%s\"", walFiles[i]))); } _tarWriteHeader(pathbuf, NULL, &statbuf); @@ -396,7 +399,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) CheckXLogRemoved(segno, tli); ereport(ERROR, (errcode_for_file_access(), - errmsg("unexpected WAL file size \"%s\"", walFiles[i]))); + errmsg("unexpected WAL file size \"%s\"", walFiles[i]))); } /* XLogSegSize is a multiple of 512, so no need for padding */ @@ -408,13 +411,14 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) * file is required for recovery, and even that only if there happens * to be a timeline switch in the first WAL segment that contains the * checkpoint record, or if we're taking a base backup from a standby - * server and the target timeline changes while the backup is taken. + * server and the target timeline changes while the backup is taken. * But they are small and highly useful for debugging purposes, so * better include them all, always. */ foreach(lc, historyFileList) { - char *fname = lfirst(lc); + char *fname = lfirst(lc); + snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", fname); if (lstat(pathbuf, &statbuf) != 0) @@ -438,8 +442,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) static int compareWalFileNames(const void *a, const void *b) { - char *fna = *((char **) a); - char *fnb = *((char **) b); + char *fna = *((char **) a); + char *fnb = *((char **) b); return strcmp(fna + 8, fnb + 8); } @@ -657,11 +661,12 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli) pq_sendstring(&buf, "tli"); pq_sendint(&buf, 0, 4); /* table oid */ pq_sendint(&buf, 0, 2); /* attnum */ + /* * int8 may seem like a surprising data type for this, but in thory int4 * would not be wide enough for this, as TimeLineID is unsigned. */ - pq_sendint(&buf, INT8OID, 4); /* type oid */ + pq_sendint(&buf, INT8OID, 4); /* type oid */ pq_sendint(&buf, -1, 2); pq_sendint(&buf, 0, 4); pq_sendint(&buf, 0, 2); @@ -729,7 +734,7 @@ sendFileWithContent(const char *filename, const char *content) /* * Include the tablespace directory pointed to by 'path' in the output tar - * stream. If 'sizeonly' is true, we just calculate a total length and return + * stream. If 'sizeonly' is true, we just calculate a total length and return * it, without actually sending anything. */ static int64 @@ -747,7 +752,8 @@ sendTablespace(char *path, bool sizeonly) TABLESPACE_VERSION_DIRECTORY); /* - * Store a directory entry in the tar file so we get the permissions right. + * Store a directory entry in the tar file so we get the permissions + * right. */ if (lstat(pathbuf, &statbuf) != 0) { @@ -762,7 +768,7 @@ sendTablespace(char *path, bool sizeonly) } if (!sizeonly) _tarWriteHeader(TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf); - size = 512; /* Size of the header just added */ + size = 512; /* Size of the header just added */ /* Send all the files in the tablespace version directory */ size += sendDir(pathbuf, strlen(path), sizeonly); @@ -818,9 +824,9 @@ sendDir(char *path, int basepathlen, bool sizeonly) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("the standby was promoted during online backup"), - errhint("This means that the backup being taken is corrupt " - "and should not be used. " - "Try taking another online backup."))); + errhint("This means that the backup being taken is corrupt " + "and should not be used. " + "Try taking another online backup."))); snprintf(pathbuf, MAXPGPATH, "%s/%s", path, de->d_name); @@ -923,7 +929,7 @@ sendDir(char *path, int basepathlen, bool sizeonly) } else if (S_ISREG(statbuf.st_mode)) { - bool sent = false; + bool sent = false; if (!sizeonly) sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf, @@ -933,7 +939,7 @@ sendDir(char *path, int basepathlen, bool sizeonly) { /* Add size, rounded up to 512byte block */ size += ((statbuf.st_size + 511) & ~511); - size += 512; /* Size of the header of the file */ + size += 512; /* Size of the header of the file */ } } else @@ -967,7 +973,7 @@ sendDir(char *path, int basepathlen, bool sizeonly) * and the file did not exist. */ static bool -sendFile(char *readfilename, char *tarfilename, struct stat *statbuf, +sendFile(char *readfilename, char *tarfilename, struct stat * statbuf, bool missing_ok) { FILE *fp; diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index f7cc6e3c2f5..6bc0aa1c12c 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -51,7 +51,7 @@ static void libpqrcv_identify_system(TimeLineID *primary_tli); static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len); static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint); static void libpqrcv_endstreaming(TimeLineID *next_tli); -static int libpqrcv_receive(int timeout, char **buffer); +static int libpqrcv_receive(int timeout, char **buffer); static void libpqrcv_send(const char *buffer, int nbytes); static void libpqrcv_disconnect(void); @@ -209,12 +209,13 @@ libpqrcv_endstreaming(TimeLineID *next_tli) if (PQputCopyEnd(streamConn, NULL) <= 0 || PQflush(streamConn)) ereport(ERROR, - (errmsg("could not send end-of-streaming message to primary: %s", - PQerrorMessage(streamConn)))); + (errmsg("could not send end-of-streaming message to primary: %s", + PQerrorMessage(streamConn)))); /* * After COPY is finished, we should receive a result set indicating the - * next timeline's ID, or just CommandComplete if the server was shut down. + * next timeline's ID, or just CommandComplete if the server was shut + * down. * * If we had not yet received CopyDone from the backend, PGRES_COPY_IN * would also be possible. However, at the moment this function is only @@ -456,7 +457,7 @@ libpqrcv_disconnect(void) * 0 if no data was available within timeout, or wait was interrupted * by signal. * - * -1 if the server ended the COPY. + * -1 if the server ended the COPY. * * ereports on error. */ diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 975ee214ab4..5424281b425 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -443,7 +443,7 @@ SyncRepReleaseWaiters(void) elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X", numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write, - numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush); + numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush); /* * If we are managing the highest priority standby, though we weren't diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 911a66ba887..9261449d706 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -86,7 +86,7 @@ walrcv_disconnect_type walrcv_disconnect = NULL; * corresponding the filename of recvFile. */ static int recvFile = -1; -static TimeLineID recvFileTLI = 0; +static TimeLineID recvFileTLI = 0; static XLogSegNo recvSegNo = 0; static uint32 recvOff = 0; @@ -107,8 +107,8 @@ static struct XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ } LogstreamResult; -static StringInfoData reply_message; -static StringInfoData incoming_message; +static StringInfoData reply_message; +static StringInfoData incoming_message; /* * About SIGTERM handling: @@ -332,12 +332,13 @@ WalReceiverMain(void) /* * Get any missing history files. We do this always, even when we're - * not interested in that timeline, so that if we're promoted to become - * the master later on, we don't select the same timeline that was - * already used in the current master. This isn't bullet-proof - you'll - * need some external software to manage your cluster if you need to - * ensure that a unique timeline id is chosen in every case, but let's - * avoid the confusion of timeline id collisions where we can. + * not interested in that timeline, so that if we're promoted to + * become the master later on, we don't select the same timeline that + * was already used in the current master. This isn't bullet-proof - + * you'll need some external software to manage your cluster if you + * need to ensure that a unique timeline id is chosen in every case, + * but let's avoid the confusion of timeline id collisions where we + * can. */ WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI); @@ -356,18 +357,18 @@ WalReceiverMain(void) ThisTimeLineID = startpointTLI; if (walrcv_startstreaming(startpointTLI, startpoint)) { - bool endofwal = false; + bool endofwal = false; if (first_stream) ereport(LOG, (errmsg("started streaming WAL from primary at %X/%X on timeline %u", - (uint32) (startpoint >> 32), (uint32) startpoint, + (uint32) (startpoint >> 32), (uint32) startpoint, startpointTLI))); else ereport(LOG, - (errmsg("restarted WAL streaming at %X/%X on timeline %u", - (uint32) (startpoint >> 32), (uint32) startpoint, - startpointTLI))); + (errmsg("restarted WAL streaming at %X/%X on timeline %u", + (uint32) (startpoint >> 32), (uint32) startpoint, + startpointTLI))); first_stream = false; /* Initialize LogstreamResult and buffers for processing messages */ @@ -387,7 +388,8 @@ WalReceiverMain(void) /* * Emergency bailout if postmaster has died. This is to avoid - * the necessity for manual cleanup of all postmaster children. + * the necessity for manual cleanup of all postmaster + * children. */ if (!PostmasterIsAlive()) exit(1); @@ -422,7 +424,10 @@ WalReceiverMain(void) { if (len > 0) { - /* Something was received from master, so reset timeout */ + /* + * Something was received from master, so reset + * timeout + */ last_recv_timestamp = GetCurrentTimestamp(); ping_sent = false; XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1); @@ -457,12 +462,13 @@ WalReceiverMain(void) /* * We didn't receive anything new. If we haven't heard * anything from the server for more than - * wal_receiver_timeout / 2, ping the server. Also, if it's - * been longer than wal_receiver_status_interval since the - * last update we sent, send a status update to the master - * anyway, to report any progress in applying WAL. + * wal_receiver_timeout / 2, ping the server. Also, if + * it's been longer than wal_receiver_status_interval + * since the last update we sent, send a status update to + * the master anyway, to report any progress in applying + * WAL. */ - bool requestReply = false; + bool requestReply = false; /* * Check if time since last receive from standby has @@ -482,13 +488,13 @@ WalReceiverMain(void) (errmsg("terminating walreceiver due to timeout"))); /* - * We didn't receive anything new, for half of receiver - * replication timeout. Ping the server. + * We didn't receive anything new, for half of + * receiver replication timeout. Ping the server. */ if (!ping_sent) { timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, - (wal_receiver_timeout/2)); + (wal_receiver_timeout / 2)); if (now >= timeout) { requestReply = true; @@ -511,9 +517,9 @@ WalReceiverMain(void) DisableWalRcvImmediateExit(); /* - * If the server had switched to a new timeline that we didn't know - * about when we began streaming, fetch its timeline history file - * now. + * If the server had switched to a new timeline that we didn't + * know about when we began streaming, fetch its timeline history + * file now. */ WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI); } @@ -614,8 +620,8 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) if (walrcv->walRcvState == WALRCV_STOPPING) { /* - * We should've received SIGTERM if the startup process wants - * us to die, but might as well check it here too. + * We should've received SIGTERM if the startup process wants us + * to die, but might as well check it here too. */ SpinLockRelease(&walrcv->mutex); exit(1); @@ -643,7 +649,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last) { - TimeLineID tli; + TimeLineID tli; for (tli = first; tli <= last; tli++) { @@ -664,8 +670,9 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last) DisableWalRcvImmediateExit(); /* - * Check that the filename on the master matches what we calculated - * ourselves. This is just a sanity check, it should always match. + * Check that the filename on the master matches what we + * calculated ourselves. This is just a sanity check, it should + * always match. */ TLHistoryFileName(expectedfname, tli); if (strcmp(fname, expectedfname) != 0) @@ -791,7 +798,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) int hdrlen; XLogRecPtr dataStart; XLogRecPtr walEnd; - TimestampTz sendTime; + TimestampTz sendTime; bool replyRequested; resetStringInfo(&incoming_message); @@ -812,7 +819,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) dataStart = pq_getmsgint64(&incoming_message); walEnd = pq_getmsgint64(&incoming_message); sendTime = IntegerTimestampToTimestampTz( - pq_getmsgint64(&incoming_message)); + pq_getmsgint64(&incoming_message)); ProcessWalSndrMessage(walEnd, sendTime); buf += hdrlen; @@ -833,7 +840,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) /* read the fields */ walEnd = pq_getmsgint64(&incoming_message); sendTime = IntegerTimestampToTimestampTz( - pq_getmsgint64(&incoming_message)); + pq_getmsgint64(&incoming_message)); replyRequested = pq_getmsgbyte(&incoming_message); ProcessWalSndrMessage(walEnd, sendTime); @@ -890,8 +897,8 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) XLogFileNameP(recvFileTLI, recvSegNo)))); /* - * Create .done file forcibly to prevent the streamed segment from - * being archived later. + * Create .done file forcibly to prevent the streamed segment + * from being archived later. */ XLogFileName(xlogfname, recvFileTLI, recvSegNo); XLogArchiveForceDone(xlogfname); @@ -920,9 +927,9 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0) ereport(PANIC, (errcode_for_file_access(), - errmsg("could not seek in log segment %s, to offset %u: %m", - XLogFileNameP(recvFileTLI, recvSegNo), - startoff))); + errmsg("could not seek in log segment %s, to offset %u: %m", + XLogFileNameP(recvFileTLI, recvSegNo), + startoff))); recvOff = startoff; } @@ -1110,7 +1117,7 @@ XLogWalRcvSendHSFeedback(bool immed) * Send feedback at most once per wal_receiver_status_interval. */ if (!TimestampDifferenceExceeds(sendTime, now, - wal_receiver_status_interval * 1000)) + wal_receiver_status_interval * 1000)) return; sendTime = now; } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 1dcb0f57f44..717cbfd61c6 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -94,12 +94,13 @@ bool am_cascading_walsender = false; /* Am I cascading WAL to /* User-settable parameters for walsender */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ -int wal_sender_timeout = 60 * 1000; /* maximum time to send one +int wal_sender_timeout = 60 * 1000; /* maximum time to send one * WAL data message */ + /* * State for WalSndWakeupRequest */ -bool wake_wal_senders = false; +bool wake_wal_senders = false; /* * These variables are used similarly to openLogFile/Id/Seg/Off, @@ -110,7 +111,7 @@ static XLogSegNo sendSegNo = 0; static uint32 sendOff = 0; /* Timeline ID of the currently open file */ -static TimeLineID curFileTimeLine = 0; +static TimeLineID curFileTimeLine = 0; /* * These variables keep track of the state of the timeline we're currently @@ -118,10 +119,10 @@ static TimeLineID curFileTimeLine = 0; * the timeline is not the latest timeline on this server, and the server's * history forked off from that timeline at sendTimeLineValidUpto. */ -static TimeLineID sendTimeLine = 0; -static TimeLineID sendTimeLineNextTLI = 0; -static bool sendTimeLineIsHistoric = false; -static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr; +static TimeLineID sendTimeLine = 0; +static TimeLineID sendTimeLineNextTLI = 0; +static bool sendTimeLineIsHistoric = false; +static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr; /* * How far have we sent WAL already? This is also advertised in @@ -138,8 +139,9 @@ static StringInfoData tmpbuf; * Timestamp of the last receipt of the reply from the standby. */ static TimestampTz last_reply_timestamp; + /* Have we sent a heartbeat message asking for reply, since last reply? */ -static bool ping_sent = false; +static bool ping_sent = false; /* * While streaming WAL in Copy mode, streamingDoneSending is set to true @@ -147,8 +149,8 @@ static bool ping_sent = false; * after that. streamingDoneReceiving is set to true when we receive CopyDone * from the other end. When both become true, it's time to exit Copy mode. */ -static bool streamingDoneSending; -static bool streamingDoneReceiving; +static bool streamingDoneSending; +static bool streamingDoneReceiving; /* Flags set by signal handlers for later service in main loop */ static volatile sig_atomic_t got_SIGHUP = false; @@ -322,8 +324,8 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd) off_t bytesleft; /* - * Reply with a result set with one row, and two columns. The first col - * is the name of the history file, 2nd is the contents. + * Reply with a result set with one row, and two columns. The first col is + * the name of the history file, 2nd is the contents. */ TLHistoryFileName(histfname, cmd->timeline); @@ -343,7 +345,7 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd) pq_sendint(&buf, 0, 2); /* format code */ /* second field */ - pq_sendstring(&buf, "content"); /* col name */ + pq_sendstring(&buf, "content"); /* col name */ pq_sendint(&buf, 0, 4); /* table oid */ pq_sendint(&buf, 0, 2); /* attnum */ pq_sendint(&buf, BYTEAOID, 4); /* type oid */ @@ -355,7 +357,7 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd) /* Send a DataRow message */ pq_beginmessage(&buf, 'D'); pq_sendint(&buf, 2, 2); /* # of columns */ - pq_sendint(&buf, strlen(histfname), 4); /* col1 len */ + pq_sendint(&buf, strlen(histfname), 4); /* col1 len */ pq_sendbytes(&buf, histfname, strlen(histfname)); fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666); @@ -373,15 +375,15 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd) if (lseek(fd, 0, SEEK_SET) != 0) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not seek to beginning of file \"%s\": %m", path))); + errmsg("could not seek to beginning of file \"%s\": %m", path))); pq_sendint(&buf, histfilelen, 4); /* col2 len */ bytesleft = histfilelen; while (bytesleft > 0) { - char rbuf[BLCKSZ]; - int nread; + char rbuf[BLCKSZ]; + int nread; nread = read(fd, rbuf, sizeof(rbuf)); if (nread <= 0) @@ -407,7 +409,7 @@ static void StartReplication(StartReplicationCmd *cmd) { StringInfoData buf; - XLogRecPtr FlushPtr; + XLogRecPtr FlushPtr; /* * We assume here that we're logging enough information in the WAL for @@ -420,8 +422,8 @@ StartReplication(StartReplicationCmd *cmd) /* * Select the timeline. If it was given explicitly by the client, use - * that. Otherwise use the timeline of the last replayed record, which - * is kept in ThisTimeLineID. + * that. Otherwise use the timeline of the last replayed record, which is + * kept in ThisTimeLineID. */ if (am_cascading_walsender) { @@ -448,8 +450,8 @@ StartReplication(StartReplicationCmd *cmd) sendTimeLineIsHistoric = true; /* - * Check that the timeline the client requested for exists, and the - * requested start location is on that timeline. + * Check that the timeline the client requested for exists, and + * the requested start location is on that timeline. */ timeLineHistory = readTimeLineHistory(ThisTimeLineID); switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory, @@ -461,14 +463,14 @@ StartReplication(StartReplicationCmd *cmd) * requested startpoint is on that timeline in our history. * * This is quite loose on purpose. We only check that we didn't - * fork off the requested timeline before the switchpoint. We don't - * check that we switched *to* it before the requested starting - * point. This is because the client can legitimately request to - * start replication from the beginning of the WAL segment that - * contains switchpoint, but on the new timeline, so that it - * doesn't end up with a partial segment. If you ask for a too old - * starting point, you'll get an error later when we fail to find - * the requested WAL segment in pg_xlog. + * fork off the requested timeline before the switchpoint. We + * don't check that we switched *to* it before the requested + * starting point. This is because the client can legitimately + * request to start replication from the beginning of the WAL + * segment that contains switchpoint, but on the new timeline, so + * that it doesn't end up with a partial segment. If you ask for a + * too old starting point, you'll get an error later when we fail + * to find the requested WAL segment in pg_xlog. * * XXX: we could be more strict here and only allow a startpoint * that's older than the switchpoint, if it it's still in the same @@ -503,12 +505,13 @@ StartReplication(StartReplicationCmd *cmd) if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto) { /* - * When we first start replication the standby will be behind the primary. - * For some applications, for example, synchronous replication, it is - * important to have a clear state for this initial catchup mode, so we - * can trigger actions when we change streaming state later. We may stay - * in this state for a long time, which is exactly why we want to be able - * to monitor whether or not we are still here. + * When we first start replication the standby will be behind the + * primary. For some applications, for example, synchronous + * replication, it is important to have a clear state for this initial + * catchup mode, so we can trigger actions when we change streaming + * state later. We may stay in this state for a long time, which is + * exactly why we want to be able to monitor whether or not we are + * still here. */ WalSndSetState(WALSNDSTATE_CATCHUP); @@ -568,20 +571,21 @@ StartReplication(StartReplicationCmd *cmd) if (sendTimeLineIsHistoric) { char tli_str[11]; - char startpos_str[8+1+8+1]; + char startpos_str[8 + 1 + 8 + 1]; snprintf(tli_str, sizeof(tli_str), "%u", sendTimeLineNextTLI); snprintf(startpos_str, sizeof(startpos_str), "%X/%X", (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto); - pq_beginmessage(&buf, 'T'); /* RowDescription */ - pq_sendint(&buf, 2, 2); /* 2 fields */ + pq_beginmessage(&buf, 'T'); /* RowDescription */ + pq_sendint(&buf, 2, 2); /* 2 fields */ /* Field header */ pq_sendstring(&buf, "next_tli"); - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + /* * int8 may seem like a surprising data type for this, but in theory * int4 would not be wide enough for this, as TimeLineID is unsigned. @@ -592,8 +596,8 @@ StartReplication(StartReplicationCmd *cmd) pq_sendint(&buf, 0, 2); pq_sendstring(&buf, "next_tli_startpos"); - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ pq_sendint(&buf, TEXTOID, 4); /* type oid */ pq_sendint(&buf, -1, 2); pq_sendint(&buf, 0, 4); @@ -602,12 +606,12 @@ StartReplication(StartReplicationCmd *cmd) /* Data row */ pq_beginmessage(&buf, 'D'); - pq_sendint(&buf, 2, 2); /* number of columns */ + pq_sendint(&buf, 2, 2); /* number of columns */ pq_sendint(&buf, strlen(tli_str), 4); /* length */ pq_sendbytes(&buf, tli_str, strlen(tli_str)); - pq_sendint(&buf, strlen(startpos_str), 4); /* length */ + pq_sendint(&buf, strlen(startpos_str), 4); /* length */ pq_sendbytes(&buf, startpos_str, strlen(startpos_str)); pq_endmessage(&buf); @@ -840,7 +844,7 @@ ProcessStandbyReplyMessage(void) writePtr = pq_getmsgint64(&reply_message); flushPtr = pq_getmsgint64(&reply_message); applyPtr = pq_getmsgint64(&reply_message); - (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ + (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ replyRequested = pq_getmsgbyte(&reply_message); elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s", @@ -887,7 +891,7 @@ ProcessStandbyHSFeedbackMessage(void) * Decipher the reply message. The caller already consumed the msgtype * byte. */ - (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ + (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ feedbackXmin = pq_getmsgint(&reply_message, 4); feedbackEpoch = pq_getmsgint(&reply_message, 4); @@ -932,11 +936,11 @@ ProcessStandbyHSFeedbackMessage(void) * cleanup conflicts on the standby server. * * There is a small window for a race condition here: although we just - * checked that feedbackXmin precedes nextXid, the nextXid could have gotten - * advanced between our fetching it and applying the xmin below, perhaps - * far enough to make feedbackXmin wrap around. In that case the xmin we - * set here would be "in the future" and have no effect. No point in - * worrying about this since it's too late to save the desired data + * checked that feedbackXmin precedes nextXid, the nextXid could have + * gotten advanced between our fetching it and applying the xmin below, + * perhaps far enough to make feedbackXmin wrap around. In that case the + * xmin we set here would be "in the future" and have no effect. No point + * in worrying about this since it's too late to save the desired data * anyway. Assuming that the standby sends us an increasing sequence of * xmins, this could only happen during the first reply cycle, else our * own xmin would prevent nextXid from advancing so far. @@ -969,8 +973,8 @@ WalSndLoop(void) ping_sent = false; /* - * Loop until we reach the end of this timeline or the client requests - * to stop streaming. + * Loop until we reach the end of this timeline or the client requests to + * stop streaming. */ for (;;) { @@ -1082,8 +1086,8 @@ WalSndLoop(void) { /* * If half of wal_sender_timeout has lapsed without receiving - * any reply from standby, send a keep-alive message to standby - * requesting an immediate reply. + * any reply from standby, send a keep-alive message to + * standby requesting an immediate reply. */ timeout = TimestampTzPlusMilliseconds(last_reply_timestamp, wal_sender_timeout / 2); @@ -1133,6 +1137,7 @@ WalSndLoop(void) return; send_failure: + /* * Get here on send failure. Clean up and exit. * @@ -1290,7 +1295,7 @@ retry: curFileTimeLine = sendTimeLine; if (sendTimeLineIsHistoric) { - XLogSegNo endSegNo; + XLogSegNo endSegNo; XLByteToSeg(sendTimeLineValidUpto, endSegNo); if (sendSegNo == endSegNo) @@ -1311,7 +1316,7 @@ retry: ereport(ERROR, (errcode_for_file_access(), errmsg("requested WAL segment %s has already been removed", - XLogFileNameP(curFileTimeLine, sendSegNo)))); + XLogFileNameP(curFileTimeLine, sendSegNo)))); else ereport(ERROR, (errcode_for_file_access(), @@ -1327,9 +1332,9 @@ retry: if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not seek in log segment %s to offset %u: %m", - XLogFileNameP(curFileTimeLine, sendSegNo), - startoff))); + errmsg("could not seek in log segment %s to offset %u: %m", + XLogFileNameP(curFileTimeLine, sendSegNo), + startoff))); sendOff = startoff; } @@ -1344,9 +1349,9 @@ retry: { ereport(ERROR, (errcode_for_file_access(), - errmsg("could not read from log segment %s, offset %u, length %lu: %m", - XLogFileNameP(curFileTimeLine, sendSegNo), - sendOff, (unsigned long) segbytes))); + errmsg("could not read from log segment %s, offset %u, length %lu: %m", + XLogFileNameP(curFileTimeLine, sendSegNo), + sendOff, (unsigned long) segbytes))); } /* Update state for read */ @@ -1431,16 +1436,16 @@ XLogSend(bool *caughtup) /* * Streaming the latest timeline on a standby. * - * Attempt to send all WAL that has already been replayed, so that - * we know it's valid. If we're receiving WAL through streaming + * Attempt to send all WAL that has already been replayed, so that we + * know it's valid. If we're receiving WAL through streaming * replication, it's also OK to send any WAL that has been received * but not replayed. * * The timeline we're recovering from can change, or we can be - * promoted. In either case, the current timeline becomes historic. - * We need to detect that so that we don't try to stream past the - * point where we switched to another timeline. We check for promotion - * or timeline switch after calculating FlushPtr, to avoid a race + * promoted. In either case, the current timeline becomes historic. We + * need to detect that so that we don't try to stream past the point + * where we switched to another timeline. We check for promotion or + * timeline switch after calculating FlushPtr, to avoid a race * condition: if the timeline becomes historic just after we checked * that it was still current, it's still be OK to stream it up to the * FlushPtr that was calculated before it became historic. @@ -1496,7 +1501,7 @@ XLogSend(bool *caughtup) * * Attempt to send all data that's already been written out and * fsync'd to disk. We cannot go further than what's been written out - * given the current implementation of XLogRead(). And in any case + * given the current implementation of XLogRead(). And in any case * it's unsafe to send WAL that is not securely down to disk on the * master: if the master subsequently crashes and restarts, slaves * must not have applied any WAL that gets lost on the master. @@ -1509,13 +1514,14 @@ XLogSend(bool *caughtup) * forked to the next timeline, stop streaming. * * Note: We might already have sent WAL > sendTimeLineValidUpto. The - * startup process will normally replay all WAL that has been received from - * the master, before promoting, but if the WAL streaming is terminated at - * a WAL page boundary, the valid portion of the timeline might end in the - * middle of a WAL record. We might've already sent the first half of that - * partial WAL record to the cascading standby, so that sentPtr > - * sendTimeLineValidUpto. That's OK; the cascading standby can't replay the - * partial WAL record either, so it can still follow our timeline switch. + * startup process will normally replay all WAL that has been received + * from the master, before promoting, but if the WAL streaming is + * terminated at a WAL page boundary, the valid portion of the timeline + * might end in the middle of a WAL record. We might've already sent the + * first half of that partial WAL record to the cascading standby, so that + * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't + * replay the partial WAL record either, so it can still follow our + * timeline switch. */ if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr) { @@ -1585,8 +1591,8 @@ XLogSend(bool *caughtup) pq_sendbyte(&output_message, 'w'); pq_sendint64(&output_message, startptr); /* dataStart */ - pq_sendint64(&output_message, SendRqstPtr); /* walEnd */ - pq_sendint64(&output_message, 0); /* sendtime, filled in last */ + pq_sendint64(&output_message, SendRqstPtr); /* walEnd */ + pq_sendint64(&output_message, 0); /* sendtime, filled in last */ /* * Read the log directly into the output buffer to avoid extra memcpy @@ -1643,16 +1649,16 @@ XLogSend(bool *caughtup) static XLogRecPtr GetStandbyFlushRecPtr(void) { - XLogRecPtr replayPtr; - TimeLineID replayTLI; - XLogRecPtr receivePtr; - TimeLineID receiveTLI; + XLogRecPtr replayPtr; + TimeLineID replayTLI; + XLogRecPtr receivePtr; + TimeLineID receiveTLI; XLogRecPtr result; /* * We can safely send what's already been replayed. Also, if walreceiver - * is streaming WAL from the same timeline, we can send anything that - * it has streamed, but hasn't been replayed yet. + * is streaming WAL from the same timeline, we can send anything that it + * has streamed, but hasn't been replayed yet. */ receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI); @@ -1742,8 +1748,8 @@ WalSndSignals(void) pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config * file */ pqsignal(SIGINT, SIG_IGN); /* not used */ - pqsignal(SIGTERM, die); /* request shutdown */ - pqsignal(SIGQUIT, quickdie); /* hard crash time */ + pqsignal(SIGTERM, die); /* request shutdown */ + pqsignal(SIGQUIT, quickdie); /* hard crash time */ InitializeTimeouts(); /* establishes SIGALRM handler */ pqsignal(SIGPIPE, SIG_IGN); pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */ |