aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/pgoutput/pgoutput.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c201
1 files changed, 117 insertions, 84 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 12c17359063..a6002b223df 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -84,9 +84,6 @@ static bool publications_valid;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
-static void send_relation_and_attrs(Relation relation, TransactionId xid,
- LogicalDecodingContext *ctx,
- Bitmapset *columns);
static void send_repl_origin(LogicalDecodingContext *ctx,
RepOriginId origin_id, XLogRecPtr origin_lsn,
bool send_origin);
@@ -129,6 +126,12 @@ typedef struct RelationSyncEntry
bool replicate_valid; /* overall validity flag for entry */
bool schema_sent;
+
+ /*
+ * This is set if the 'publish_generated_columns' parameter is true, and
+ * the relation contains generated columns.
+ */
+ bool include_gencols;
List *streamed_txns; /* streamed toplevel transactions with this
* schema */
@@ -213,6 +216,9 @@ static void init_rel_sync_cache(MemoryContext cachectx);
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
Relation relation);
+static void send_relation_and_attrs(Relation relation, TransactionId xid,
+ LogicalDecodingContext *ctx,
+ RelationSyncEntry *relentry);
static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
uint32 hashvalue);
@@ -731,11 +737,11 @@ maybe_send_schema(LogicalDecodingContext *ctx,
{
Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
- send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
+ send_relation_and_attrs(ancestor, xid, ctx, relentry);
RelationClose(ancestor);
}
- send_relation_and_attrs(relation, xid, ctx, relentry->columns);
+ send_relation_and_attrs(relation, xid, ctx, relentry);
if (data->in_streaming)
set_schema_sent_in_streamed_txn(relentry, topxid);
@@ -749,9 +755,11 @@ maybe_send_schema(LogicalDecodingContext *ctx,
static void
send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx,
- Bitmapset *columns)
+ RelationSyncEntry *relentry)
{
TupleDesc desc = RelationGetDescr(relation);
+ Bitmapset *columns = relentry->columns;
+ bool include_gencols = relentry->include_gencols;
int i;
/*
@@ -766,7 +774,7 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
{
Form_pg_attribute att = TupleDescAttr(desc, i);
- if (!logicalrep_should_publish_column(att, columns))
+ if (!logicalrep_should_publish_column(att, columns, include_gencols))
continue;
if (att->atttypid < FirstGenbkiObjectId)
@@ -778,7 +786,7 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
}
OutputPluginPrepareWrite(ctx, false);
- logicalrep_write_rel(ctx->out, xid, relation, columns);
+ logicalrep_write_rel(ctx->out, xid, relation, columns, include_gencols);
OutputPluginWrite(ctx, false);
}
@@ -1005,6 +1013,66 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications,
}
/*
+ * If the table contains a generated column, check for any conflicting
+ * values of 'publish_generated_columns' parameter in the publications.
+ */
+static void
+check_and_init_gencol(PGOutputData *data, List *publications,
+ RelationSyncEntry *entry)
+{
+ Relation relation = RelationIdGetRelation(entry->publish_as_relid);
+ TupleDesc desc = RelationGetDescr(relation);
+ bool gencolpresent = false;
+ bool first = true;
+
+ /* Check if there is any generated column present. */
+ for (int i = 0; i < desc->natts; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(desc, i);
+
+ if (att->attgenerated)
+ {
+ gencolpresent = true;
+ break;
+ }
+ }
+
+ /* There are no generated columns to be published. */
+ if (!gencolpresent)
+ {
+ entry->include_gencols = false;
+ return;
+ }
+
+ /*
+ * There may be a conflicting value for 'publish_generated_columns'
+ * parameter in the publications.
+ */
+ foreach_ptr(Publication, pub, publications)
+ {
+ /*
+ * The column list takes precedence over the
+ * 'publish_generated_columns' parameter. Those will be checked later,
+ * see pgoutput_column_list_init.
+ */
+ if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
+ continue;
+
+ if (first)
+ {
+ entry->include_gencols = pub->pubgencols;
+ first = false;
+ }
+ else if (entry->include_gencols != pub->pubgencols)
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
+ get_namespace_name(RelationGetNamespace(relation)),
+ RelationGetRelationName(relation)));
+ }
+}
+
+/*
* Initialize the column list.
*/
static void
@@ -1014,6 +1082,10 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
ListCell *lc;
bool first = true;
Relation relation = RelationIdGetRelation(entry->publish_as_relid);
+ bool found_pub_collist = false;
+ Bitmapset *relcols = NULL;
+
+ pgoutput_ensure_entry_cxt(data, entry);
/*
* Find if there are any column lists for this relation. If there are,
@@ -1027,93 +1099,39 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
* fetch_table_list. But one can later change the publication so we still
* need to check all the given publication-table mappings and report an
* error if any publications have a different column list.
- *
- * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use column list".
*/
foreach(lc, publications)
{
Publication *pub = lfirst(lc);
- HeapTuple cftuple = NULL;
- Datum cfdatum = 0;
Bitmapset *cols = NULL;
+ /* Retrieve the bitmap of columns for a column list publication. */
+ found_pub_collist |= check_and_fetch_column_list(pub,
+ entry->publish_as_relid,
+ entry->entry_cxt, &cols);
+
/*
- * If the publication is FOR ALL TABLES then it is treated the same as
- * if there are no column lists (even if other publications have a
- * list).
+ * For non-column list publications — e.g. TABLE (without a column
+ * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
+ * of the table (including generated columns when
+ * 'publish_generated_columns' parameter is true).
*/
- if (!pub->alltables)
+ if (!cols)
{
- bool pub_no_list = true;
-
/*
- * Check for the presence of a column list in this publication.
- *
- * Note: If we find no pg_publication_rel row, it's a publication
- * defined for a whole schema, so it can't have a column list,
- * just like a FOR ALL TABLES publication.
+ * Cache the table columns for the first publication with no
+ * specified column list to detect publication with a different
+ * column list.
*/
- cftuple = SearchSysCache2(PUBLICATIONRELMAP,
- ObjectIdGetDatum(entry->publish_as_relid),
- ObjectIdGetDatum(pub->oid));
-
- if (HeapTupleIsValid(cftuple))
+ if (!relcols && (list_length(publications) > 1))
{
- /* Lookup the column list attribute. */
- cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
- Anum_pg_publication_rel_prattrs,
- &pub_no_list);
-
- /* Build the column list bitmap in the per-entry context. */
- if (!pub_no_list) /* when not null */
- {
- int i;
- int nliveatts = 0;
- TupleDesc desc = RelationGetDescr(relation);
- bool att_gen_present = false;
-
- pgoutput_ensure_entry_cxt(data, entry);
-
- cols = pub_collist_to_bitmapset(cols, cfdatum,
- entry->entry_cxt);
+ MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);
- /* Get the number of live attributes. */
- for (i = 0; i < desc->natts; i++)
- {
- Form_pg_attribute att = TupleDescAttr(desc, i);
-
- if (att->attisdropped)
- continue;
-
- if (att->attgenerated)
- {
- /*
- * Generated cols are skipped unless they are
- * present in a column list.
- */
- if (!bms_is_member(att->attnum, cols))
- continue;
-
- att_gen_present = true;
- }
-
- nliveatts++;
- }
-
- /*
- * Generated attributes are published only when they are
- * present in the column list. Otherwise, a NULL column
- * list means publish all columns.
- */
- if (!att_gen_present && bms_num_members(cols) == nliveatts)
- {
- bms_free(cols);
- cols = NULL;
- }
- }
-
- ReleaseSysCache(cftuple);
+ relcols = pub_form_cols_map(relation, entry->include_gencols);
+ MemoryContextSwitchTo(oldcxt);
}
+
+ cols = relcols;
}
if (first)
@@ -1129,6 +1147,13 @@ pgoutput_column_list_init(PGOutputData *data, List *publications,
RelationGetRelationName(relation)));
} /* loop all subscribed publications */
+ /*
+ * If no column list publications exist, columns to be published will be
+ * computed later according to the 'publish_generated_columns' parameter.
+ */
+ if (!found_pub_collist)
+ entry->columns = NULL;
+
RelationClose(relation);
}
@@ -1541,15 +1566,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{
case REORDER_BUFFER_CHANGE_INSERT:
logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
- data->binary, relentry->columns);
+ data->binary, relentry->columns,
+ relentry->include_gencols);
break;
case REORDER_BUFFER_CHANGE_UPDATE:
logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
- new_slot, data->binary, relentry->columns);
+ new_slot, data->binary, relentry->columns,
+ relentry->include_gencols);
break;
case REORDER_BUFFER_CHANGE_DELETE:
logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
- data->binary, relentry->columns);
+ data->binary, relentry->columns,
+ relentry->include_gencols);
break;
default:
Assert(false);
@@ -2000,6 +2028,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
{
entry->replicate_valid = false;
entry->schema_sent = false;
+ entry->include_gencols = false;
entry->streamed_txns = NIL;
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
@@ -2052,6 +2081,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
* earlier definition.
*/
entry->schema_sent = false;
+ entry->include_gencols = false;
list_free(entry->streamed_txns);
entry->streamed_txns = NIL;
bms_free(entry->columns);
@@ -2223,6 +2253,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
/* Initialize the row filter */
pgoutput_row_filter_init(data, rel_publications, entry);
+ /* Check whether to publish generated columns. */
+ check_and_init_gencol(data, rel_publications, entry);
+
/* Initialize the column list */
pgoutput_column_list_init(data, rel_publications, entry);
}