aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/catalog/pg_subscription.c27
-rw-r--r--src/backend/replication/logical/relation.c3
-rw-r--r--src/backend/replication/logical/tablesync.c261
-rw-r--r--src/backend/replication/logical/worker.c15
-rw-r--r--src/include/catalog/pg_subscription_rel.h3
5 files changed, 131 insertions, 178 deletions
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 311d46225ad..ca78d395181 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -328,20 +328,16 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
/*
* Get state of subscription table.
*
- * Returns SUBREL_STATE_UNKNOWN when not found and missing_ok is true.
+ * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
*/
char
-GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
- bool missing_ok)
+GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
{
- Relation rel;
HeapTuple tup;
char substate;
bool isnull;
Datum d;
- rel = table_open(SubscriptionRelRelationId, AccessShareLock);
-
/* Try finding the mapping. */
tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
ObjectIdGetDatum(relid),
@@ -349,22 +345,14 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
if (!HeapTupleIsValid(tup))
{
- if (missing_ok)
- {
- table_close(rel, AccessShareLock);
- *sublsn = InvalidXLogRecPtr;
- return SUBREL_STATE_UNKNOWN;
- }
-
- elog(ERROR, "subscription table %u in subscription %u does not exist",
- relid, subid);
+ *sublsn = InvalidXLogRecPtr;
+ return SUBREL_STATE_UNKNOWN;
}
/* Get the state. */
- d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
- Anum_pg_subscription_rel_srsubstate, &isnull);
- Assert(!isnull);
- substate = DatumGetChar(d);
+ substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
+
+ /* Get the LSN */
d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
Anum_pg_subscription_rel_srsublsn, &isnull);
if (isnull)
@@ -374,7 +362,6 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
/* Cleanup */
ReleaseSysCache(tup);
- table_close(rel, AccessShareLock);
return substate;
}
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index a6596f79a66..07aa52977f9 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -437,8 +437,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
if (entry->state != SUBREL_STATE_READY)
entry->state = GetSubscriptionRelState(MySubscription->oid,
entry->localreloid,
- &entry->statelsn,
- true);
+ &entry->statelsn);
return entry;
}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 843c9285d59..a91b00ed4bc 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1,6 +1,6 @@
/*-------------------------------------------------------------------------
* tablesync.c
- * PostgreSQL logical replication
+ * PostgreSQL logical replication: initial table data synchronization
*
* Copyright (c) 2012-2020, PostgreSQL Global Development Group
*
@@ -26,26 +26,30 @@
* - It allows us to synchronize any tables added after the initial
* synchronization has finished.
*
- * The stream position synchronization works in multiple steps.
- * - Sync finishes copy and sets worker state as SYNCWAIT and waits for
- * state to change in a loop.
- * - Apply periodically checks tables that are synchronizing for SYNCWAIT.
- * When the desired state appears, it will set the worker state to
- * CATCHUP and starts loop-waiting until either the table state is set
- * to SYNCDONE or the sync worker exits.
+ * The stream position synchronization works in multiple steps:
+ * - Apply worker requests a tablesync worker to start, setting the new
+ * table state to INIT.
+ * - Tablesync worker starts; changes table state from INIT to DATASYNC while
+ * copying.
+ * - Tablesync worker finishes the copy and sets table state to SYNCWAIT;
+ * waits for state change.
+ * - Apply worker periodically checks for tables in SYNCWAIT state. When
+ * any appear, it sets the table state to CATCHUP and starts loop-waiting
+ * until either the table state is set to SYNCDONE or the sync worker
+ * exits.
* - After the sync worker has seen the state change to CATCHUP, it will
* read the stream and apply changes (acting like an apply worker) until
* it catches up to the specified stream position. Then it sets the
* state to SYNCDONE. There might be zero changes applied between
* CATCHUP and SYNCDONE, because the sync worker might be ahead of the
* apply worker.
- * - Once the state was set to SYNCDONE, the apply will continue tracking
+ * - Once the state is set to SYNCDONE, the apply will continue tracking
* the table until it reaches the SYNCDONE stream position, at which
* point it sets state to READY and stops tracking. Again, there might
* be zero changes in between.
*
- * So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP ->
- * SYNCDONE -> READY.
+ * So the state progression is always: INIT -> DATASYNC -> SYNCWAIT ->
+ * CATCHUP -> SYNCDONE -> READY.
*
* The catalog pg_subscription_rel is used to keep information about
* subscribed tables and their state. Some transient state during data
@@ -67,7 +71,8 @@
* -> continue rep
* apply:11
* -> set in catalog READY
- * - Sync in front:
+ *
+ * - Sync is in front:
* sync:10
* -> set in memory SYNCWAIT
* apply:8
@@ -142,13 +147,14 @@ finish_sync_worker(void)
}
/*
- * Wait until the relation synchronization state is set in the catalog to the
- * expected one.
+ * Wait until the relation sync state is set in the catalog to the expected
+ * one; return true when it happens.
*
- * Used when transitioning from CATCHUP state to SYNCDONE.
+ * Returns false if the table sync worker or the table itself have
+ * disappeared, or the table state has been reset.
*
- * Returns false if the synchronization worker has disappeared or the table state
- * has been reset.
+ * Currently, this is used in the apply worker when transitioning from
+ * CATCHUP state to SYNCDONE.
*/
static bool
wait_for_relation_state_change(Oid relid, char expected_state)
@@ -162,28 +168,23 @@ wait_for_relation_state_change(Oid relid, char expected_state)
CHECK_FOR_INTERRUPTS();
- /* XXX use cache invalidation here to improve performance? */
- PushActiveSnapshot(GetLatestSnapshot());
+ InvalidateCatalogSnapshot();
state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
- relid, &statelsn, true);
- PopActiveSnapshot();
+ relid, &statelsn);
if (state == SUBREL_STATE_UNKNOWN)
- return false;
+ break;
if (state == expected_state)
return true;
/* Check if the sync worker is still running and bail if not. */
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-
- /* Check if the opposite worker is still running and bail if not. */
- worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
- am_tablesync_worker() ? InvalidOid : relid,
+ worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
false);
LWLockRelease(LogicalRepWorkerLock);
if (!worker)
- return false;
+ break;
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
@@ -810,6 +811,9 @@ copy_table(Relation rel)
/*
* Start syncing the table in the sync worker.
*
+ * If nothing needs to be done to sync the table, we exit the worker without
+ * any further action.
+ *
* The returned slot name is palloc'ed in current memory context.
*/
char *
@@ -819,12 +823,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
char *err;
char relstate;
XLogRecPtr relstate_lsn;
+ Relation rel;
+ WalRcvExecResult *res;
/* Check the state of the table synchronization. */
StartTransactionCommand();
relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
- &relstate_lsn, true);
+ &relstate_lsn);
CommitTransactionCommand();
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
@@ -833,6 +839,18 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
SpinLockRelease(&MyLogicalRepWorker->relmutex);
/*
+ * If synchronization is already done or no longer necessary, exit now
+ * that we've updated shared memory state.
+ */
+ switch (relstate)
+ {
+ case SUBREL_STATE_SYNCDONE:
+ case SUBREL_STATE_READY:
+ case SUBREL_STATE_UNKNOWN:
+ finish_sync_worker(); /* doesn't return */
+ }
+
+ /*
* To build a slot name for the sync work, we are limited to NAMEDATALEN -
* 1 characters. We cut the original slot name to NAMEDATALEN - 28 chars
* and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0'). (It's actually the
@@ -856,134 +874,87 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
ereport(ERROR,
(errmsg("could not connect to the publisher: %s", err)));
- switch (MyLogicalRepWorker->relstate)
- {
- case SUBREL_STATE_INIT:
- case SUBREL_STATE_DATASYNC:
- {
- Relation rel;
- WalRcvExecResult *res;
+ Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
+ MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC);
- SpinLockAcquire(&MyLogicalRepWorker->relmutex);
- MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
- MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
- SpinLockRelease(&MyLogicalRepWorker->relmutex);
-
- /* Update the state and make it visible to others. */
- StartTransactionCommand();
- UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn);
- CommitTransactionCommand();
- pgstat_report_stat(false);
+ SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+ MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
+ MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
+ SpinLockRelease(&MyLogicalRepWorker->relmutex);
- /*
- * We want to do the table data sync in a single transaction.
- */
- StartTransactionCommand();
+ /* Update the state and make it visible to others. */
+ StartTransactionCommand();
+ UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ MyLogicalRepWorker->relstate,
+ MyLogicalRepWorker->relstate_lsn);
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
- /*
- * Use a standard write lock here. It might be better to
- * disallow access to the table while it's being synchronized.
- * But we don't want to block the main apply process from
- * working and it has to open the relation in RowExclusiveLock
- * when remapping remote relation id to local one.
- */
- rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
+ /*
+ * We want to do the table data sync in a single transaction.
+ */
+ StartTransactionCommand();
- /*
- * Create a temporary slot for the sync process. We do this
- * inside the transaction so that we can use the snapshot made
- * by the slot to get existing data.
- */
- res = walrcv_exec(wrconn,
- "BEGIN READ ONLY ISOLATION LEVEL "
- "REPEATABLE READ", 0, NULL);
- if (res->status != WALRCV_OK_COMMAND)
- ereport(ERROR,
- (errmsg("table copy could not start transaction on publisher"),
- errdetail("The error was: %s", res->err)));
- walrcv_clear_result(res);
+ /*
+ * Use a standard write lock here. It might be better to disallow access
+ * to the table while it's being synchronized. But we don't want to block
+ * the main apply process from working and it has to open the relation in
+ * RowExclusiveLock when remapping remote relation id to local one.
+ */
+ rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
- /*
- * Create new temporary logical decoding slot.
- *
- * We'll use slot for data copy so make sure the snapshot is
- * used for the transaction; that way the COPY will get data
- * that is consistent with the lsn used by the slot to start
- * decoding.
- */
- walrcv_create_slot(wrconn, slotname, true,
- CRS_USE_SNAPSHOT, origin_startpos);
+ /*
+ * Start a transaction in the remote node in REPEATABLE READ mode. This
+ * ensures that both the replication slot we create (see below) and the
+ * COPY are consistent with each other.
+ */
+ res = walrcv_exec(wrconn,
+ "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
+ 0, NULL);
+ if (res->status != WALRCV_OK_COMMAND)
+ ereport(ERROR,
+ (errmsg("table copy could not start transaction on publisher"),
+ errdetail("The error was: %s", res->err)));
+ walrcv_clear_result(res);
- PushActiveSnapshot(GetTransactionSnapshot());
- copy_table(rel);
- PopActiveSnapshot();
+ /*
+ * Create a new temporary logical decoding slot. This slot will be used
+ * for the catchup phase after COPY is done, so tell it to use the
+ * snapshot to make the final data consistent.
+ */
+ walrcv_create_slot(wrconn, slotname, true,
+ CRS_USE_SNAPSHOT, origin_startpos);
- res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
- if (res->status != WALRCV_OK_COMMAND)
- ereport(ERROR,
- (errmsg("table copy could not finish transaction on publisher"),
- errdetail("The error was: %s", res->err)));
- walrcv_clear_result(res);
+ /* Now do the initial data copy */
+ PushActiveSnapshot(GetTransactionSnapshot());
+ copy_table(rel);
+ PopActiveSnapshot();
- table_close(rel, NoLock);
+ res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+ if (res->status != WALRCV_OK_COMMAND)
+ ereport(ERROR,
+ (errmsg("table copy could not finish transaction on publisher"),
+ errdetail("The error was: %s", res->err)));
+ walrcv_clear_result(res);
- /* Make the copy visible. */
- CommandCounterIncrement();
+ table_close(rel, NoLock);
- /*
- * We are done with the initial data synchronization, update
- * the state.
- */
- SpinLockAcquire(&MyLogicalRepWorker->relmutex);
- MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
- MyLogicalRepWorker->relstate_lsn = *origin_startpos;
- SpinLockRelease(&MyLogicalRepWorker->relmutex);
-
- /* Wait for main apply worker to tell us to catchup. */
- wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
-
- /*----------
- * There are now two possible states here:
- * a) Sync is behind the apply. If that's the case we need to
- * catch up with it by consuming the logical replication
- * stream up to the relstate_lsn. For that, we exit this
- * function and continue in ApplyWorkerMain().
- * b) Sync is caught up with the apply. So it can just set
- * the state to SYNCDONE and finish.
- *----------
- */
- if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
- {
- /*
- * Update the new state in catalog. No need to bother
- * with the shmem state as we are exiting for good.
- */
- UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- SUBREL_STATE_SYNCDONE,
- *origin_startpos);
- finish_sync_worker();
- }
- break;
- }
- case SUBREL_STATE_SYNCDONE:
- case SUBREL_STATE_READY:
- case SUBREL_STATE_UNKNOWN:
+ /* Make the copy visible. */
+ CommandCounterIncrement();
- /*
- * Nothing to do here but finish. (UNKNOWN means the relation was
- * removed from pg_subscription_rel before the sync worker could
- * start.)
- */
- finish_sync_worker();
- break;
- default:
- elog(ERROR, "unknown relation state \"%c\"",
- MyLogicalRepWorker->relstate);
- }
+ /*
+ * We are done with the initial data synchronization, update the state.
+ */
+ SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+ MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
+ MyLogicalRepWorker->relstate_lsn = *origin_startpos;
+ SpinLockRelease(&MyLogicalRepWorker->relmutex);
+ /*
+ * Finally, wait until the main apply worker tells us to catch up and then
+ * return to let LogicalRepApplyLoop do it.
+ */
+ wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
return slotname;
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 640409b757f..b8e297c5d34 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2060,6 +2060,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
{
TimestampTz last_recv_timestamp = GetCurrentTimestamp();
bool ping_sent = false;
+ TimeLineID tli;
/*
* Init the ApplyMessageContext which we clean up after each replication
@@ -2201,12 +2202,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/* Check if we need to exit the streaming loop. */
if (endofstream)
- {
- TimeLineID tli;
-
- walrcv_endstreaming(wrconn, &tli);
break;
- }
/*
* Wait for more data or latch. If we have unflushed transactions,
@@ -2283,6 +2279,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
send_feedback(last_received, requestReply, requestReply);
}
}
+
+ /* All done */
+ walrcv_endstreaming(wrconn, &tli);
}
/*
@@ -3024,10 +3023,8 @@ ApplyWorkerMain(Datum main_arg)
/* This is table synchronization worker, call initial sync. */
syncslotname = LogicalRepSyncTableStart(&origin_startpos);
- /* The slot name needs to be allocated in permanent memory context. */
- oldctx = MemoryContextSwitchTo(ApplyContext);
- myslotname = pstrdup(syncslotname);
- MemoryContextSwitchTo(oldctx);
+ /* allocate slot name in long-lived context */
+ myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
pfree(syncslotname);
}
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index f384f4e7fa6..ff5c8d7ff91 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -80,8 +80,7 @@ extern void AddSubscriptionRelState(Oid subid, Oid relid, char state,
XLogRecPtr sublsn);
extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
XLogRecPtr sublsn);
-extern char GetSubscriptionRelState(Oid subid, Oid relid,
- XLogRecPtr *sublsn, bool missing_ok);
+extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
extern void RemoveSubscriptionRel(Oid subid, Oid relid);
extern List *GetSubscriptionRelations(Oid subid);