aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/tablesync.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r--src/backend/replication/logical/tablesync.c109
1 files changed, 4 insertions, 105 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index b2cb31eaad7..49ceec3bdc8 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -100,7 +100,6 @@
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
-#include "commands/sequence.h"
#include "miscadmin.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
@@ -1138,95 +1137,6 @@ copy_table(Relation rel)
}
/*
- * Fetch sequence data (current state) from the remote node.
- */
-static void
-fetch_sequence_data(char *nspname, char *relname,
- int64 *last_value, int64 *log_cnt, bool *is_called)
-{
- WalRcvExecResult *res;
- StringInfoData cmd;
- TupleTableSlot *slot;
- Oid tableRow[3] = {INT8OID, INT8OID, BOOLOID};
-
- initStringInfo(&cmd);
- appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called\n"
- " FROM %s", quote_qualified_identifier(nspname, relname));
-
- res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 3, tableRow);
- pfree(cmd.data);
-
- if (res->status != WALRCV_OK_TUPLES)
- ereport(ERROR,
- (errmsg("could not receive list of replicated tables from the publisher: %s",
- res->err)));
-
- /* Process the sequence. */
- slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
- while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
- {
- bool isnull;
-
- *last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull));
- Assert(!isnull);
-
- *log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull));
- Assert(!isnull);
-
- *is_called = DatumGetBool(slot_getattr(slot, 3, &isnull));
- Assert(!isnull);
-
- ExecClearTuple(slot);
- }
- ExecDropSingleTupleTableSlot(slot);
-
- walrcv_clear_result(res);
-}
-
-/*
- * Copy existing data of a sequence from publisher.
- *
- * Caller is responsible for locking the local relation.
- */
-static void
-copy_sequence(Relation rel)
-{
- LogicalRepRelMapEntry *relmapentry;
- LogicalRepRelation lrel;
- List *qual = NIL;
- StringInfoData cmd;
- int64 last_value = 0,
- log_cnt = 0;
- bool is_called = 0;
-
- /* Get the publisher relation info. */
- fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
- RelationGetRelationName(rel), &lrel, &qual);
-
- /* sequences don't have row filters */
- Assert(!qual);
-
- /* Put the relation into relmap. */
- logicalrep_relmap_update(&lrel);
-
- /* Map the publisher relation to local one. */
- relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
- Assert(rel == relmapentry->localrel);
-
- /* Start copy on the publisher. */
- initStringInfo(&cmd);
-
- Assert(lrel.relkind == RELKIND_SEQUENCE);
-
- fetch_sequence_data(lrel.nspname, lrel.relname, &last_value, &log_cnt, &is_called);
-
- /* tablesync sets the sequences in non-transactional way */
- SetSequence(RelationGetRelid(rel), false, last_value, log_cnt, is_called);
-
- logicalrep_rel_close(relmapentry, NoLock);
-}
-
-/*
* Determine the tablesync slot name.
*
* The name must not exceed NAMEDATALEN - 1 because of remote node constraints
@@ -1487,21 +1397,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
originname)));
}
- /* Do the right action depending on the relation kind. */
- if (get_rel_relkind(RelationGetRelid(rel)) == RELKIND_SEQUENCE)
- {
- /* Now do the initial sequence copy */
- PushActiveSnapshot(GetTransactionSnapshot());
- copy_sequence(rel);
- PopActiveSnapshot();
- }
- else
- {
- /* Now do the initial data copy */
- PushActiveSnapshot(GetTransactionSnapshot());
- copy_table(rel);
- PopActiveSnapshot();
- }
+ /* Now do the initial data copy */
+ PushActiveSnapshot(GetTransactionSnapshot());
+ copy_table(rel);
+ PopActiveSnapshot();
res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
if (res->status != WALRCV_OK_COMMAND)