diff options
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 109 |
1 files changed, 105 insertions, 4 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 1659964571c..d8b12d94bc3 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -100,6 +100,7 @@ #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" +#include "commands/sequence.h" #include "miscadmin.h" #include "parser/parse_relation.h" #include "pgstat.h" @@ -1000,6 +1001,95 @@ copy_table(Relation rel) } /* + * Fetch sequence data (current state) from the remote node. + */ +static void +fetch_sequence_data(char *nspname, char *relname, + int64 *last_value, int64 *log_cnt, bool *is_called) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[3] = {INT8OID, INT8OID, BOOLOID}; + + initStringInfo(&cmd); + appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called\n" + " FROM %s", quote_qualified_identifier(nspname, relname)); + + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 3, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not receive list of replicated tables from the publisher: %s", + res->err))); + + /* Process the sequence. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + bool isnull; + + *last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + *log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + *is_called = DatumGetBool(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); +} + +/* + * Copy existing data of a sequence from publisher. + * + * Caller is responsible for locking the local relation. + */ +static void +copy_sequence(Relation rel) +{ + LogicalRepRelMapEntry *relmapentry; + LogicalRepRelation lrel; + List *qual = NIL; + StringInfoData cmd; + int64 last_value = 0, + log_cnt = 0; + bool is_called = 0; + + /* Get the publisher relation info. */ + fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), + RelationGetRelationName(rel), &lrel, &qual); + + /* sequences don't have row filters */ + Assert(!qual); + + /* Put the relation into relmap. */ + logicalrep_relmap_update(&lrel); + + /* Map the publisher relation to local one. */ + relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); + Assert(rel == relmapentry->localrel); + + /* Start copy on the publisher. */ + initStringInfo(&cmd); + + Assert(lrel.relkind == RELKIND_SEQUENCE); + + fetch_sequence_data(lrel.nspname, lrel.relname, &last_value, &log_cnt, &is_called); + + /* tablesync sets the sequences in non-transactional way */ + SetSequence(RelationGetRelid(rel), false, last_value, log_cnt, is_called); + + logicalrep_rel_close(relmapentry, NoLock); +} + +/* * Determine the tablesync slot name. * * The name must not exceed NAMEDATALEN - 1 because of remote node constraints @@ -1260,10 +1350,21 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) originname))); } - /* Now do the initial data copy */ - PushActiveSnapshot(GetTransactionSnapshot()); - copy_table(rel); - PopActiveSnapshot(); + /* Do the right action depending on the relation kind. */ + if (get_rel_relkind(RelationGetRelid(rel)) == RELKIND_SEQUENCE) + { + /* Now do the initial sequence copy */ + PushActiveSnapshot(GetTransactionSnapshot()); + copy_sequence(rel); + PopActiveSnapshot(); + } + else + { + /* Now do the initial data copy */ + PushActiveSnapshot(GetTransactionSnapshot()); + copy_table(rel); + PopActiveSnapshot(); + } res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL); if (res->status != WALRCV_OK_COMMAND) |