diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 246 |
1 files changed, 136 insertions, 110 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 6e268f3521d..ed66602935f 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -27,55 +27,57 @@ * synchronization has finished. * * The stream position synchronization works in multiple steps. - * - Sync finishes copy and sets table state as SYNCWAIT and waits - * for state to change in a loop. + * - Sync finishes copy and sets worker state as SYNCWAIT and waits for + * state to change in a loop. * - Apply periodically checks tables that are synchronizing for SYNCWAIT. - * When the desired state appears it will compare its position in the - * stream with the SYNCWAIT position and based on that changes the - * state to based on following rules: - * - if the apply is in front of the sync in the WAL stream the new - * state is set to CATCHUP and apply loops until the sync process - * catches up to the same LSN as apply - * - if the sync is in front of the apply in the WAL stream the new - * state is set to SYNCDONE - * - if both apply and sync are at the same position in the WAL stream - * the state of the table is set to READY - * - If the state was set to CATCHUP sync will read the stream and - * apply changes until it catches up to the specified stream - * position and then sets state to READY and signals apply that it - * can stop waiting and exits, if the state was set to something - * else than CATCHUP the sync process will simply end. - * - If the state was set to SYNCDONE by apply, the apply will - * continue tracking the table until it reaches the SYNCDONE stream - * position at which point it sets state to READY and stops tracking. + * When the desired state appears, it will set the worker state to + * CATCHUP and starts loop-waiting until either the table state is set + * to SYNCDONE or the sync worker exits. 
+ * - After the sync worker has seen the state change to CATCHUP, it will + * read the stream and apply changes (acting like an apply worker) until + * it catches up to the specified stream position. Then it sets the + * state to SYNCDONE. There might be zero changes applied between + * CATCHUP and SYNCDONE, because the sync worker might be ahead of the + * apply worker. + * - Once the state was set to SYNCDONE, the apply will continue tracking + * the table until it reaches the SYNCDONE stream position, at which + * point it sets state to READY and stops tracking. Again, there might + * be zero changes in between. + * + * So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP -> + * SYNCDONE -> READY. * * The catalog pg_subscription_rel is used to keep information about - * subscribed tables and their state and some transient state during - * data synchronization is kept in shared memory. + * subscribed tables and their state. Some transient state during data + * synchronization is kept in shared memory. The states SYNCWAIT and + * CATCHUP only appear in memory. 
* * Example flows look like this: * - Apply is in front: * sync:8 - * -> set SYNCWAIT + * -> set in memory SYNCWAIT * apply:10 - * -> set CATCHUP + * -> set in memory CATCHUP * -> enter wait-loop * sync:10 - * -> set READY + * -> set in catalog SYNCDONE * -> exit * apply:10 * -> exit wait-loop * -> continue rep + * apply:11 + * -> set in catalog READY * - Sync in front: * sync:10 - * -> set SYNCWAIT + * -> set in memory SYNCWAIT * apply:8 - * -> set SYNCDONE + * -> set in memory CATCHUP * -> continue per-table filtering * sync:10 + * -> set in catalog SYNCDONE * -> exit * apply:10 - * -> set READY + * -> set in catalog READY * -> stop per-table filtering * -> continue rep *------------------------------------------------------------------------- @@ -100,6 +102,7 @@ #include "replication/walreceiver.h" #include "replication/worker_internal.h" +#include "utils/snapmgr.h" #include "storage/ipc.h" #include "utils/builtins.h" @@ -130,69 +133,107 @@ finish_sync_worker(void) /* And flush all writes. */ XLogFlush(GetXLogWriteRecPtr()); - /* Find the main apply worker and signal it. */ - logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); - StartTransactionCommand(); ereport(LOG, (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); CommitTransactionCommand(); + /* Find the main apply worker and signal it. */ + logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); + /* Stop gracefully */ proc_exit(0); } /* - * Wait until the table synchronization change. + * Wait until the relation synchronization state is set in catalog to the + * expected one. * - * If called from apply worker, it will wait for the synchronization worker to - * change table state in shmem. If called from synchronization worker, it - * will wait for apply worker to change table state in shmem. + * Used when transitioning from CATCHUP state to SYNCDONE. 
+ * - * Returns false if the opposite worker has disappeared or the table state has - * been reset. + * Returns false if the synchronization worker has disappeared or the table state + * has been reset. */ static bool -wait_for_sync_status_change(Oid relid, char origstate) +wait_for_relation_state_change(Oid relid, char expected_state) { int rc; - char state = origstate; + char state; for (;;) { LogicalRepWorker *worker; + XLogRecPtr statelsn; CHECK_FOR_INTERRUPTS(); + /* XXX use cache invalidation here to improve performance? */ + PushActiveSnapshot(GetLatestSnapshot()); + state = GetSubscriptionRelState(MyLogicalRepWorker->subid, + relid, &statelsn, true); + PopActiveSnapshot(); + + if (state == SUBREL_STATE_UNKNOWN) + return false; + + if (state == expected_state) + return true; + + /* Check if the sync worker is still running and bail if not. */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); /* Check if the opposite worker is still running and bail if not. */ worker = logicalrep_worker_find(MyLogicalRepWorker->subid, am_tablesync_worker() ? InvalidOid : relid, false); + LWLockRelease(LogicalRepWorkerLock); if (!worker) - { - LWLockRelease(LogicalRepWorkerLock); return false; - } - /* - * If I'm the synchronization worker, look at my own state. Otherwise - * look at the state of the synchronization worker we found above. - */ - if (am_tablesync_worker()) - worker = MyLogicalRepWorker; + rc = WaitLatch(&MyProc->procLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE); - Assert(worker->relid == relid); - state = worker->relstate; + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); - LWLockRelease(LogicalRepWorkerLock); + ResetLatch(&MyProc->procLatch); + } - if (state == SUBREL_STATE_UNKNOWN) + return false; +} + +/* + * Wait until the apply worker changes the state of our synchronization + * worker to the expected one. 
+ * + * Used when transitioning from SYNCWAIT state to CATCHUP. + * + * Returns false if the apply worker has disappeared or table state has been + * reset. + */ +static bool +wait_for_worker_state_change(char expected_state) +{ + int rc; + + for (;;) + { + LogicalRepWorker *worker; + + CHECK_FOR_INTERRUPTS(); + + /* Bail if the apply worker has died. */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + worker = logicalrep_worker_find(MyLogicalRepWorker->subid, + InvalidOid, false); + LWLockRelease(LogicalRepWorkerLock); + if (!worker) return false; - if (state != origstate) + if (MyLogicalRepWorker->relstate == expected_state) return true; rc = WaitLatch(&MyProc->procLatch, @@ -222,10 +263,9 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) * Handle table synchronization cooperation from the synchronization * worker. * - * If the sync worker is in catch up mode and reached the predetermined - * synchronization point in the WAL stream, mark the table as READY and - * finish. If it caught up too far, set to SYNCDONE and finish. Things will - * then proceed in the "sync in front" scenario. + * If the sync worker is in CATCHUP state and reached (or passed) the + * predetermined synchronization point in the WAL stream, mark the table as + * SYNCDONE and finish. */ static void process_syncing_tables_for_sync(XLogRecPtr current_lsn) @@ -239,10 +279,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) { TimeLineID tli; - MyLogicalRepWorker->relstate = - (current_lsn == MyLogicalRepWorker->relstate_lsn) - ? SUBREL_STATE_READY - : SUBREL_STATE_SYNCDONE; + MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; MyLogicalRepWorker->relstate_lsn = current_lsn; SpinLockRelease(&MyLogicalRepWorker->relmutex); @@ -274,17 +311,11 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) * at least wal_retrieve_retry_interval. 
* * For tables that are being synchronized already, check if sync workers - * either need action from the apply worker or have finished. - * - * The usual scenario is that the apply got ahead of the sync while the sync - * ran, and then the action needed by apply is to mark a table for CATCHUP and - * wait for the catchup to happen. In the less common case that sync worker - * got in front of the apply worker, the table is marked as SYNCDONE but not - * ready yet, as it needs to be tracked until apply reaches the same position - * to which it was synced. + * either need action from the apply worker or have finished. This is the + * SYNCWAIT to CATCHUP transition. * - * If the synchronization position is reached, then the table can be marked as - * READY and is no longer tracked. + * If the synchronization position is reached (SYNCDONE), then the table can + * be marked as READY and is no longer tracked. */ static void process_syncing_tables_for_apply(XLogRecPtr current_lsn) @@ -358,7 +389,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) last_start_times = NULL; } - /* Process all tables that are being synchronized. */ + /* + * Process all tables that are being synchronized. + */ foreach(lc, table_states) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); @@ -416,45 +449,29 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) if (syncworker && rstate->state == SUBREL_STATE_SYNCWAIT) { /* - * There are three possible synchronization situations here. - * - * a) Apply is in front of the table sync: We tell the table - * sync to CATCHUP. - * - * b) Apply is behind the table sync: We tell the table sync - * to mark the table as SYNCDONE and finish. - * - * c) Apply and table sync are at the same position: We tell - * table sync to mark the table as READY and finish. - * - * In any case we'll need to wait for table sync to change the - * state in catalog and only then continue ourselves. + * Tell sync worker it can catchup now. 
We'll wait for it so + * it does not get lost. */ - if (current_lsn > rstate->lsn) - { - rstate->state = SUBREL_STATE_CATCHUP; - rstate->lsn = current_lsn; - } - else if (current_lsn == rstate->lsn) - { - rstate->state = SUBREL_STATE_READY; - rstate->lsn = current_lsn; - } - else - rstate->state = SUBREL_STATE_SYNCDONE; - SpinLockAcquire(&syncworker->relmutex); - syncworker->relstate = rstate->state; - syncworker->relstate_lsn = rstate->lsn; + syncworker->relstate = SUBREL_STATE_CATCHUP; + syncworker->relstate_lsn = + Max(syncworker->relstate_lsn, current_lsn); SpinLockRelease(&syncworker->relmutex); /* Signal the sync worker, as it may be waiting for us. */ logicalrep_worker_wakeup_ptr(syncworker); /* - * Enter busy loop and wait for synchronization status change. + * Enter busy loop and wait for synchronization worker to + * reach expected state (or die trying). */ - wait_for_sync_status_change(rstate->relid, rstate->state); + if (!started_tx) + { + StartTransactionCommand(); + started_tx = true; + } + wait_for_relation_state_change(rstate->relid, + SUBREL_STATE_SYNCDONE); } /* @@ -889,19 +906,28 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) MyLogicalRepWorker->relstate_lsn = *origin_startpos; SpinLockRelease(&MyLogicalRepWorker->relmutex); + /* Wait for main apply worker to tell us to catchup. */ + wait_for_worker_state_change(SUBREL_STATE_CATCHUP); + /* - * Wait for main apply worker to either tell us to catchup or - * that we are done. + * There are now two possible states here: + * a) Sync is behind the apply. If that's the case we need to + * catch up with it by consuming the logical replication + * stream up to the relstate_lsn. For that, we exit this + * function and continue in ApplyWorkerMain(). + * b) Sync is caught up with the apply. So it can just set + * the state to SYNCDONE and finish. 
*/ - wait_for_sync_status_change(MyLogicalRepWorker->relid, - MyLogicalRepWorker->relstate); - if (MyLogicalRepWorker->relstate != SUBREL_STATE_CATCHUP) + if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn) { - /* Update the new state. */ + /* + * Update the new state in catalog. No need to bother + * with the shmem state as we are exiting for good. + */ SetSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, - MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn); + SUBREL_STATE_SYNCDONE, + *origin_startpos); finish_sync_worker(); } break; |