aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c327
1 files changed, 163 insertions, 164 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 04813b506e1..9d1eab9e1e6 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -88,29 +88,29 @@
typedef struct FlushPosition
{
- dlist_node node;
- XLogRecPtr local_end;
- XLogRecPtr remote_end;
+ dlist_node node;
+ XLogRecPtr local_end;
+ XLogRecPtr remote_end;
} FlushPosition;
static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
typedef struct SlotErrCallbackArg
{
- LogicalRepRelation *rel;
+ LogicalRepRelation *rel;
int attnum;
} SlotErrCallbackArg;
-static MemoryContext ApplyMessageContext = NULL;
-MemoryContext ApplyContext = NULL;
+static MemoryContext ApplyMessageContext = NULL;
+MemoryContext ApplyContext = NULL;
-WalReceiverConn *wrconn = NULL;
+WalReceiverConn *wrconn = NULL;
-Subscription *MySubscription = NULL;
-bool MySubscriptionValid = false;
+Subscription *MySubscription = NULL;
+bool MySubscriptionValid = false;
-bool in_remote_transaction = false;
-static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+bool in_remote_transaction = false;
+static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
@@ -215,7 +215,7 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
*/
static void
slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
- TupleTableSlot *slot)
+ TupleTableSlot *slot)
{
TupleDesc desc = RelationGetDescr(rel->localrel);
int num_phys_attrs = desc->natts;
@@ -271,9 +271,9 @@ slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate,
static void
slot_store_error_callback(void *arg)
{
- SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
- Oid remotetypoid,
- localtypoid;
+ SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg;
+ Oid remotetypoid,
+ localtypoid;
if (errarg->attnum < 0)
return;
@@ -295,12 +295,12 @@ slot_store_error_callback(void *arg)
*/
static void
slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
- char **values)
+ char **values)
{
- int natts = slot->tts_tupleDescriptor->natts;
- int i;
- SlotErrCallbackArg errarg;
- ErrorContextCallback errcallback;
+ int natts = slot->tts_tupleDescriptor->natts;
+ int i;
+ SlotErrCallbackArg errarg;
+ ErrorContextCallback errcallback;
ExecClearTuple(slot);
@@ -315,14 +315,14 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
/* Call the "in" function for each non-dropped attribute */
for (i = 0; i < natts; i++)
{
- Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
- int remoteattnum = rel->attrmap[i];
+ Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
+ int remoteattnum = rel->attrmap[i];
if (!att->attisdropped && remoteattnum >= 0 &&
values[remoteattnum] != NULL)
{
- Oid typinput;
- Oid typioparam;
+ Oid typinput;
+ Oid typioparam;
errarg.attnum = remoteattnum;
@@ -359,12 +359,12 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
*/
static void
slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
- char **values, bool *replaces)
+ char **values, bool *replaces)
{
- int natts = slot->tts_tupleDescriptor->natts;
- int i;
- SlotErrCallbackArg errarg;
- ErrorContextCallback errcallback;
+ int natts = slot->tts_tupleDescriptor->natts;
+ int i;
+ SlotErrCallbackArg errarg;
+ ErrorContextCallback errcallback;
slot_getallattrs(slot);
ExecClearTuple(slot);
@@ -380,16 +380,16 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
/* Call the "in" function for each replaced attribute */
for (i = 0; i < natts; i++)
{
- Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
- int remoteattnum = rel->attrmap[i];
+ Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i];
+ int remoteattnum = rel->attrmap[i];
if (remoteattnum >= 0 && !replaces[remoteattnum])
continue;
if (remoteattnum >= 0 && values[remoteattnum] != NULL)
{
- Oid typinput;
- Oid typioparam;
+ Oid typinput;
+ Oid typioparam;
errarg.attnum = remoteattnum;
@@ -418,7 +418,7 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel,
static void
apply_handle_begin(StringInfo s)
{
- LogicalRepBeginData begin_data;
+ LogicalRepBeginData begin_data;
logicalrep_read_begin(s, &begin_data);
@@ -437,7 +437,7 @@ apply_handle_begin(StringInfo s)
static void
apply_handle_commit(StringInfo s)
{
- LogicalRepCommitData commit_data;
+ LogicalRepCommitData commit_data;
logicalrep_read_commit(s, &commit_data);
@@ -476,8 +476,8 @@ static void
apply_handle_origin(StringInfo s)
{
/*
- * ORIGIN message can only come inside remote transaction and before
- * any actual writes.
+ * ORIGIN message can only come inside remote transaction and before any
+ * actual writes.
*/
if (!in_remote_transaction ||
(IsTransactionState() && !am_tablesync_worker()))
@@ -497,7 +497,7 @@ apply_handle_origin(StringInfo s)
static void
apply_handle_relation(StringInfo s)
{
- LogicalRepRelation *rel;
+ LogicalRepRelation *rel;
rel = logicalrep_read_rel(s);
logicalrep_relmap_update(rel);
@@ -512,7 +512,7 @@ apply_handle_relation(StringInfo s)
static void
apply_handle_type(StringInfo s)
{
- LogicalRepTyp typ;
+ LogicalRepTyp typ;
logicalrep_read_typ(s, &typ);
logicalrep_typmap_update(&typ);
@@ -526,7 +526,7 @@ apply_handle_type(StringInfo s)
static Oid
GetRelationIdentityOrPK(Relation rel)
{
- Oid idxoid;
+ Oid idxoid;
idxoid = RelationGetReplicaIndex(rel);
@@ -543,11 +543,11 @@ static void
apply_handle_insert(StringInfo s)
{
LogicalRepRelMapEntry *rel;
- LogicalRepTupleData newtup;
- LogicalRepRelId relid;
- EState *estate;
- TupleTableSlot *remoteslot;
- MemoryContext oldctx;
+ LogicalRepTupleData newtup;
+ LogicalRepRelId relid;
+ EState *estate;
+ TupleTableSlot *remoteslot;
+ MemoryContext oldctx;
ensure_transaction();
@@ -607,15 +607,15 @@ check_relation_updatable(LogicalRepRelMapEntry *rel)
return;
/*
- * We are in error mode so it's fine this is somewhat slow.
- * It's better to give user correct error.
+ * We are in error mode so it's fine this is somewhat slow. It's better to
+ * give user correct error.
*/
if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
{
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("publisher does not send replica identity column "
- "expected by the logical replication target relation \"%s.%s\"",
+ "expected by the logical replication target relation \"%s.%s\"",
rel->remoterel.nspname, rel->remoterel.relname)));
}
@@ -637,17 +637,17 @@ static void
apply_handle_update(StringInfo s)
{
LogicalRepRelMapEntry *rel;
- LogicalRepRelId relid;
- Oid idxoid;
- EState *estate;
- EPQState epqstate;
- LogicalRepTupleData oldtup;
- LogicalRepTupleData newtup;
- bool has_oldtup;
- TupleTableSlot *localslot;
- TupleTableSlot *remoteslot;
- bool found;
- MemoryContext oldctx;
+ LogicalRepRelId relid;
+ Oid idxoid;
+ EState *estate;
+ EPQState epqstate;
+ LogicalRepTupleData oldtup;
+ LogicalRepTupleData newtup;
+ bool has_oldtup;
+ TupleTableSlot *localslot;
+ TupleTableSlot *remoteslot;
+ bool found;
+ MemoryContext oldctx;
ensure_transaction();
@@ -685,8 +685,8 @@ apply_handle_update(StringInfo s)
MemoryContextSwitchTo(oldctx);
/*
- * Try to find tuple using either replica identity index, primary key
- * or if needed, sequential scan.
+ * Try to find tuple using either replica identity index, primary key or
+ * if needed, sequential scan.
*/
idxoid = GetRelationIdentityOrPK(rel->localrel);
Assert(OidIsValid(idxoid) ||
@@ -758,15 +758,15 @@ static void
apply_handle_delete(StringInfo s)
{
LogicalRepRelMapEntry *rel;
- LogicalRepTupleData oldtup;
- LogicalRepRelId relid;
- Oid idxoid;
- EState *estate;
- EPQState epqstate;
- TupleTableSlot *remoteslot;
- TupleTableSlot *localslot;
- bool found;
- MemoryContext oldctx;
+ LogicalRepTupleData oldtup;
+ LogicalRepRelId relid;
+ Oid idxoid;
+ EState *estate;
+ EPQState epqstate;
+ TupleTableSlot *remoteslot;
+ TupleTableSlot *localslot;
+ bool found;
+ MemoryContext oldctx;
ensure_transaction();
@@ -802,8 +802,8 @@ apply_handle_delete(StringInfo s)
MemoryContextSwitchTo(oldctx);
/*
- * Try to find tuple using either replica identity index, primary key
- * or if needed, sequential scan.
+ * Try to find tuple using either replica identity index, primary key or
+ * if needed, sequential scan.
*/
idxoid = GetRelationIdentityOrPK(rel->localrel);
Assert(OidIsValid(idxoid) ||
@@ -826,7 +826,7 @@ apply_handle_delete(StringInfo s)
}
else
{
- /* The tuple to be deleted could not be found.*/
+ /* The tuple to be deleted could not be found. */
ereport(DEBUG1,
(errmsg("logical replication could not find row for delete "
"in replication target %s",
@@ -856,46 +856,46 @@ apply_handle_delete(StringInfo s)
static void
apply_dispatch(StringInfo s)
{
- char action = pq_getmsgbyte(s);
+ char action = pq_getmsgbyte(s);
switch (action)
{
- /* BEGIN */
+ /* BEGIN */
case 'B':
apply_handle_begin(s);
break;
- /* COMMIT */
+ /* COMMIT */
case 'C':
apply_handle_commit(s);
break;
- /* INSERT */
+ /* INSERT */
case 'I':
apply_handle_insert(s);
break;
- /* UPDATE */
+ /* UPDATE */
case 'U':
apply_handle_update(s);
break;
- /* DELETE */
+ /* DELETE */
case 'D':
apply_handle_delete(s);
break;
- /* RELATION */
+ /* RELATION */
case 'R':
apply_handle_relation(s);
break;
- /* TYPE */
+ /* TYPE */
case 'Y':
apply_handle_type(s);
break;
- /* ORIGIN */
+ /* ORIGIN */
case 'O':
apply_handle_origin(s);
break;
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("invalid logical replication message type %c", action)));
+ errmsg("invalid logical replication message type %c", action)));
}
}
@@ -925,7 +925,7 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
dlist_foreach_modify(iter, &lsn_mapping)
{
FlushPosition *pos =
- dlist_container(FlushPosition, node, iter.cur);
+ dlist_container(FlushPosition, node, iter.cur);
*write = pos->remote_end;
@@ -995,12 +995,12 @@ static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
/*
- * Init the ApplyMessageContext which we clean up after each
- * replication protocol message.
+ * Init the ApplyMessageContext which we clean up after each replication
+ * protocol message.
*/
ApplyMessageContext = AllocSetContextCreate(ApplyContext,
- "ApplyMessageContext",
- ALLOCSET_DEFAULT_SIZES);
+ "ApplyMessageContext",
+ ALLOCSET_DEFAULT_SIZES);
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
@@ -1039,7 +1039,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
}
else
{
- int c;
+ int c;
StringInfoData s;
/* Reset timeout. */
@@ -1108,7 +1108,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
{
/*
* If we didn't get any transactions for a while there might be
- * unconsumed invalidation messages in the queue, consume them now.
+ * unconsumed invalidation messages in the queue, consume them
+ * now.
*/
AcceptInvalidationMessages();
if (!MySubscriptionValid)
@@ -1126,6 +1127,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
if (endofstream)
{
TimeLineID tli;
+
walrcv_endstreaming(wrconn, &tli);
break;
}
@@ -1152,19 +1154,18 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
if (rc & WL_TIMEOUT)
{
/*
- * We didn't receive anything new. If we haven't heard
- * anything from the server for more than
- * wal_receiver_timeout / 2, ping the server. Also, if
- * it's been longer than wal_receiver_status_interval
- * since the last update we sent, send a status update to
- * the master anyway, to report any progress in applying
- * WAL.
+ * We didn't receive anything new. If we haven't heard anything
+ * from the server for more than wal_receiver_timeout / 2, ping
+ * the server. Also, if it's been longer than
+ * wal_receiver_status_interval since the last update we sent,
+ * send a status update to the master anyway, to report any
+ * progress in applying WAL.
*/
bool requestReply = false;
/*
- * Check if time since last receive from standby has
- * reached the configured limit.
+ * Check if time since last receive from standby has reached the
+ * configured limit.
*/
if (wal_receiver_timeout > 0)
{
@@ -1180,13 +1181,13 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
(errmsg("terminating logical replication worker due to timeout")));
/*
- * We didn't receive anything new, for half of
- * receiver replication timeout. Ping the server.
+ * We didn't receive anything new, for half of receiver
+ * replication timeout. Ping the server.
*/
if (!ping_sent)
{
timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
- (wal_receiver_timeout / 2));
+ (wal_receiver_timeout / 2));
if (now >= timeout)
{
requestReply = true;
@@ -1211,17 +1212,17 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
static void
send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
{
- static StringInfo reply_message = NULL;
- static TimestampTz send_time = 0;
+ static StringInfo reply_message = NULL;
+ static TimestampTz send_time = 0;
static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
static XLogRecPtr last_writepos = InvalidXLogRecPtr;
static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
- XLogRecPtr writepos;
- XLogRecPtr flushpos;
+ XLogRecPtr writepos;
+ XLogRecPtr flushpos;
TimestampTz now;
- bool have_pending_txes;
+ bool have_pending_txes;
/*
* If the user doesn't want status to be reported to the publisher, be
@@ -1237,8 +1238,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
get_flush_position(&writepos, &flushpos, &have_pending_txes);
/*
- * No outstanding transactions to flush, we can report the latest
- * received position. This is important for synchronous replication.
+ * No outstanding transactions to flush, we can report the latest received
+ * position. This is important for synchronous replication.
*/
if (!have_pending_txes)
flushpos = writepos = recvpos;
@@ -1262,7 +1263,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
if (!reply_message)
{
- MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
+ MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
+
reply_message = makeStringInfo();
MemoryContextSwitchTo(oldctx);
}
@@ -1273,7 +1275,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
pq_sendint64(reply_message, recvpos); /* write */
pq_sendint64(reply_message, flushpos); /* flush */
pq_sendint64(reply_message, writepos); /* apply */
- pq_sendint64(reply_message, now); /* sendTime */
+ pq_sendint64(reply_message, now); /* sendTime */
pq_sendbyte(reply_message, requestReply); /* replyRequested */
elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
@@ -1300,9 +1302,9 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
static void
reread_subscription(void)
{
- MemoryContext oldctx;
- Subscription *newsub;
- bool started_tx = false;
+ MemoryContext oldctx;
+ Subscription *newsub;
+ bool started_tx = false;
/* This function might be called inside or outside of transaction. */
if (!IsTransactionState())
@@ -1317,47 +1319,45 @@ reread_subscription(void)
newsub = GetSubscription(MyLogicalRepWorker->subid, true);
/*
- * Exit if the subscription was removed.
- * This normally should not happen as the worker gets killed
- * during DROP SUBSCRIPTION.
+ * Exit if the subscription was removed. This normally should not happen
+ * as the worker gets killed during DROP SUBSCRIPTION.
*/
if (!newsub)
{
ereport(LOG,
- (errmsg("logical replication worker for subscription \"%s\" will "
- "stop because the subscription was removed",
- MySubscription->name)));
+ (errmsg("logical replication worker for subscription \"%s\" will "
+ "stop because the subscription was removed",
+ MySubscription->name)));
walrcv_disconnect(wrconn);
proc_exit(0);
}
/*
- * Exit if the subscription was disabled.
- * This normally should not happen as the worker gets killed
- * during ALTER SUBSCRIPTION ... DISABLE.
+ * Exit if the subscription was disabled. This normally should not happen
+ * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE.
*/
if (!newsub->enabled)
{
ereport(LOG,
- (errmsg("logical replication worker for subscription \"%s\" will "
- "stop because the subscription was disabled",
- MySubscription->name)));
+ (errmsg("logical replication worker for subscription \"%s\" will "
+ "stop because the subscription was disabled",
+ MySubscription->name)));
walrcv_disconnect(wrconn);
proc_exit(0);
}
/*
- * Exit if connection string was changed. The launcher will start
- * new worker.
+ * Exit if connection string was changed. The launcher will start new
+ * worker.
*/
if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0)
{
ereport(LOG,
- (errmsg("logical replication worker for subscription \"%s\" will "
- "restart because the connection information was changed",
- MySubscription->name)));
+ (errmsg("logical replication worker for subscription \"%s\" will "
+ "restart because the connection information was changed",
+ MySubscription->name)));
walrcv_disconnect(wrconn);
proc_exit(0);
@@ -1370,9 +1370,9 @@ reread_subscription(void)
if (strcmp(newsub->name, MySubscription->name) != 0)
{
ereport(LOG,
- (errmsg("logical replication worker for subscription \"%s\" will "
- "restart because subscription was renamed",
- MySubscription->name)));
+ (errmsg("logical replication worker for subscription \"%s\" will "
+ "restart because subscription was renamed",
+ MySubscription->name)));
walrcv_disconnect(wrconn);
proc_exit(0);
@@ -1382,30 +1382,30 @@ reread_subscription(void)
Assert(newsub->slotname);
/*
- * We need to make new connection to new slot if slot name has changed
- * so exit here as well if that's the case.
+ * We need to make new connection to new slot if slot name has changed so
+ * exit here as well if that's the case.
*/
if (strcmp(newsub->slotname, MySubscription->slotname) != 0)
{
ereport(LOG,
- (errmsg("logical replication worker for subscription \"%s\" will "
- "restart because the replication slot name was changed",
- MySubscription->name)));
+ (errmsg("logical replication worker for subscription \"%s\" will "
+ "restart because the replication slot name was changed",
+ MySubscription->name)));
walrcv_disconnect(wrconn);
proc_exit(0);
}
/*
- * Exit if publication list was changed. The launcher will start
- * new worker.
+ * Exit if publication list was changed. The launcher will start new
+ * worker.
*/
if (!equal(newsub->publications, MySubscription->publications))
{
ereport(LOG,
- (errmsg("logical replication worker for subscription \"%s\" will "
- "restart because subscription's publications were changed",
- MySubscription->name)));
+ (errmsg("logical replication worker for subscription \"%s\" will "
+ "restart because subscription's publications were changed",
+ MySubscription->name)));
walrcv_disconnect(wrconn);
proc_exit(0);
@@ -1448,11 +1448,11 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
void
ApplyWorkerMain(Datum main_arg)
{
- int worker_slot = DatumGetInt32(main_arg);
- MemoryContext oldctx;
- char originname[NAMEDATALEN];
- XLogRecPtr origin_startpos;
- char *myslotname;
+ int worker_slot = DatumGetInt32(main_arg);
+ MemoryContext oldctx;
+ char originname[NAMEDATALEN];
+ XLogRecPtr origin_startpos;
+ char *myslotname;
WalRcvStreamOptions options;
/* Attach to slot */
@@ -1488,8 +1488,8 @@ ApplyWorkerMain(Datum main_arg)
/* Load the subscription into persistent memory context. */
ApplyContext = AllocSetContextCreate(TopMemoryContext,
- "ApplyContext",
- ALLOCSET_DEFAULT_SIZES);
+ "ApplyContext",
+ ALLOCSET_DEFAULT_SIZES);
StartTransactionCommand();
oldctx = MemoryContextSwitchTo(ApplyContext);
MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
@@ -1503,9 +1503,9 @@ ApplyWorkerMain(Datum main_arg)
if (!MySubscription->enabled)
{
ereport(LOG,
- (errmsg("logical replication worker for subscription \"%s\" will not "
- "start because the subscription was disabled during startup",
- MySubscription->name)));
+ (errmsg("logical replication worker for subscription \"%s\" will not "
+ "start because the subscription was disabled during startup",
+ MySubscription->name)));
proc_exit(0);
}
@@ -1530,7 +1530,7 @@ ApplyWorkerMain(Datum main_arg)
if (am_tablesync_worker())
{
- char *syncslotname;
+ char *syncslotname;
/* This is table synchroniation worker, call initial sync. */
syncslotname = LogicalRepSyncTableStart(&origin_startpos);
@@ -1545,10 +1545,10 @@ ApplyWorkerMain(Datum main_arg)
else
{
/* This is main apply worker */
- RepOriginId originid;
- TimeLineID startpointTLI;
- char *err;
- int server_version;
+ RepOriginId originid;
+ TimeLineID startpointTLI;
+ char *err;
+ int server_version;
myslotname = MySubscription->slotname;
@@ -1570,9 +1570,8 @@ ApplyWorkerMain(Datum main_arg)
(errmsg("could not connect to the publisher: %s", err)));
/*
- * We don't really use the output identify_system for anything
- * but it does some initializations on the upstream so let's still
- * call it.
+ * We don't really use the output identify_system for anything but it
+ * does some initializations on the upstream so let's still call it.
*/
(void) walrcv_identify_system(wrconn, &startpointTLI,
&server_version);
@@ -1580,8 +1579,8 @@ ApplyWorkerMain(Datum main_arg)
}
/*
- * Setup callback for syscache so that we know when something
- * changes in the subscription relation state.
+ * Setup callback for syscache so that we know when something changes in
+ * the subscription relation state.
*/
CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
invalidate_syncing_table_states,