aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/tablesync.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r--src/backend/replication/logical/tablesync.c34
1 files changed, 16 insertions, 18 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e50b9f79056..acc6498567d 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -298,11 +298,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
SpinLockRelease(&MyLogicalRepWorker->relmutex);
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn,
- true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ MyLogicalRepWorker->relstate,
+ MyLogicalRepWorker->relstate_lsn);
walrcv_endstreaming(wrconn, &tli);
finish_sync_worker();
@@ -427,9 +426,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
StartTransactionCommand();
started_tx = true;
}
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- rstate->relid, rstate->state,
- rstate->lsn, true);
+
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ rstate->relid, rstate->state,
+ rstate->lsn);
}
}
else
@@ -870,11 +870,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
/* Update the state and make it visible to others. */
StartTransactionCommand();
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn,
- true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ MyLogicalRepWorker->relstate,
+ MyLogicalRepWorker->relstate_lsn);
CommitTransactionCommand();
pgstat_report_stat(false);
@@ -961,11 +960,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* Update the new state in catalog. No need to bother
* with the shmem state as we are exiting for good.
*/
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- SUBREL_STATE_SYNCDONE,
- *origin_startpos,
- true);
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ SUBREL_STATE_SYNCDONE,
+ *origin_startpos);
finish_sync_worker();
}
break;