diff options
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 201 |
1 files changed, 117 insertions, 84 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 12c17359063..a6002b223df 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -84,9 +84,6 @@ static bool publications_valid; static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); -static void send_relation_and_attrs(Relation relation, TransactionId xid, - LogicalDecodingContext *ctx, - Bitmapset *columns); static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); @@ -129,6 +126,12 @@ typedef struct RelationSyncEntry bool replicate_valid; /* overall validity flag for entry */ bool schema_sent; + + /* + * This is set if the 'publish_generated_columns' parameter is true, and + * the relation contains generated columns. + */ + bool include_gencols; List *streamed_txns; /* streamed toplevel transactions with this * schema */ @@ -213,6 +216,9 @@ static void init_rel_sync_cache(MemoryContext cachectx); static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit); static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Relation relation); +static void send_relation_and_attrs(Relation relation, TransactionId xid, + LogicalDecodingContext *ctx, + RelationSyncEntry *relentry); static void rel_sync_cache_relation_cb(Datum arg, Oid relid); static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue); @@ -731,11 +737,11 @@ maybe_send_schema(LogicalDecodingContext *ctx, { Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid); - send_relation_and_attrs(ancestor, xid, ctx, relentry->columns); + send_relation_and_attrs(ancestor, xid, ctx, relentry); RelationClose(ancestor); } - send_relation_and_attrs(relation, xid, ctx, relentry->columns); + send_relation_and_attrs(relation, xid, ctx, relentry); if (data->in_streaming) set_schema_sent_in_streamed_txn(relentry, topxid); @@ -749,9 +755,11 @@ maybe_send_schema(LogicalDecodingContext *ctx, static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, - Bitmapset *columns) + RelationSyncEntry *relentry) { TupleDesc desc = RelationGetDescr(relation); + Bitmapset *columns = relentry->columns; + bool include_gencols = relentry->include_gencols; int i; /* @@ -766,7 +774,7 @@ send_relation_and_attrs(Relation relation, TransactionId xid, { Form_pg_attribute att = TupleDescAttr(desc, i); - if (!logicalrep_should_publish_column(att, columns)) + if (!logicalrep_should_publish_column(att, columns, include_gencols)) continue; if (att->atttypid < FirstGenbkiObjectId) @@ -778,7 +786,7 @@ send_relation_and_attrs(Relation relation, TransactionId xid, } OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, xid, relation, columns); + logicalrep_write_rel(ctx->out, xid, relation, columns, include_gencols); OutputPluginWrite(ctx, false); } @@ -1005,6 +1013,66 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications, } /* + * If the table contains a generated column, check for any conflicting + * values of 'publish_generated_columns' parameter in the publications. + */ +static void +check_and_init_gencol(PGOutputData *data, List *publications, + RelationSyncEntry *entry) +{ + Relation relation = RelationIdGetRelation(entry->publish_as_relid); + TupleDesc desc = RelationGetDescr(relation); + bool gencolpresent = false; + bool first = true; + + /* Check if there is any generated column present. */ + for (int i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attgenerated) + { + gencolpresent = true; + break; + } + } + + /* There are no generated columns to be published. */ + if (!gencolpresent) + { + entry->include_gencols = false; + return; + } + + /* + * There may be a conflicting value for 'publish_generated_columns' + * parameter in the publications. + */ + foreach_ptr(Publication, pub, publications) + { + /* + * The column list takes precedence over the + * 'publish_generated_columns' parameter. Those will be checked later, + * see pgoutput_column_list_init. + */ + if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL)) + continue; + + if (first) + { + entry->include_gencols = pub->pubgencols; + first = false; + } + else if (entry->include_gencols != pub->pubgencols) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications", + get_namespace_name(RelationGetNamespace(relation)), + RelationGetRelationName(relation))); + } +} + +/* * Initialize the column list. */ static void @@ -1014,6 +1082,10 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, ListCell *lc; bool first = true; Relation relation = RelationIdGetRelation(entry->publish_as_relid); + bool found_pub_collist = false; + Bitmapset *relcols = NULL; + + pgoutput_ensure_entry_cxt(data, entry); /* * Find if there are any column lists for this relation. If there are, @@ -1027,93 +1099,39 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, * fetch_table_list. But one can later change the publication so we still * need to check all the given publication-table mappings and report an * error if any publications have a different column list. - * - * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use column list". */ foreach(lc, publications) { Publication *pub = lfirst(lc); - HeapTuple cftuple = NULL; - Datum cfdatum = 0; Bitmapset *cols = NULL; + /* Retrieve the bitmap of columns for a column list publication. */ + found_pub_collist |= check_and_fetch_column_list(pub, + entry->publish_as_relid, + entry->entry_cxt, &cols); + /* - * If the publication is FOR ALL TABLES then it is treated the same as - * if there are no column lists (even if other publications have a - * list). + * For non-column list publications — e.g. TABLE (without a column + * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns + * of the table (including generated columns when + * 'publish_generated_columns' parameter is true). */ - if (!pub->alltables) + if (!cols) { - bool pub_no_list = true; - /* - * Check for the presence of a column list in this publication. - * - * Note: If we find no pg_publication_rel row, it's a publication - * defined for a whole schema, so it can't have a column list, - * just like a FOR ALL TABLES publication. + * Cache the table columns for the first publication with no + * specified column list to detect publication with a different + * column list. */ - cftuple = SearchSysCache2(PUBLICATIONRELMAP, - ObjectIdGetDatum(entry->publish_as_relid), - ObjectIdGetDatum(pub->oid)); - - if (HeapTupleIsValid(cftuple)) + if (!relcols && (list_length(publications) > 1)) { - /* Lookup the column list attribute. */ - cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple, - Anum_pg_publication_rel_prattrs, - &pub_no_list); - - /* Build the column list bitmap in the per-entry context. */ - if (!pub_no_list) /* when not null */ - { - int i; - int nliveatts = 0; - TupleDesc desc = RelationGetDescr(relation); - bool att_gen_present = false; - - pgoutput_ensure_entry_cxt(data, entry); - - cols = pub_collist_to_bitmapset(cols, cfdatum, - entry->entry_cxt); + MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt); - /* Get the number of live attributes. */ - for (i = 0; i < desc->natts; i++) - { - Form_pg_attribute att = TupleDescAttr(desc, i); - - if (att->attisdropped) - continue; - - if (att->attgenerated) - { - /* - * Generated cols are skipped unless they are - * present in a column list. - */ - if (!bms_is_member(att->attnum, cols)) - continue; - - att_gen_present = true; - } - - nliveatts++; - } - - /* - * Generated attributes are published only when they are - * present in the column list. Otherwise, a NULL column - * list means publish all columns. - */ - if (!att_gen_present && bms_num_members(cols) == nliveatts) - { - bms_free(cols); - cols = NULL; - } - } - - ReleaseSysCache(cftuple); + relcols = pub_form_cols_map(relation, entry->include_gencols); + MemoryContextSwitchTo(oldcxt); } + + cols = relcols; } if (first) @@ -1129,6 +1147,13 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, RelationGetRelationName(relation))); } /* loop all subscribed publications */ + /* + * If no column list publications exist, columns to be published will be + * computed later according to the 'publish_generated_columns' parameter. + */ + if (!found_pub_collist) + entry->columns = NULL; + RelationClose(relation); } @@ -1541,15 +1566,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { case REORDER_BUFFER_CHANGE_INSERT: logicalrep_write_insert(ctx->out, xid, targetrel, new_slot, - data->binary, relentry->columns); + data->binary, relentry->columns, + relentry->include_gencols); break; case REORDER_BUFFER_CHANGE_UPDATE: logicalrep_write_update(ctx->out, xid, targetrel, old_slot, - new_slot, data->binary, relentry->columns); + new_slot, data->binary, relentry->columns, + relentry->include_gencols); break; case REORDER_BUFFER_CHANGE_DELETE: logicalrep_write_delete(ctx->out, xid, targetrel, old_slot, - data->binary, relentry->columns); + data->binary, relentry->columns, + relentry->include_gencols); break; default: Assert(false); @@ -2000,6 +2028,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) { entry->replicate_valid = false; entry->schema_sent = false; + entry->include_gencols = false; entry->streamed_txns = NIL; entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; @@ -2052,6 +2081,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) * earlier definition. */ entry->schema_sent = false; + entry->include_gencols = false; list_free(entry->streamed_txns); entry->streamed_txns = NIL; bms_free(entry->columns); @@ -2223,6 +2253,9 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) /* Initialize the row filter */ pgoutput_row_filter_init(data, rel_publications, entry); + /* Check whether to publish generated columns. */ + check_and_init_gencol(data, rel_publications, entry); + /* Initialize the column list */ pgoutput_column_list_init(data, rel_publications, entry); } |