diff options
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 109 |
1 files changed, 4 insertions, 105 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index b2cb31eaad7..49ceec3bdc8 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -100,7 +100,6 @@ #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" -#include "commands/sequence.h" #include "miscadmin.h" #include "parser/parse_relation.h" #include "pgstat.h" @@ -1138,95 +1137,6 @@ copy_table(Relation rel) } /* - * Fetch sequence data (current state) from the remote node. - */ -static void -fetch_sequence_data(char *nspname, char *relname, - int64 *last_value, int64 *log_cnt, bool *is_called) -{ - WalRcvExecResult *res; - StringInfoData cmd; - TupleTableSlot *slot; - Oid tableRow[3] = {INT8OID, INT8OID, BOOLOID}; - - initStringInfo(&cmd); - appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called\n" - " FROM %s", quote_qualified_identifier(nspname, relname)); - - res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 3, tableRow); - pfree(cmd.data); - - if (res->status != WALRCV_OK_TUPLES) - ereport(ERROR, - (errmsg("could not receive list of replicated tables from the publisher: %s", - res->err))); - - /* Process the sequence. */ - slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); - while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) - { - bool isnull; - - *last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull)); - Assert(!isnull); - - *log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull)); - Assert(!isnull); - - *is_called = DatumGetBool(slot_getattr(slot, 3, &isnull)); - Assert(!isnull); - - ExecClearTuple(slot); - } - ExecDropSingleTupleTableSlot(slot); - - walrcv_clear_result(res); -} - -/* - * Copy existing data of a sequence from publisher. - * - * Caller is responsible for locking the local relation. - */ -static void -copy_sequence(Relation rel) -{ - LogicalRepRelMapEntry *relmapentry; - LogicalRepRelation lrel; - List *qual = NIL; - StringInfoData cmd; - int64 last_value = 0, - log_cnt = 0; - bool is_called = 0; - - /* Get the publisher relation info. */ - fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), - RelationGetRelationName(rel), &lrel, &qual); - - /* sequences don't have row filters */ - Assert(!qual); - - /* Put the relation into relmap. */ - logicalrep_relmap_update(&lrel); - - /* Map the publisher relation to local one. */ - relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); - Assert(rel == relmapentry->localrel); - - /* Start copy on the publisher. */ - initStringInfo(&cmd); - - Assert(lrel.relkind == RELKIND_SEQUENCE); - - fetch_sequence_data(lrel.nspname, lrel.relname, &last_value, &log_cnt, &is_called); - - /* tablesync sets the sequences in non-transactional way */ - SetSequence(RelationGetRelid(rel), false, last_value, log_cnt, is_called); - - logicalrep_rel_close(relmapentry, NoLock); -} - -/* * Determine the tablesync slot name. * * The name must not exceed NAMEDATALEN - 1 because of remote node constraints @@ -1487,21 +1397,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) originname))); } - /* Do the right action depending on the relation kind. */ - if (get_rel_relkind(RelationGetRelid(rel)) == RELKIND_SEQUENCE) - { - /* Now do the initial sequence copy */ - PushActiveSnapshot(GetTransactionSnapshot()); - copy_sequence(rel); - PopActiveSnapshot(); - } - else - { - /* Now do the initial data copy */ - PushActiveSnapshot(GetTransactionSnapshot()); - copy_table(rel); - PopActiveSnapshot(); - } + /* Now do the initial data copy */ + PushActiveSnapshot(GetTransactionSnapshot()); + copy_table(rel); + PopActiveSnapshot(); res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL); if (res->status != WALRCV_OK_COMMAND) |