diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/catalog/pg_subscription.c       |  27
-rw-r--r-- | src/backend/replication/logical/relation.c  |   3
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 261
-rw-r--r-- | src/backend/replication/logical/worker.c    |  15
-rw-r--r-- | src/include/catalog/pg_subscription_rel.h   |   3
5 files changed, 131 insertions, 178 deletions
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 311d46225ad..ca78d395181 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -328,20 +328,16 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, /* * Get state of subscription table. * - * Returns SUBREL_STATE_UNKNOWN when not found and missing_ok is true. + * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription. */ char -GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn, - bool missing_ok) +GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn) { - Relation rel; HeapTuple tup; char substate; bool isnull; Datum d; - rel = table_open(SubscriptionRelRelationId, AccessShareLock); - /* Try finding the mapping. */ tup = SearchSysCache2(SUBSCRIPTIONRELMAP, ObjectIdGetDatum(relid), @@ -349,22 +345,14 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn, if (!HeapTupleIsValid(tup)) { - if (missing_ok) - { - table_close(rel, AccessShareLock); - *sublsn = InvalidXLogRecPtr; - return SUBREL_STATE_UNKNOWN; - } - - elog(ERROR, "subscription table %u in subscription %u does not exist", - relid, subid); + *sublsn = InvalidXLogRecPtr; + return SUBREL_STATE_UNKNOWN; } /* Get the state. 
*/ - d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup, - Anum_pg_subscription_rel_srsubstate, &isnull); - Assert(!isnull); - substate = DatumGetChar(d); + substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate; + + /* Get the LSN */ d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup, Anum_pg_subscription_rel_srsublsn, &isnull); if (isnull) @@ -374,7 +362,6 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn, /* Cleanup */ ReleaseSysCache(tup); - table_close(rel, AccessShareLock); return substate; } diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index a6596f79a66..07aa52977f9 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -437,8 +437,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) if (entry->state != SUBREL_STATE_READY) entry->state = GetSubscriptionRelState(MySubscription->oid, entry->localreloid, - &entry->statelsn, - true); + &entry->statelsn); return entry; } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 843c9285d59..a91b00ed4bc 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1,6 +1,6 @@ /*------------------------------------------------------------------------- * tablesync.c - * PostgreSQL logical replication + * PostgreSQL logical replication: initial table data synchronization * * Copyright (c) 2012-2020, PostgreSQL Global Development Group * @@ -26,26 +26,30 @@ * - It allows us to synchronize any tables added after the initial * synchronization has finished. * - * The stream position synchronization works in multiple steps. - * - Sync finishes copy and sets worker state as SYNCWAIT and waits for - * state to change in a loop. - * - Apply periodically checks tables that are synchronizing for SYNCWAIT. 
- * When the desired state appears, it will set the worker state to - * CATCHUP and starts loop-waiting until either the table state is set - * to SYNCDONE or the sync worker exits. + * The stream position synchronization works in multiple steps: + * - Apply worker requests a tablesync worker to start, setting the new + * table state to INIT. + * - Tablesync worker starts; changes table state from INIT to DATASYNC while + * copying. + * - Tablesync worker finishes the copy and sets table state to SYNCWAIT; + * waits for state change. + * - Apply worker periodically checks for tables in SYNCWAIT state. When + * any appear, it sets the table state to CATCHUP and starts loop-waiting + * until either the table state is set to SYNCDONE or the sync worker + * exits. * - After the sync worker has seen the state change to CATCHUP, it will * read the stream and apply changes (acting like an apply worker) until * it catches up to the specified stream position. Then it sets the * state to SYNCDONE. There might be zero changes applied between * CATCHUP and SYNCDONE, because the sync worker might be ahead of the * apply worker. - * - Once the state was set to SYNCDONE, the apply will continue tracking + * - Once the state is set to SYNCDONE, the apply will continue tracking * the table until it reaches the SYNCDONE stream position, at which * point it sets state to READY and stops tracking. Again, there might * be zero changes in between. * - * So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP -> - * SYNCDONE -> READY. + * So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> + * CATCHUP -> SYNCDONE -> READY. * * The catalog pg_subscription_rel is used to keep information about * subscribed tables and their state. 
Some transient state during data @@ -67,7 +71,8 @@ * -> continue rep * apply:11 * -> set in catalog READY - * - Sync in front: + * + * - Sync is in front: * sync:10 * -> set in memory SYNCWAIT * apply:8 @@ -142,13 +147,14 @@ finish_sync_worker(void) } /* - * Wait until the relation synchronization state is set in the catalog to the - * expected one. + * Wait until the relation sync state is set in the catalog to the expected + * one; return true when it happens. * - * Used when transitioning from CATCHUP state to SYNCDONE. + * Returns false if the table sync worker or the table itself have + * disappeared, or the table state has been reset. * - * Returns false if the synchronization worker has disappeared or the table state - * has been reset. + * Currently, this is used in the apply worker when transitioning from + * CATCHUP state to SYNCDONE. */ static bool wait_for_relation_state_change(Oid relid, char expected_state) @@ -162,28 +168,23 @@ wait_for_relation_state_change(Oid relid, char expected_state) CHECK_FOR_INTERRUPTS(); - /* XXX use cache invalidation here to improve performance? */ - PushActiveSnapshot(GetLatestSnapshot()); + InvalidateCatalogSnapshot(); state = GetSubscriptionRelState(MyLogicalRepWorker->subid, - relid, &statelsn, true); - PopActiveSnapshot(); + relid, &statelsn); if (state == SUBREL_STATE_UNKNOWN) - return false; + break; if (state == expected_state) return true; /* Check if the sync worker is still running and bail if not. */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - - /* Check if the opposite worker is still running and bail if not. */ - worker = logicalrep_worker_find(MyLogicalRepWorker->subid, - am_tablesync_worker() ? 
InvalidOid : relid, + worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid, false); LWLockRelease(LogicalRepWorkerLock); if (!worker) - return false; + break; (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, @@ -810,6 +811,9 @@ copy_table(Relation rel) /* * Start syncing the table in the sync worker. * + * If nothing needs to be done to sync the table, we exit the worker without + * any further action. + * * The returned slot name is palloc'ed in current memory context. */ char * @@ -819,12 +823,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) char *err; char relstate; XLogRecPtr relstate_lsn; + Relation rel; + WalRcvExecResult *res; /* Check the state of the table synchronization. */ StartTransactionCommand(); relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, - &relstate_lsn, true); + &relstate_lsn); CommitTransactionCommand(); SpinLockAcquire(&MyLogicalRepWorker->relmutex); @@ -833,6 +839,18 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) SpinLockRelease(&MyLogicalRepWorker->relmutex); /* + * If synchronization is already done or no longer necessary, exit now + * that we've updated shared memory state. + */ + switch (relstate) + { + case SUBREL_STATE_SYNCDONE: + case SUBREL_STATE_READY: + case SUBREL_STATE_UNKNOWN: + finish_sync_worker(); /* doesn't return */ + } + + /* * To build a slot name for the sync work, we are limited to NAMEDATALEN - * 1 characters. We cut the original slot name to NAMEDATALEN - 28 chars * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0'). 
(It's actually the @@ -856,134 +874,87 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) ereport(ERROR, (errmsg("could not connect to the publisher: %s", err))); - switch (MyLogicalRepWorker->relstate) - { - case SUBREL_STATE_INIT: - case SUBREL_STATE_DATASYNC: - { - Relation rel; - WalRcvExecResult *res; + Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT || + MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC); - SpinLockAcquire(&MyLogicalRepWorker->relmutex); - MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC; - MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr; - SpinLockRelease(&MyLogicalRepWorker->relmutex); - - /* Update the state and make it visible to others. */ - StartTransactionCommand(); - UpdateSubscriptionRelState(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn); - CommitTransactionCommand(); - pgstat_report_stat(false); + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC; + MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr; + SpinLockRelease(&MyLogicalRepWorker->relmutex); - /* - * We want to do the table data sync in a single transaction. - */ - StartTransactionCommand(); + /* Update the state and make it visible to others. */ + StartTransactionCommand(); + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->relstate, + MyLogicalRepWorker->relstate_lsn); + CommitTransactionCommand(); + pgstat_report_stat(false); - /* - * Use a standard write lock here. It might be better to - * disallow access to the table while it's being synchronized. - * But we don't want to block the main apply process from - * working and it has to open the relation in RowExclusiveLock - * when remapping remote relation id to local one. - */ - rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock); + /* + * We want to do the table data sync in a single transaction. 
+ */ + StartTransactionCommand(); - /* - * Create a temporary slot for the sync process. We do this - * inside the transaction so that we can use the snapshot made - * by the slot to get existing data. - */ - res = walrcv_exec(wrconn, - "BEGIN READ ONLY ISOLATION LEVEL " - "REPEATABLE READ", 0, NULL); - if (res->status != WALRCV_OK_COMMAND) - ereport(ERROR, - (errmsg("table copy could not start transaction on publisher"), - errdetail("The error was: %s", res->err))); - walrcv_clear_result(res); + /* + * Use a standard write lock here. It might be better to disallow access + * to the table while it's being synchronized. But we don't want to block + * the main apply process from working and it has to open the relation in + * RowExclusiveLock when remapping remote relation id to local one. + */ + rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock); - /* - * Create new temporary logical decoding slot. - * - * We'll use slot for data copy so make sure the snapshot is - * used for the transaction; that way the COPY will get data - * that is consistent with the lsn used by the slot to start - * decoding. - */ - walrcv_create_slot(wrconn, slotname, true, - CRS_USE_SNAPSHOT, origin_startpos); + /* + * Start a transaction in the remote node in REPEATABLE READ mode. This + * ensures that both the replication slot we create (see below) and the + * COPY are consistent with each other. + */ + res = walrcv_exec(wrconn, + "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ", + 0, NULL); + if (res->status != WALRCV_OK_COMMAND) + ereport(ERROR, + (errmsg("table copy could not start transaction on publisher"), + errdetail("The error was: %s", res->err))); + walrcv_clear_result(res); - PushActiveSnapshot(GetTransactionSnapshot()); - copy_table(rel); - PopActiveSnapshot(); + /* + * Create a new temporary logical decoding slot. This slot will be used + * for the catchup phase after COPY is done, so tell it to use the + * snapshot to make the final data consistent. 
+ */ + walrcv_create_slot(wrconn, slotname, true, + CRS_USE_SNAPSHOT, origin_startpos); - res = walrcv_exec(wrconn, "COMMIT", 0, NULL); - if (res->status != WALRCV_OK_COMMAND) - ereport(ERROR, - (errmsg("table copy could not finish transaction on publisher"), - errdetail("The error was: %s", res->err))); - walrcv_clear_result(res); + /* Now do the initial data copy */ + PushActiveSnapshot(GetTransactionSnapshot()); + copy_table(rel); + PopActiveSnapshot(); - table_close(rel, NoLock); + res = walrcv_exec(wrconn, "COMMIT", 0, NULL); + if (res->status != WALRCV_OK_COMMAND) + ereport(ERROR, + (errmsg("table copy could not finish transaction on publisher"), + errdetail("The error was: %s", res->err))); + walrcv_clear_result(res); - /* Make the copy visible. */ - CommandCounterIncrement(); + table_close(rel, NoLock); - /* - * We are done with the initial data synchronization, update - * the state. - */ - SpinLockAcquire(&MyLogicalRepWorker->relmutex); - MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT; - MyLogicalRepWorker->relstate_lsn = *origin_startpos; - SpinLockRelease(&MyLogicalRepWorker->relmutex); - - /* Wait for main apply worker to tell us to catchup. */ - wait_for_worker_state_change(SUBREL_STATE_CATCHUP); - - /*---------- - * There are now two possible states here: - * a) Sync is behind the apply. If that's the case we need to - * catch up with it by consuming the logical replication - * stream up to the relstate_lsn. For that, we exit this - * function and continue in ApplyWorkerMain(). - * b) Sync is caught up with the apply. So it can just set - * the state to SYNCDONE and finish. - *---------- - */ - if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn) - { - /* - * Update the new state in catalog. No need to bother - * with the shmem state as we are exiting for good. 
- */ - UpdateSubscriptionRelState(MyLogicalRepWorker->subid, - MyLogicalRepWorker->relid, - SUBREL_STATE_SYNCDONE, - *origin_startpos); - finish_sync_worker(); - } - break; - } - case SUBREL_STATE_SYNCDONE: - case SUBREL_STATE_READY: - case SUBREL_STATE_UNKNOWN: + /* Make the copy visible. */ + CommandCounterIncrement(); - /* - * Nothing to do here but finish. (UNKNOWN means the relation was - * removed from pg_subscription_rel before the sync worker could - * start.) - */ - finish_sync_worker(); - break; - default: - elog(ERROR, "unknown relation state \"%c\"", - MyLogicalRepWorker->relstate); - } + /* + * We are done with the initial data synchronization, update the state. + */ + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT; + MyLogicalRepWorker->relstate_lsn = *origin_startpos; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + /* + * Finally, wait until the main apply worker tells us to catch up and then + * return to let LogicalRepApplyLoop do it. + */ + wait_for_worker_state_change(SUBREL_STATE_CATCHUP); return slotname; } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 640409b757f..b8e297c5d34 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2060,6 +2060,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) { TimestampTz last_recv_timestamp = GetCurrentTimestamp(); bool ping_sent = false; + TimeLineID tli; /* * Init the ApplyMessageContext which we clean up after each replication @@ -2201,12 +2202,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* Check if we need to exit the streaming loop. */ if (endofstream) - { - TimeLineID tli; - - walrcv_endstreaming(wrconn, &tli); break; - } /* * Wait for more data or latch. 
If we have unflushed transactions, @@ -2283,6 +2279,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received) send_feedback(last_received, requestReply, requestReply); } } + + /* All done */ + walrcv_endstreaming(wrconn, &tli); } /* @@ -3024,10 +3023,8 @@ ApplyWorkerMain(Datum main_arg) /* This is table synchronization worker, call initial sync. */ syncslotname = LogicalRepSyncTableStart(&origin_startpos); - /* The slot name needs to be allocated in permanent memory context. */ - oldctx = MemoryContextSwitchTo(ApplyContext); - myslotname = pstrdup(syncslotname); - MemoryContextSwitchTo(oldctx); + /* allocate slot name in long-lived context */ + myslotname = MemoryContextStrdup(ApplyContext, syncslotname); pfree(syncslotname); } diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index f384f4e7fa6..ff5c8d7ff91 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -80,8 +80,7 @@ extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn); extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn); -extern char GetSubscriptionRelState(Oid subid, Oid relid, - XLogRecPtr *sublsn, bool missing_ok); +extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); extern void RemoveSubscriptionRel(Oid subid, Oid relid); extern List *GetSubscriptionRelations(Oid subid); |