diff options
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r-- | src/backend/replication/logical/proto.c | 52 | ||||
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 109 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 56 |
3 files changed, 213 insertions, 4 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index c9b0eeefd7e..3dbe85d61a2 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -649,6 +649,56 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, } /* + * Write SEQUENCE to stream + */ +void +logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid, + XLogRecPtr lsn, bool transactional, + int64 last_value, int64 log_cnt, bool is_called) +{ + uint8 flags = 0; + char *relname; + + pq_sendbyte(out, LOGICAL_REP_MSG_SEQUENCE); + + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + + pq_sendint8(out, flags); + pq_sendint64(out, lsn); + + logicalrep_write_namespace(out, RelationGetNamespace(rel)); + relname = RelationGetRelationName(rel); + pq_sendstring(out, relname); + + pq_sendint8(out, transactional); + pq_sendint64(out, last_value); + pq_sendint64(out, log_cnt); + pq_sendint8(out, is_called); +} + +/* + * Read SEQUENCE from the stream. + */ +void +logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata) +{ + /* XXX skipping flags and lsn */ + pq_getmsgint(in, 1); + pq_getmsgint64(in); + + /* Read relation name from stream */ + seqdata->nspname = pstrdup(logicalrep_read_namespace(in)); + seqdata->seqname = pstrdup(pq_getmsgstring(in)); + + seqdata->transactional = pq_getmsgint(in, 1); + seqdata->last_value = pq_getmsgint64(in); + seqdata->log_cnt = pq_getmsgint64(in); + seqdata->is_called = pq_getmsgint(in, 1); +} + +/* * Write relation description to the output stream. */ void @@ -1203,6 +1253,8 @@ logicalrep_message_type(LogicalRepMsgType action) return "STREAM ABORT"; case LOGICAL_REP_MSG_STREAM_PREPARE: return "STREAM PREPARE"; + case LOGICAL_REP_MSG_SEQUENCE: + return "SEQUENCE"; } elog(ERROR, "invalid logical replication message type \"%c\"", action); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 1659964571c..d8b12d94bc3 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -100,6 +100,7 @@ #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" +#include "commands/sequence.h" #include "miscadmin.h" #include "parser/parse_relation.h" #include "pgstat.h" @@ -1000,6 +1001,95 @@ copy_table(Relation rel) } /* + * Fetch sequence data (current state) from the remote node. + */ +static void +fetch_sequence_data(char *nspname, char *relname, + int64 *last_value, int64 *log_cnt, bool *is_called) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[3] = {INT8OID, INT8OID, BOOLOID}; + + initStringInfo(&cmd); + appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called\n" + " FROM %s", quote_qualified_identifier(nspname, relname)); + + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 3, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not receive list of replicated tables from the publisher: %s", + res->err))); + + /* Process the sequence. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + bool isnull; + + *last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + *log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + *is_called = DatumGetBool(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); +} + +/* + * Copy existing data of a sequence from publisher. + * + * Caller is responsible for locking the local relation. + */ +static void +copy_sequence(Relation rel) +{ + LogicalRepRelMapEntry *relmapentry; + LogicalRepRelation lrel; + List *qual = NIL; + StringInfoData cmd; + int64 last_value = 0, + log_cnt = 0; + bool is_called = 0; + + /* Get the publisher relation info. */ + fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), + RelationGetRelationName(rel), &lrel, &qual); + + /* sequences don't have row filters */ + Assert(!qual); + + /* Put the relation into relmap. */ + logicalrep_relmap_update(&lrel); + + /* Map the publisher relation to local one. */ + relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); + Assert(rel == relmapentry->localrel); + + /* Start copy on the publisher. */ + initStringInfo(&cmd); + + Assert(lrel.relkind == RELKIND_SEQUENCE); + + fetch_sequence_data(lrel.nspname, lrel.relname, &last_value, &log_cnt, &is_called); + + /* tablesync sets the sequences in non-transactional way */ + SetSequence(RelationGetRelid(rel), false, last_value, log_cnt, is_called); + + logicalrep_rel_close(relmapentry, NoLock); +} + +/* * Determine the tablesync slot name. * * The name must not exceed NAMEDATALEN - 1 because of remote node constraints @@ -1260,10 +1350,21 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) originname))); } - /* Now do the initial data copy */ - PushActiveSnapshot(GetTransactionSnapshot()); - copy_table(rel); - PopActiveSnapshot(); + /* Do the right action depending on the relation kind. */ + if (get_rel_relkind(RelationGetRelid(rel)) == RELKIND_SEQUENCE) + { + /* Now do the initial sequence copy */ + PushActiveSnapshot(GetTransactionSnapshot()); + copy_sequence(rel); + PopActiveSnapshot(); + } + else + { + /* Now do the initial data copy */ + PushActiveSnapshot(GetTransactionSnapshot()); + copy_table(rel); + PopActiveSnapshot(); + } res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL); if (res->status != WALRCV_OK_COMMAND) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 82dcffc2db8..f3868b3e1f8 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -143,6 +143,7 @@ #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_tablespace.h" +#include "commands/sequence.h" #include "commands/tablecmds.h" #include "commands/tablespace.h" #include "commands/trigger.h" @@ -1144,6 +1145,57 @@ apply_handle_origin(StringInfo s) } /* + * Handle SEQUENCE message. + */ +static void +apply_handle_sequence(StringInfo s) +{ + LogicalRepSequence seq; + Oid relid; + + if (handle_streamed_transaction(LOGICAL_REP_MSG_SEQUENCE, s)) + return; + + logicalrep_read_sequence(s, &seq); + + /* + * Non-transactional sequence updates should not be part of a remote + * transaction. There should not be any running transaction. + */ + Assert((!seq.transactional) || in_remote_transaction); + Assert(!(!seq.transactional && in_remote_transaction)); + Assert(!(!seq.transactional && IsTransactionState())); + + /* + * Make sure we're in a transaction (needed by SetSequence). For + * non-transactional updates we're guaranteed to start a new one, + * and we'll commit it at the end. + */ + if (!IsTransactionState()) + { + StartTransactionCommand(); + maybe_reread_subscription(); + } + + relid = RangeVarGetRelid(makeRangeVar(seq.nspname, + seq.seqname, -1), + RowExclusiveLock, false); + + /* lock the sequence in AccessExclusiveLock, as expected by SetSequence */ + LockRelationOid(relid, AccessExclusiveLock); + + /* apply the sequence change */ + SetSequence(relid, seq.transactional, seq.last_value, seq.log_cnt, seq.is_called); + + /* + * Commit the per-stream transaction (we only do this when not in + * remote transaction, i.e. for non-transactional sequence updates. + */ + if (!in_remote_transaction) + CommitTransactionCommand(); +} + +/* * Handle STREAM START message. */ static void @@ -2511,6 +2563,10 @@ apply_dispatch(StringInfo s) */ break; + case LOGICAL_REP_MSG_SEQUENCE: + apply_handle_sequence(s); + return; + case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); break; |