diff options
author | Peter Eisentraut <peter_e@gmx.net> | 2016-11-30 12:00:00 -0500 |
---|---|---|
committer | Peter Eisentraut <peter_e@gmx.net> | 2016-12-01 20:23:28 -0500 |
commit | 78c8c814390f14398e8fd43fe7282cb2e260b50f (patch) | |
tree | 9c6a71c446c8430fa57754bbf8cf97714e2c4dfb /src/backend/replication/walreceiver.c | |
parent | 597a87ccc9a6fa8af7f3cf280b1e24e41807d555 (diff) | |
download | postgresql-78c8c814390f14398e8fd43fe7282cb2e260b50f.tar.gz postgresql-78c8c814390f14398e8fd43fe7282cb2e260b50f.zip |
Refactor libpqwalreceiver
The whole walreceiver API is now wrapped into a struct, like most of our
other loadable module APIs. The libpq connection is no longer a global
variable in libpqwalreceiver. Instead, it is encapsulated into a struct
that is passed around the functions. This allows multiple walreceivers
to run at the same time.
Add some rudimentary support for logical replication connections to
libpqwalreceiver.
These changes are mostly cosmetic and are going to be useful for the
future logical replication patches.
From: Petr Jelinek <petr@2ndquadrant.com>
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r-- | src/backend/replication/walreceiver.c | 59 |
1 files changed, 29 insertions, 30 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 8bfb0415608..cc3cf7d2147 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -74,16 +74,9 @@ int wal_receiver_status_interval; int wal_receiver_timeout; bool hot_standby_feedback; -/* libpqreceiver hooks to these when loaded */ -walrcv_connect_type walrcv_connect = NULL; -walrcv_get_conninfo_type walrcv_get_conninfo = NULL; -walrcv_identify_system_type walrcv_identify_system = NULL; -walrcv_startstreaming_type walrcv_startstreaming = NULL; -walrcv_endstreaming_type walrcv_endstreaming = NULL; -walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile = NULL; -walrcv_receive_type walrcv_receive = NULL; -walrcv_send_type walrcv_send = NULL; -walrcv_disconnect_type walrcv_disconnect = NULL; +/* libpqwalreceiver connection */ +static WalReceiverConn *wrconn = NULL; +WalReceiverFunctionsType *WalReceiverFunctions = NULL; #define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ @@ -286,14 +279,7 @@ WalReceiverMain(void) /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); - if (walrcv_connect == NULL || - walrcv_get_conninfo == NULL || - walrcv_startstreaming == NULL || - walrcv_endstreaming == NULL || - walrcv_identify_system == NULL || - walrcv_readtimelinehistoryfile == NULL || - walrcv_receive == NULL || walrcv_send == NULL || - walrcv_disconnect == NULL) + if (WalReceiverFunctions == NULL) elog(ERROR, "libpqwalreceiver didn't initialize correctly"); /* @@ -307,14 +293,14 @@ WalReceiverMain(void) /* Establish the connection to the primary for XLOG streaming */ EnableWalRcvImmediateExit(); - walrcv_connect(conninfo); + wrconn = walrcv_connect(conninfo, false, "walreceiver"); DisableWalRcvImmediateExit(); /* * Save user-visible connection string. This clobbers the original * conninfo, for security. */ - tmp_conninfo = walrcv_get_conninfo(); + tmp_conninfo = walrcv_get_conninfo(wrconn); SpinLockAcquire(&walrcv->mutex); memset(walrcv->conninfo, 0, MAXCONNINFO); if (tmp_conninfo) @@ -328,12 +314,25 @@ WalReceiverMain(void) first_stream = true; for (;;) { + char *primary_sysid; + char standby_sysid[32]; + /* * Check that we're connected to a valid server using the * IDENTIFY_SYSTEM replication command, */ EnableWalRcvImmediateExit(); - walrcv_identify_system(&primaryTLI); + primary_sysid = walrcv_identify_system(wrconn, &primaryTLI); + + snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT, + GetSystemIdentifier()); + if (strcmp(primary_sysid, standby_sysid) != 0) + { + ereport(ERROR, + (errmsg("database system identifier differs between the primary and standby"), + errdetail("The primary's identifier is %s, the standby's identifier is %s.", + primary_sysid, standby_sysid))); + } DisableWalRcvImmediateExit(); /* @@ -370,7 +369,7 @@ WalReceiverMain(void) * on the new timeline. */ ThisTimeLineID = startpointTLI; - if (walrcv_startstreaming(startpointTLI, startpoint, + if (walrcv_startstreaming(wrconn, startpointTLI, startpoint, slotname[0] != '\0' ? slotname : NULL)) { if (first_stream) @@ -422,7 +421,7 @@ WalReceiverMain(void) } /* See if we can read data immediately */ - len = walrcv_receive(&buf, &wait_fd); + len = walrcv_receive(wrconn, &buf, &wait_fd); if (len != 0) { /* @@ -453,7 +452,7 @@ WalReceiverMain(void) endofwal = true; break; } - len = walrcv_receive(&buf, &wait_fd); + len = walrcv_receive(wrconn, &buf, &wait_fd); } /* Let the master know that we received some data. */ @@ -570,7 +569,7 @@ WalReceiverMain(void) * our side, too. */ EnableWalRcvImmediateExit(); - walrcv_endstreaming(&primaryTLI); + walrcv_endstreaming(wrconn, &primaryTLI); DisableWalRcvImmediateExit(); /* @@ -726,7 +725,7 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last) tli))); EnableWalRcvImmediateExit(); - walrcv_readtimelinehistoryfile(tli, &fname, &content, &len); + walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len); DisableWalRcvImmediateExit(); /* @@ -778,8 +777,8 @@ WalRcvDie(int code, Datum arg) SpinLockRelease(&walrcv->mutex); /* Terminate the connection gracefully. */ - if (walrcv_disconnect != NULL) - walrcv_disconnect(); + if (wrconn != NULL) + walrcv_disconnect(wrconn); /* Wake up the startup process to notice promptly that we're gone */ WakeupRecovery(); @@ -1150,7 +1149,7 @@ XLogWalRcvSendReply(bool force, bool requestReply) (uint32) (applyPtr >> 32), (uint32) applyPtr, requestReply ? " (reply requested)" : ""); - walrcv_send(reply_message.data, reply_message.len); + walrcv_send(wrconn, reply_message.data, reply_message.len); } /* @@ -1228,7 +1227,7 @@ XLogWalRcvSendHSFeedback(bool immed) pq_sendint64(&reply_message, GetCurrentIntegerTimestamp()); pq_sendint(&reply_message, xmin, 4); pq_sendint(&reply_message, nextEpoch, 4); - walrcv_send(reply_message.data, reply_message.len); + walrcv_send(wrconn, reply_message.data, reply_message.len); if (TransactionIdIsValid(xmin)) master_has_standby_xmin = true; else |