diff options
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 94 |
1 files changed, 6 insertions, 88 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 9d33630464c..fe5accca576 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -15,7 +15,6 @@ #include "access/tupconvert.h" #include "catalog/partition.h" #include "catalog/pg_publication.h" -#include "catalog/pg_publication_namespace.h" #include "catalog/pg_publication_rel.h" #include "commands/defrem.h" #include "executor/executor.h" @@ -55,10 +54,6 @@ static void pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message); -static void pgoutput_sequence(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn, XLogRecPtr sequence_lsn, - Relation relation, bool transactional, - int64 last_value, int64 log_cnt, bool is_called); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, @@ -260,7 +255,6 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->change_cb = pgoutput_change; cb->truncate_cb = pgoutput_truncate; cb->message_cb = pgoutput_message; - cb->sequence_cb = pgoutput_sequence; cb->commit_cb = pgoutput_commit_txn; cb->begin_prepare_cb = pgoutput_begin_prepare_txn; @@ -277,7 +271,6 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_commit_cb = pgoutput_stream_commit; cb->stream_change_cb = pgoutput_change; cb->stream_message_cb = pgoutput_message; - cb->stream_sequence_cb = pgoutput_sequence; cb->stream_truncate_cb = pgoutput_truncate; /* transaction streaming - two-phase commit */ cb->stream_prepare_cb = pgoutput_stream_prepare_txn; @@ -291,7 +284,6 @@ parse_output_parameters(List *options, PGOutputData *data) bool publication_names_given = false; bool binary_option_given = false; bool messages_option_given = false; - bool sequences_option_given = false; bool streaming_given = false; bool two_phase_option_given = false; @@ -299,7 +291,6 @@ parse_output_parameters(List *options, PGOutputData *data) data->streaming = false; data->messages = false; data->two_phase = false; - data->sequences = true; foreach(lc, options) { @@ -368,16 +359,6 @@ parse_output_parameters(List *options, PGOutputData *data) data->messages = defGetBoolean(defel); } - else if (strcmp(defel->defname, "sequences") == 0) - { - if (sequences_option_given) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); - sequences_option_given = true; - - data->sequences = defGetBoolean(defel); - } else if (strcmp(defel->defname, "streaming") == 0) { if (streaming_given) @@ -1709,64 +1690,6 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } -static void -pgoutput_sequence(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn, XLogRecPtr sequence_lsn, - Relation relation, bool transactional, - int64 last_value, int64 log_cnt, bool is_called) -{ - PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; - TransactionId xid = InvalidTransactionId; - RelationSyncEntry *relentry; - - if (!data->sequences) - return; - - if (!is_publishable_relation(relation)) - return; - - /* - * Remember the xid for the message in streaming mode. See - * pgoutput_change. - */ - if (in_streaming) - xid = txn->xid; - - relentry = get_rel_sync_entry(data, relation); - - /* - * First check the sequence filter. - * - * We handle just REORDER_BUFFER_CHANGE_SEQUENCE here. - */ - if (!relentry->pubactions.pubsequence) - return; - - /* - * Output BEGIN if we haven't yet. Avoid for non-transactional - * sequence changes. - */ - if (transactional) - { - PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; - - /* Send BEGIN if we haven't yet */ - if (txndata && !txndata->sent_begin_txn) - pgoutput_send_begin(ctx, txn); - } - - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_sequence(ctx->out, - relation, - xid, - sequence_lsn, - transactional, - last_value, - log_cnt, - is_called); - OutputPluginWrite(ctx, true); -} - /* * Currently we always forward. */ @@ -2052,8 +1975,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->schema_sent = false; entry->streamed_txns = NIL; entry->pubactions.pubinsert = entry->pubactions.pubupdate = - entry->pubactions.pubdelete = entry->pubactions.pubtruncate = - entry->pubactions.pubsequence = false; + entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; entry->new_slot = NULL; entry->old_slot = NULL; memset(entry->exprstate, 0, sizeof(entry->exprstate)); @@ -2068,18 +1990,18 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) { Oid schemaId = get_rel_namespace(relid); List *pubids = GetRelationPublications(relid); - char relkind = get_rel_relkind(relid); - char objectType = pub_get_object_type_for_relkind(relkind); + /* * We don't acquire a lock on the namespace system table as we build * the cache entry using a historic snapshot and all the later changes * are absorbed while decoding WAL. */ - List *schemaPubids = GetSchemaPublications(schemaId, objectType); + List *schemaPubids = GetSchemaPublications(schemaId); ListCell *lc; Oid publish_as_relid = relid; int publish_ancestor_level = 0; bool am_partition = get_rel_relispartition(relid); + char relkind = get_rel_relkind(relid); List *rel_publications = NIL; /* Reload publications if needed before use. */ @@ -2111,7 +2033,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->pubactions.pubupdate = false; entry->pubactions.pubdelete = false; entry->pubactions.pubtruncate = false; - entry->pubactions.pubsequence = false; /* * Tuple slots cleanups. (Will be rebuilt later if needed). @@ -2159,11 +2080,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) /* * If this is a FOR ALL TABLES publication, pick the partition root - * and set the ancestor level accordingly. If this is a FOR ALL - * SEQUENCES publication, we publish it too but we don't need to - * pick the partition root etc. + * and set the ancestor level accordingly. */ - if (pub->alltables || pub->allsequences) + if (pub->alltables) { publish = true; if (pub->pubviaroot && am_partition) @@ -2227,7 +2146,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; - entry->pubactions.pubsequence |= pub->pubactions.pubsequence; /* * We want to publish the changes as the top-most ancestor |