aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/tablesync.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r--src/backend/replication/logical/tablesync.c109
1 files changed, 105 insertions, 4 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 1659964571c..d8b12d94bc3 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -100,6 +100,7 @@
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
+#include "commands/sequence.h"
#include "miscadmin.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
@@ -1000,6 +1001,95 @@ copy_table(Relation rel)
}
/*
+ * Fetch sequence data (current state) from the remote node.
+ */
+static void
+fetch_sequence_data(char *nspname, char *relname,
+ int64 *last_value, int64 *log_cnt, bool *is_called)
+{
+ WalRcvExecResult *res;
+ StringInfoData cmd;
+ TupleTableSlot *slot;
+ Oid tableRow[3] = {INT8OID, INT8OID, BOOLOID};
+
+ initStringInfo(&cmd);
+ appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called\n"
+ " FROM %s", quote_qualified_identifier(nspname, relname));
+
+ res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 3, tableRow);
+ pfree(cmd.data);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errmsg("could not receive list of replicated tables from the publisher: %s",
+ res->err)));
+
+ /* Process the sequence. */
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ bool isnull;
+
+ *last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull));
+ Assert(!isnull);
+
+ *log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull));
+ Assert(!isnull);
+
+ *is_called = DatumGetBool(slot_getattr(slot, 3, &isnull));
+ Assert(!isnull);
+
+ ExecClearTuple(slot);
+ }
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(res);
+}
+
+/*
+ * Copy existing data of a sequence from publisher.
+ *
+ * Caller is responsible for locking the local relation.
+ */
+static void
+copy_sequence(Relation rel)
+{
+ LogicalRepRelMapEntry *relmapentry;
+ LogicalRepRelation lrel;
+ List *qual = NIL;
+ StringInfoData cmd;
+ int64 last_value = 0,
+ log_cnt = 0;
+ bool is_called = 0;
+
+ /* Get the publisher relation info. */
+ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
+ RelationGetRelationName(rel), &lrel, &qual);
+
+ /* sequences don't have row filters */
+ Assert(!qual);
+
+ /* Put the relation into relmap. */
+ logicalrep_relmap_update(&lrel);
+
+ /* Map the publisher relation to local one. */
+ relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
+ Assert(rel == relmapentry->localrel);
+
+ /* Start copy on the publisher. */
+ initStringInfo(&cmd);
+
+ Assert(lrel.relkind == RELKIND_SEQUENCE);
+
+ fetch_sequence_data(lrel.nspname, lrel.relname, &last_value, &log_cnt, &is_called);
+
+ /* tablesync sets the sequences in non-transactional way */
+ SetSequence(RelationGetRelid(rel), false, last_value, log_cnt, is_called);
+
+ logicalrep_rel_close(relmapentry, NoLock);
+}
+
+/*
* Determine the tablesync slot name.
*
* The name must not exceed NAMEDATALEN - 1 because of remote node constraints
@@ -1260,10 +1350,21 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
originname)));
}
- /* Now do the initial data copy */
- PushActiveSnapshot(GetTransactionSnapshot());
- copy_table(rel);
- PopActiveSnapshot();
+ /* Do the right action depending on the relation kind. */
+ if (get_rel_relkind(RelationGetRelid(rel)) == RELKIND_SEQUENCE)
+ {
+ /* Now do the initial sequence copy */
+ PushActiveSnapshot(GetTransactionSnapshot());
+ copy_sequence(rel);
+ PopActiveSnapshot();
+ }
+ else
+ {
+ /* Now do the initial data copy */
+ PushActiveSnapshot(GetTransactionSnapshot());
+ copy_table(rel);
+ PopActiveSnapshot();
+ }
res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
if (res->status != WALRCV_OK_COMMAND)