aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/pgoutput/pgoutput.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c94
1 files changed, 6 insertions, 88 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9d33630464c..fe5accca576 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -15,7 +15,6 @@
#include "access/tupconvert.h"
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
-#include "catalog/pg_publication_namespace.h"
#include "catalog/pg_publication_rel.h"
#include "commands/defrem.h"
#include "executor/executor.h"
@@ -55,10 +54,6 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
-static void pgoutput_sequence(LogicalDecodingContext *ctx,
- ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
- Relation relation, bool transactional,
- int64 last_value, int64 log_cnt, bool is_called);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
@@ -260,7 +255,6 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->change_cb = pgoutput_change;
cb->truncate_cb = pgoutput_truncate;
cb->message_cb = pgoutput_message;
- cb->sequence_cb = pgoutput_sequence;
cb->commit_cb = pgoutput_commit_txn;
cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
@@ -277,7 +271,6 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->stream_commit_cb = pgoutput_stream_commit;
cb->stream_change_cb = pgoutput_change;
cb->stream_message_cb = pgoutput_message;
- cb->stream_sequence_cb = pgoutput_sequence;
cb->stream_truncate_cb = pgoutput_truncate;
/* transaction streaming - two-phase commit */
cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
@@ -291,7 +284,6 @@ parse_output_parameters(List *options, PGOutputData *data)
bool publication_names_given = false;
bool binary_option_given = false;
bool messages_option_given = false;
- bool sequences_option_given = false;
bool streaming_given = false;
bool two_phase_option_given = false;
@@ -299,7 +291,6 @@ parse_output_parameters(List *options, PGOutputData *data)
data->streaming = false;
data->messages = false;
data->two_phase = false;
- data->sequences = true;
foreach(lc, options)
{
@@ -368,16 +359,6 @@ parse_output_parameters(List *options, PGOutputData *data)
data->messages = defGetBoolean(defel);
}
- else if (strcmp(defel->defname, "sequences") == 0)
- {
- if (sequences_option_given)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
- sequences_option_given = true;
-
- data->sequences = defGetBoolean(defel);
- }
else if (strcmp(defel->defname, "streaming") == 0)
{
if (streaming_given)
@@ -1709,64 +1690,6 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginWrite(ctx, true);
}
-static void
-pgoutput_sequence(LogicalDecodingContext *ctx,
- ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
- Relation relation, bool transactional,
- int64 last_value, int64 log_cnt, bool is_called)
-{
- PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
- TransactionId xid = InvalidTransactionId;
- RelationSyncEntry *relentry;
-
- if (!data->sequences)
- return;
-
- if (!is_publishable_relation(relation))
- return;
-
- /*
- * Remember the xid for the message in streaming mode. See
- * pgoutput_change.
- */
- if (in_streaming)
- xid = txn->xid;
-
- relentry = get_rel_sync_entry(data, relation);
-
- /*
- * First check the sequence filter.
- *
- * We handle just REORDER_BUFFER_CHANGE_SEQUENCE here.
- */
- if (!relentry->pubactions.pubsequence)
- return;
-
- /*
- * Output BEGIN if we haven't yet. Avoid for non-transactional
- * sequence changes.
- */
- if (transactional)
- {
- PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
-
- /* Send BEGIN if we haven't yet */
- if (txndata && !txndata->sent_begin_txn)
- pgoutput_send_begin(ctx, txn);
- }
-
- OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_sequence(ctx->out,
- relation,
- xid,
- sequence_lsn,
- transactional,
- last_value,
- log_cnt,
- is_called);
- OutputPluginWrite(ctx, true);
-}
-
/*
* Currently we always forward.
*/
@@ -2052,8 +1975,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->schema_sent = false;
entry->streamed_txns = NIL;
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
- entry->pubactions.pubdelete = entry->pubactions.pubtruncate =
- entry->pubactions.pubsequence = false;
+ entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
entry->new_slot = NULL;
entry->old_slot = NULL;
memset(entry->exprstate, 0, sizeof(entry->exprstate));
@@ -2068,18 +1990,18 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
{
Oid schemaId = get_rel_namespace(relid);
List *pubids = GetRelationPublications(relid);
- char relkind = get_rel_relkind(relid);
- char objectType = pub_get_object_type_for_relkind(relkind);
+
/*
* We don't acquire a lock on the namespace system table as we build
* the cache entry using a historic snapshot and all the later changes
* are absorbed while decoding WAL.
*/
- List *schemaPubids = GetSchemaPublications(schemaId, objectType);
+ List *schemaPubids = GetSchemaPublications(schemaId);
ListCell *lc;
Oid publish_as_relid = relid;
int publish_ancestor_level = 0;
bool am_partition = get_rel_relispartition(relid);
+ char relkind = get_rel_relkind(relid);
List *rel_publications = NIL;
/* Reload publications if needed before use. */
@@ -2111,7 +2033,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->pubactions.pubupdate = false;
entry->pubactions.pubdelete = false;
entry->pubactions.pubtruncate = false;
- entry->pubactions.pubsequence = false;
/*
* Tuple slots cleanups. (Will be rebuilt later if needed).
@@ -2159,11 +2080,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
/*
* If this is a FOR ALL TABLES publication, pick the partition root
- * and set the ancestor level accordingly. If this is a FOR ALL
- * SEQUENCES publication, we publish it too but we don't need to
- * pick the partition root etc.
+ * and set the ancestor level accordingly.
*/
- if (pub->alltables || pub->allsequences)
+ if (pub->alltables)
{
publish = true;
if (pub->pubviaroot && am_partition)
@@ -2227,7 +2146,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
- entry->pubactions.pubsequence |= pub->pubactions.pubsequence;
/*
* We want to publish the changes as the top-most ancestor