diff options
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 840 |
1 files changed, 840 insertions, 0 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c new file mode 100644 index 00000000000..3e16b0d5765 --- /dev/null +++ b/src/backend/replication/logical/tablesync.c @@ -0,0 +1,840 @@ +/*------------------------------------------------------------------------- + * tablesync.c + * PostgreSQL logical replication + * + * Copyright (c) 2012-2016, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/tablesync.c + * + * NOTES + * This file contains code for initial table data synchronization for + * logical replication. + * + * The initial data synchronization is done separately for each table, + * in separate apply worker that only fetches the initial snapshot data + * from the publisher and then synchronizes the position in stream with + * the main apply worker. + * + * The are several reasons for doing the synchronization this way: + * - It allows us to parallelize the initial data synchronization + * which lowers the time needed for it to happen. + * - The initial synchronization does not have to hold the xid and LSN + * for the time it takes to copy data of all tables, causing less + * bloat and lower disk consumption compared to doing the + * synchronization in single process for whole database. + * - It allows us to synchronize the tables added after the initial + * synchronization has finished. + * + * The stream position synchronization works in multiple steps. + * - Sync finishes copy and sets table state as SYNCWAIT and waits + * for state to change in a loop. + * - Apply periodically checks tables that are synchronizing for SYNCWAIT. + * When the desired state appears it will compare its position in the + * stream with the SYNCWAIT position and based on that changes the + * state to based on following rules: + * - if the apply is in front of the sync in the wal stream the new + * state is set to CATCHUP and apply loops until the sync process + * catches up to the same LSN as apply + * - if the sync is in front of the apply in the wal stream the new + * state is set to SYNCDONE + * - if both apply and sync are at the same position in the wal stream + * the state of the table is set to READY + * - If the state was set to CATCHUP sync will read the stream and + * apply changes until it catches up to the specified stream + * position and then sets state to READY and signals apply that it + * can stop waiting and exits, if the state was set to something + * else than CATCHUP the sync process will simply end. + * - If the state was set to SYNCDONE by apply, the apply will + * continue tracking the table until it reaches the SYNCDONE stream + * position at which point it sets state to READY and stops tracking. + * + * The catalog pg_subscription_rel is used to keep information about + * subscribed tables and their state and some transient state during + * data synchronization is kept in shared memory. + * + * Example flows look like this: + * - Apply is in front: + * sync:8 + * -> set SYNCWAIT + * apply:10 + * -> set CATCHUP + * -> enter wait-loop + * sync:10 + * -> set READY + * -> exit + * apply:10 + * -> exit wait-loop + * -> continue rep + * - Sync in front: + * sync:10 + * -> set SYNCWAIT + * apply:8 + * -> set SYNCDONE + * -> continue per-table filtering + * sync:10 + * -> exit + * apply:10 + * -> set READY + * -> stop per-table filtering + * -> continue rep + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "miscadmin.h" +#include "pgstat.h" + +#include "access/xact.h" + +#include "catalog/pg_subscription_rel.h" +#include "catalog/pg_type.h" + +#include "commands/copy.h" + +#include "replication/logicallauncher.h" +#include "replication/logicalrelation.h" +#include "replication/walreceiver.h" +#include "replication/worker_internal.h" + +#include "storage/ipc.h" + +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" + +static bool table_states_valid = false; + +StringInfo copybuf = NULL; + +/* + * Exit routine for synchronization worker. + */ +static void pg_attribute_noreturn() +finish_sync_worker(void) +{ + /* Commit any outstanding transaction. */ + if (IsTransactionState()) + CommitTransactionCommand(); + + /* And flush all writes. */ + XLogFlush(GetXLogWriteRecPtr()); + + /* Find the main apply worker and signal it. */ + logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); + + ereport(LOG, + (errmsg("logical replication synchronization worker finished processing"))); + + /* Stop gracefully */ + walrcv_disconnect(wrconn); + proc_exit(0); +} + +/* + * Wait until the table synchronization change. + * + * Returns false if the relation subscription state disappeared. + */ +static bool +wait_for_sync_status_change(Oid relid, char origstate) +{ + int rc; + char state = origstate; + + while (!got_SIGTERM) + { + LogicalRepWorker *worker; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + worker = logicalrep_worker_find(MyLogicalRepWorker->subid, + relid, false); + if (!worker) + { + LWLockRelease(LogicalRepWorkerLock); + return false; + } + state = worker->relstate; + LWLockRelease(LogicalRepWorkerLock); + + if (state == SUBREL_STATE_UNKNOWN) + return false; + + if (state != origstate) + return true; + + rc = WaitLatch(&MyProc->procLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + 10000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + ResetLatch(&MyProc->procLatch); + } + + return false; +} + +/* + * Callback from syscache invalidation. + */ +void +invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) +{ + table_states_valid = false; +} + +/* + * Handle table synchronization cooperation from the synchronization + * worker. + * + * If the sync worker is in catch up mode and reached the predetermined + * synchronization point in the WAL stream, mark the table as READY and + * finish. If it caught up too far, set to SYNCDONE and finish. Things will + * then proceed in the "sync in front" scenario. + */ +static void +process_syncing_tables_for_sync(XLogRecPtr current_lsn) +{ + Assert(IsTransactionState()); + + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + + if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP && + current_lsn >= MyLogicalRepWorker->relstate_lsn) + { + TimeLineID tli; + + MyLogicalRepWorker->relstate = + (current_lsn == MyLogicalRepWorker->relstate_lsn) + ? SUBREL_STATE_READY + : SUBREL_STATE_SYNCDONE; + MyLogicalRepWorker->relstate_lsn = current_lsn; + + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + SetSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->relstate, + MyLogicalRepWorker->relstate_lsn); + + walrcv_endstreaming(wrconn, &tli); + finish_sync_worker(); + } + else + SpinLockRelease(&MyLogicalRepWorker->relmutex); +} + +/* + * Handle table synchronization cooperation from the apply worker. + * + * Walk over all subscription tables that are individually tracked by the + * apply process (currently, all that have state other than + * SUBREL_STATE_READY) and manage synchronization for them. + * + * If there are tables that need synchronizing and are not being synchronized + * yet, start sync workers for them (if there are free slots for sync + * workers). + * + * For tables that are being synchronized already, check if sync workers + * either need action from the apply worker or have finished. + * + * The usual scenario is that the apply got ahead of the sync while the sync + * ran, and then the action needed by apply is to mark a table for CATCHUP and + * wait for the catchup to happen. In the less common case that sync worker + * got in front of the apply worker, the table is marked as SYNCDONE but not + * ready yet, as it needs to be tracked until apply reaches the same position + * to which it was synced. + * + * If the synchronization position is reached, then the table can be marked as + * READY and is no longer tracked. + */ +static void +process_syncing_tables_for_apply(XLogRecPtr current_lsn) +{ + static List *table_states = NIL; + ListCell *lc; + + Assert(!IsTransactionState()); + + /* We need up to date sync state info for subscription tables here. */ + if (!table_states_valid) + { + MemoryContext oldctx; + List *rstates; + ListCell *lc; + SubscriptionRelState *rstate; + + /* Clean the old list. */ + list_free_deep(table_states); + table_states = NIL; + + StartTransactionCommand(); + + /* Fetch all non-ready tables. */ + rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); + + /* Allocate the tracking info in a permanent memory context. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + foreach(lc, rstates) + { + rstate = palloc(sizeof(SubscriptionRelState)); + memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); + table_states = lappend(table_states, rstate); + } + MemoryContextSwitchTo(oldctx); + + CommitTransactionCommand(); + + table_states_valid = true; + } + + /* Process all tables that are being synchronized. */ + foreach(lc, table_states) + { + SubscriptionRelState *rstate = (SubscriptionRelState *)lfirst(lc); + + if (rstate->state == SUBREL_STATE_SYNCDONE) + { + /* + * Apply has caught up to the position where the table sync + * has finished. Time to mark the table as ready so that + * apply will just continue to replicate it normally. + */ + if (current_lsn >= rstate->lsn) + { + rstate->state = SUBREL_STATE_READY; + rstate->lsn = current_lsn; + StartTransactionCommand(); + SetSubscriptionRelState(MyLogicalRepWorker->subid, + rstate->relid, rstate->state, + rstate->lsn); + CommitTransactionCommand(); + } + } + else + { + LogicalRepWorker *syncworker; + int nsyncworkers = 0; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid, + rstate->relid, false); + if (syncworker) + { + SpinLockAcquire(&syncworker->relmutex); + rstate->state = syncworker->relstate; + rstate->lsn = syncworker->relstate_lsn; + SpinLockRelease(&syncworker->relmutex); + } + else + /* + * If no sync worker for this table yet, could running sync + * workers for this subscription, while we have the lock, for + * later. + */ + nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid); + LWLockRelease(LogicalRepWorkerLock); + + /* + * There is a worker synchronizing the relation and waiting for + * apply to do something. + */ + if (syncworker && rstate->state == SUBREL_STATE_SYNCWAIT) + { + /* + * There are three possible synchronization situations here. + * + * a) Apply is in front of the table sync: We tell the table + * sync to CATCHUP. + * + * b) Apply is behind the table sync: We tell the table sync + * to mark the table as SYNCDONE and finish. + + * c) Apply and table sync are at the same position: We tell + * table sync to mark the table as READY and finish. + * + * In any case we'll need to wait for table sync to change + * the state in catalog and only then continue ourselves. + */ + if (current_lsn > rstate->lsn) + { + rstate->state = SUBREL_STATE_CATCHUP; + rstate->lsn = current_lsn; + } + else if (current_lsn == rstate->lsn) + { + rstate->state = SUBREL_STATE_READY; + rstate->lsn = current_lsn; + } + else + rstate->state = SUBREL_STATE_SYNCDONE; + + SpinLockAcquire(&syncworker->relmutex); + syncworker->relstate = rstate->state; + syncworker->relstate_lsn = rstate->lsn; + SpinLockRelease(&syncworker->relmutex); + + /* Signal the sync worker, as it may be waiting for us. */ + logicalrep_worker_wakeup_ptr(syncworker); + + /* + * Enter busy loop and wait for synchronization status + * change. + */ + wait_for_sync_status_change(rstate->relid, rstate->state); + } + + /* + * If there is no sync worker registered for the table and + * there is some free sync worker slot, start new sync worker + * for the table. + */ + else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription) + { + logicalrep_worker_launch(MyLogicalRepWorker->dbid, + MySubscription->oid, + MySubscription->name, + MyLogicalRepWorker->userid, + rstate->relid); + } + } + } +} + +/* + * Process state possible change(s) of tables that are being synchronized. + */ +void +process_syncing_tables(XLogRecPtr current_lsn) +{ + if (am_tablesync_worker()) + process_syncing_tables_for_sync(current_lsn); + else + process_syncing_tables_for_apply(current_lsn); +} + +/* + * Create list of columns for COPY based on logical relation mapping. + */ +static List * +make_copy_attnamelist(LogicalRepRelMapEntry *rel) +{ + List *attnamelist = NIL; + TupleDesc desc = RelationGetDescr(rel->localrel); + int i; + + for (i = 0; i < desc->natts; i++) + { + int remoteattnum = rel->attrmap[i]; + + /* Skip dropped attributes. */ + if (desc->attrs[i]->attisdropped) + continue; + + /* Skip attributes that are missing on remote side. */ + if (remoteattnum < 0) + continue; + + attnamelist = lappend(attnamelist, + makeString(rel->remoterel.attnames[remoteattnum])); + } + + return attnamelist; +} + +/* + * Data source callback for the COPY FROM, which reads from the remote + * connection and passes the data back to our local COPY. + */ +static int +copy_read_data(void *outbuf, int minread, int maxread) +{ + int bytesread = 0; + int avail; + + /* If there are some leftover data from previous read, use them. */ + avail = copybuf->len - copybuf->cursor; + if (avail) + { + if (avail > maxread) + avail = maxread; + memcpy(outbuf, ©buf->data[copybuf->cursor], avail); + copybuf->cursor += avail; + maxread -= avail; + bytesread += avail; + } + + while (!got_SIGTERM && maxread > 0 && bytesread < minread) + { + pgsocket fd = PGINVALID_SOCKET; + int rc; + int len; + char *buf = NULL; + + for (;;) + { + /* Try read the data. */ + len = walrcv_receive(wrconn, &buf, &fd); + + CHECK_FOR_INTERRUPTS(); + + if (len == 0) + break; + else if (len < 0) + return bytesread; + else + { + /* Process the data */ + copybuf->data = buf; + copybuf->len = len; + copybuf->cursor = 0; + + avail = copybuf->len - copybuf->cursor; + if (avail > maxread) + avail = maxread; + memcpy(outbuf, ©buf->data[copybuf->cursor], avail); + outbuf = (void *) ((char *) outbuf + avail); + copybuf->cursor += avail; + maxread -= avail; + bytesread += avail; + } + + if (maxread <= 0 || bytesread >= minread) + return bytesread; + } + + /* + * Wait for more data or latch. + */ + rc = WaitLatchOrSocket(&MyProc->procLatch, + WL_SOCKET_READABLE | WL_LATCH_SET | + WL_TIMEOUT | WL_POSTMASTER_DEATH, + fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA); + + /* Emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + ResetLatch(&MyProc->procLatch); + } + + /* Check for exit condition. */ + if (got_SIGTERM) + proc_exit(0); + + return bytesread; +} + + +/* + * Get information about remote relation in similar fashion the RELATION + * message provides during replication. + */ +static void +fetch_remote_table_info(char *nspname, char *relname, + LogicalRepRelation *lrel) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[2] = {OIDOID, CHAROID}; + Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID}; + bool isnull; + int natt; + + lrel->nspname = nspname; + lrel->relname = relname; + + /* First fetch Oid and replica identity. */ + initStringInfo(&cmd); + appendStringInfo(&cmd, "SELECT c.oid, c.relreplident" + " FROM pg_catalog.pg_class c," + " pg_catalog.pg_namespace n" + " WHERE n.nspname = %s" + " AND c.relname = %s" + " AND c.relkind = 'r'", + quote_literal_cstr(nspname), + quote_literal_cstr(relname)); + res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s", + nspname, relname, res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + ereport(ERROR, + (errmsg("table \"%s.%s\" not found on publisher", + nspname, relname))); + + lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + ExecDropSingleTupleTableSlot(slot); + walrcv_clear_result(res); + + /* Now fetch columns. */ + resetStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT a.attname," + " a.atttypid," + " a.atttypmod," + " a.attnum = ANY(i.indkey)" + " FROM pg_catalog.pg_attribute a" + " LEFT JOIN pg_catalog.pg_index i" + " ON (i.indexrelid = pg_get_replica_identity_index(%u))" + " WHERE a.attnum > 0::pg_catalog.int2" + " AND NOT a.attisdropped" + " AND a.attrelid = %u" + " ORDER BY a.attnum", + lrel->remoteid, lrel->remoteid); + res = walrcv_exec(wrconn, cmd.data, 4, attrRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch table info for table \"%s.%s\": %s", + nspname, relname, res->err))); + + /* We don't know number of rows coming, so allocate enough space. */ + lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *)); + lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid)); + lrel->attkeys = NULL; + + natt = 0; + slot = MakeSingleTupleTableSlot(res->tupledesc); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + lrel->attnames[natt] = + pstrdup(TextDatumGetCString(slot_getattr(slot, 1, &isnull))); + Assert(!isnull); + lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + if (DatumGetBool(slot_getattr(slot, 4, &isnull))) + lrel->attkeys = bms_add_member(lrel->attkeys, natt); + + /* Should never happen. */ + if (++natt >= MaxTupleAttributeNumber) + elog(ERROR, "too many columns in remote table \"%s.%s\"", + nspname, relname); + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + lrel->natts = natt; + + walrcv_clear_result(res); + pfree(cmd.data); +} + +/* + * Copy existing data of a table from publisher. + * + * Caller is responsible for locking the local relation. + */ +static void +copy_table(Relation rel) +{ + LogicalRepRelMapEntry *relmapentry; + LogicalRepRelation lrel; + WalRcvExecResult *res; + StringInfoData cmd; + CopyState cstate; + List *attnamelist; + + /* Get the publisher relation info. */ + fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), + RelationGetRelationName(rel), &lrel); + + /* Put the relation into relmap. */ + logicalrep_relmap_update(&lrel); + + /* Map the publisher relation to local one. */ + relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); + Assert(rel == relmapentry->localrel); + + /* Start copy on the publisher. */ + initStringInfo(&cmd); + appendStringInfo(&cmd, "COPY %s TO STDOUT", + quote_qualified_identifier(lrel.nspname, lrel.relname)); + res = walrcv_exec(wrconn, cmd.data, 0, NULL); + pfree(cmd.data); + if (res->status != WALRCV_OK_COPY_OUT) + ereport(ERROR, + (errmsg("could not start initial contents copy for table \"%s.%s\": %s", + lrel.nspname, lrel.relname, res->err))); + walrcv_clear_result(res); + + copybuf = makeStringInfo(); + + /* Create CopyState for ingestion of the data from publisher. */ + attnamelist = make_copy_attnamelist(relmapentry); + cstate = BeginCopyFrom(NULL, rel, NULL, false, copy_read_data, attnamelist, NIL); + + /* Do the copy */ + (void) CopyFrom(cstate); + + logicalrep_rel_close(relmapentry, NoLock); +} + +/* + * Start syncing the table in the sync worker. + * + * The returned slot name is palloced in current memory context. + */ +char * +LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) +{ + char *slotname; + char *err; + + /* Check the state of the table synchronization. */ + StartTransactionCommand(); + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->relstate = + GetSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + &MyLogicalRepWorker->relstate_lsn, + false); + SpinLockRelease(&MyLogicalRepWorker->relmutex); + CommitTransactionCommand(); + + /* + * To build a slot name for the sync work, we are limited to NAMEDATALEN - + * 1 characters. We cut the original slot name to NAMEDATALEN - 28 chars + * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0'). (It's actually the + * NAMEDATALEN on the remote that matters, but this scheme will also work + * reasonably if that is different.) + */ + StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */ + slotname = psprintf("%.*s_%u_sync_%u", + NAMEDATALEN - 28, + MySubscription->slotname, + MySubscription->oid, + MyLogicalRepWorker->relid); + + wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err); + if (wrconn == NULL) + ereport(ERROR, + (errmsg("could not connect to the publisher: %s", err))); + + switch (MyLogicalRepWorker->relstate) + { + case SUBREL_STATE_INIT: + case SUBREL_STATE_DATASYNC: + { + Relation rel; + WalRcvExecResult *res; + + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC; + MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + /* Update the state and make it visible to others. */ + StartTransactionCommand(); + SetSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->relstate, + MyLogicalRepWorker->relstate_lsn); + CommitTransactionCommand(); + + /* + * We want to do the table data sync in single + * transaction. + */ + StartTransactionCommand(); + + /* + * Use standard write lock here. It might be better to + * disallow access to table while it's being synchronized. + * But we don't want to block the main apply process from + * working and it has to open relation in RowExclusiveLock + * when remapping remote relation id to local one. + */ + rel = heap_open(MyLogicalRepWorker->relid, RowExclusiveLock); + + /* + * Create temporary slot for the sync process. + * We do this inside transaction so that we can use the + * snapshot made by the slot to get existing data. + */ + res = walrcv_exec(wrconn, + "BEGIN READ ONLY ISOLATION LEVEL " + "REPEATABLE READ", 0, NULL); + if (res->status != WALRCV_OK_COMMAND) + ereport(ERROR, + (errmsg("table copy could not start transaction on publisher"), + errdetail("The error was: %s", res->err))); + walrcv_clear_result(res); + + /* + * Create new temporary logical decoding slot. + * + * We'll use slot for data copy so make sure the snapshot + * is used for the transaction, that way the COPY will get + * data that is consistent with the lsn used by the slot + * to start decoding. + */ + walrcv_create_slot(wrconn, slotname, true, + CRS_USE_SNAPSHOT, origin_startpos); + + copy_table(rel); + + res = walrcv_exec(wrconn, "COMMIT", 0, NULL); + if (res->status != WALRCV_OK_COMMAND) + ereport(ERROR, + (errmsg("table copy could not finish transaction on publisher"), + errdetail("The error was: %s", res->err))); + walrcv_clear_result(res); + + heap_close(rel, NoLock); + + /* Make the copy visible. */ + CommandCounterIncrement(); + + /* + * We are done with the initial data synchronization, + * update the state. + */ + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT; + MyLogicalRepWorker->relstate_lsn = *origin_startpos; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + /* + * Wait for main apply worker to either tell us to + * catchup or that we are done. + */ + wait_for_sync_status_change(MyLogicalRepWorker->relid, + MyLogicalRepWorker->relstate); + if (MyLogicalRepWorker->relstate != SUBREL_STATE_CATCHUP) + { + /* Update the new state. */ + SetSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->relstate, + MyLogicalRepWorker->relstate_lsn); + finish_sync_worker(); + } + break; + } + case SUBREL_STATE_SYNCDONE: + case SUBREL_STATE_READY: + /* Nothing to do here but finish. */ + finish_sync_worker(); + break; + default: + elog(ERROR, "unknown relation state \"%c\"", + MyLogicalRepWorker->relstate); + } + + return slotname; +} |