diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/pg_publication.c | 36 | ||||
-rw-r--r-- | src/backend/commands/publicationcmds.c | 68 | ||||
-rw-r--r-- | src/backend/replication/logical/proto.c | 66 | ||||
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 36 | ||||
-rw-r--r-- | src/backend/utils/cache/relcache.c | 2 |
5 files changed, 148 insertions, 60 deletions
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index b89098f5e99..7900a8f6a13 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -622,10 +622,11 @@ pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt) /* * Returns a bitmap representing the columns of the specified table. * - * Generated columns are included if include_gencols is true. + * Generated columns are included if include_gencols_type is + * PUBLISH_GENCOLS_STORED. */ Bitmapset * -pub_form_cols_map(Relation relation, bool include_gencols) +pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type) { Bitmapset *result = NULL; TupleDesc desc = RelationGetDescr(relation); @@ -634,9 +635,20 @@ pub_form_cols_map(Relation relation, bool include_gencols) { Form_pg_attribute att = TupleDescAttr(desc, i); - if (att->attisdropped || (att->attgenerated && !include_gencols)) + if (att->attisdropped) continue; + if (att->attgenerated) + { + /* We only support replication of STORED generated cols. */ + if (att->attgenerated != ATTRIBUTE_GENERATED_STORED) + continue; + + /* User hasn't requested to replicate STORED generated cols. */ + if (include_gencols_type != PUBLISH_GENCOLS_STORED) + continue; + } + result = bms_add_member(result, att->attnum); } @@ -1068,7 +1080,7 @@ GetPublication(Oid pubid) pub->pubactions.pubdelete = pubform->pubdelete; pub->pubactions.pubtruncate = pubform->pubtruncate; pub->pubviaroot = pubform->pubviaroot; - pub->pubgencols = pubform->pubgencols; + pub->pubgencols_type = pubform->pubgencols_type; ReleaseSysCache(tup); @@ -1276,9 +1288,23 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) { Form_pg_attribute att = TupleDescAttr(desc, i); - if (att->attisdropped || (att->attgenerated && !pub->pubgencols)) + if (att->attisdropped) continue; + if (att->attgenerated) + { + /* We only support replication of STORED generated cols. */ + if (att->attgenerated != ATTRIBUTE_GENERATED_STORED) + continue; + + /* + * User hasn't requested to replicate STORED generated + * cols. + */ + if (pub->pubgencols_type != PUBLISH_GENCOLS_STORED) + continue; + } + attnums[nattnums++] = att->attnum; } diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 35747b3df5f..b49d9ab78bf 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -70,6 +70,7 @@ static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok); static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, AlterPublicationStmt *stmt); static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok); +static char defGetGeneratedColsOption(DefElem *def); static void @@ -80,7 +81,7 @@ parse_publication_options(ParseState *pstate, bool *publish_via_partition_root_given, bool *publish_via_partition_root, bool *publish_generated_columns_given, - bool *publish_generated_columns) + char *publish_generated_columns) { ListCell *lc; @@ -94,7 +95,7 @@ parse_publication_options(ParseState *pstate, pubactions->pubdelete = true; pubactions->pubtruncate = true; *publish_via_partition_root = false; - *publish_generated_columns = false; + *publish_generated_columns = PUBLISH_GENCOLS_NONE; /* Parse options */ foreach(lc, options) @@ -160,7 +161,7 @@ parse_publication_options(ParseState *pstate, if (*publish_generated_columns_given) errorConflictingDefElem(defel, pstate); *publish_generated_columns_given = true; - *publish_generated_columns = defGetBoolean(defel); + *publish_generated_columns = defGetGeneratedColsOption(defel); } else ereport(ERROR, @@ -344,15 +345,16 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, * by the column list. If any column is missing, *invalid_column_list is set * to true. * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY - * are published either by listing them in the column list or by enabling - * publish_generated_columns option. If any unpublished generated column is - * found, *invalid_gen_col is set to true. + * are published, either by being explicitly named in the column list or, if + * no column list is specified, by setting the option + * publish_generated_columns to stored. If any unpublished + * generated column is found, *invalid_gen_col is set to true. * * Returns true if any of the above conditions are not met. */ bool pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, - bool pubviaroot, bool pubgencols, + bool pubviaroot, char pubgencols_type, bool *invalid_column_list, bool *invalid_gen_col) { @@ -394,10 +396,10 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, /* * As we don't allow a column list with REPLICA IDENTITY FULL, the - * publish_generated_columns option must be set to true if the table + * publish_generated_columns option must be set to stored if the table * has any stored generated columns. */ - if (!pubgencols && + if (pubgencols_type != PUBLISH_GENCOLS_STORED && relation->rd_att->constr && relation->rd_att->constr->has_generated_stored) *invalid_gen_col = true; @@ -425,10 +427,10 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, if (columns == NULL) { /* - * The publish_generated_columns option must be set to true if the - * REPLICA IDENTITY contains any stored generated column. + * The publish_generated_columns option must be set to stored if + * the REPLICA IDENTITY contains any stored generated column. */ - if (!pubgencols && att->attgenerated) + if (pubgencols_type != PUBLISH_GENCOLS_STORED && att->attgenerated) { *invalid_gen_col = true; break; @@ -775,7 +777,7 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) bool publish_via_partition_root_given; bool publish_via_partition_root; bool publish_generated_columns_given; - bool publish_generated_columns; + char publish_generated_columns; AclResult aclresult; List *relations = NIL; List *schemaidlist = NIL; @@ -834,8 +836,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) BoolGetDatum(pubactions.pubtruncate); values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root); - values[Anum_pg_publication_pubgencols - 1] = - BoolGetDatum(publish_generated_columns); + values[Anum_pg_publication_pubgencols_type - 1] = + CharGetDatum(publish_generated_columns); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -922,7 +924,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, bool publish_via_partition_root_given; bool publish_via_partition_root; bool publish_generated_columns_given; - bool publish_generated_columns; + char publish_generated_columns; ObjectAddress obj; Form_pg_publication pubform; List *root_relids = NIL; @@ -1046,8 +1048,8 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, if (publish_generated_columns_given) { - values[Anum_pg_publication_pubgencols - 1] = BoolGetDatum(publish_generated_columns); - replaces[Anum_pg_publication_pubgencols - 1] = true; + values[Anum_pg_publication_pubgencols_type - 1] = CharGetDatum(publish_generated_columns); + replaces[Anum_pg_publication_pubgencols_type - 1] = true; } tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, @@ -2043,3 +2045,33 @@ AlterPublicationOwner_oid(Oid subid, Oid newOwnerId) table_close(rel, RowExclusiveLock); } + +/* + * Extract the publish_generated_columns option value from a DefElem. "stored" + * and "none" values are accepted. + */ +static char +defGetGeneratedColsOption(DefElem *def) +{ + char *sval; + + /* + * If no parameter value given, assume "stored" is meant. + */ + if (!def->arg) + return PUBLISH_GENCOLS_STORED; + + sval = defGetString(def); + + if (pg_strcasecmp(sval, "none") == 0) + return PUBLISH_GENCOLS_NONE; + if (pg_strcasecmp(sval, "stored") == 0) + return PUBLISH_GENCOLS_STORED; + + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("%s requires a \"none\" or \"stored\" value", + def->defname)); + + return PUBLISH_GENCOLS_NONE; /* keep compiler quiet */ +} diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index bef350714db..dc72b7c8f77 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -30,11 +30,12 @@ #define TRUNCATE_RESTART_SEQS (1<<1) static void logicalrep_write_attrs(StringInfo out, Relation rel, - Bitmapset *columns, bool include_gencols); + Bitmapset *columns, + PublishGencolsType include_gencols_type); static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, bool binary, Bitmapset *columns, - bool include_gencols); + PublishGencolsType include_gencols_type); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -401,7 +402,8 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, - Bitmapset *columns, bool include_gencols) + Bitmapset *columns, + PublishGencolsType include_gencols_type) { pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); @@ -413,7 +415,8 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newslot, binary, columns, include_gencols); + logicalrep_write_tuple(out, rel, newslot, binary, columns, + include_gencols_type); } /* @@ -446,7 +449,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, - bool binary, Bitmapset *columns, bool include_gencols) + bool binary, Bitmapset *columns, + PublishGencolsType include_gencols_type) { pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); @@ -468,11 +472,12 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ logicalrep_write_tuple(out, rel, oldslot, binary, columns, - include_gencols); + include_gencols_type); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newslot, binary, columns, include_gencols); + logicalrep_write_tuple(out, rel, newslot, binary, columns, + include_gencols_type); } /* @@ -522,7 +527,8 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, - Bitmapset *columns, bool include_gencols) + Bitmapset *columns, + PublishGencolsType include_gencols_type) { Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -542,7 +548,8 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldslot, binary, columns, include_gencols); + logicalrep_write_tuple(out, rel, oldslot, binary, columns, + include_gencols_type); } /* @@ -658,7 +665,8 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, */ void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, - Bitmapset *columns, bool include_gencols) + Bitmapset *columns, + PublishGencolsType include_gencols_type) { char *relname; @@ -680,7 +688,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, rel->rd_rel->relreplident); /* send the attribute info */ - logicalrep_write_attrs(out, rel, columns, include_gencols); + logicalrep_write_attrs(out, rel, columns, include_gencols_type); } /* @@ -757,7 +765,8 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) */ static void logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, - bool binary, Bitmapset *columns, bool include_gencols) + bool binary, Bitmapset *columns, + PublishGencolsType include_gencols_type) { TupleDesc desc; Datum *values; @@ -771,7 +780,8 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, { Form_pg_attribute att = TupleDescAttr(desc, i); - if (!logicalrep_should_publish_column(att, columns, include_gencols)) + if (!logicalrep_should_publish_column(att, columns, + include_gencols_type)) continue; nliveatts++; @@ -789,7 +799,8 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot, Form_pg_type typclass; Form_pg_attribute att = TupleDescAttr(desc, i); - if (!logicalrep_should_publish_column(att, columns, include_gencols)) + if (!logicalrep_should_publish_column(att, columns, + include_gencols_type)) continue; if (isnull[i]) @@ -908,7 +919,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) */ static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, - bool include_gencols) + PublishGencolsType include_gencols_type) { TupleDesc desc; int i; @@ -923,7 +934,8 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, { Form_pg_attribute att = TupleDescAttr(desc, i); - if (!logicalrep_should_publish_column(att, columns, include_gencols)) + if (!logicalrep_should_publish_column(att, columns, + include_gencols_type)) continue; nliveatts++; @@ -941,7 +953,8 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns, Form_pg_attribute att = TupleDescAttr(desc, i); uint8 flags = 0; - if (!logicalrep_should_publish_column(att, columns, include_gencols)) + if (!logicalrep_should_publish_column(att, columns, + include_gencols_type)) continue; /* REPLICA IDENTITY FULL means all columns are sent as part of key. */ @@ -1254,16 +1267,17 @@ logicalrep_message_type(LogicalRepMsgType action) * * 'columns' represents the publication column list (if any) for that table. * - * 'include_gencols' flag indicates whether generated columns should be + * 'include_gencols_type' value indicates whether generated columns should be * published when there is no column list. Typically, this will have the same * value as the 'publish_generated_columns' publication parameter. * * Note that generated columns can be published only when present in a - * publication column list, or when include_gencols is true. + * publication column list, or when include_gencols_type is + * PUBLISH_GENCOLS_STORED. */ bool logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns, - bool include_gencols) + PublishGencolsType include_gencols_type) { if (att->attisdropped) return false; @@ -1273,5 +1287,15 @@ logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns, return bms_is_member(att->attnum, columns); /* All non-generated columns are always published. */ - return att->attgenerated ? include_gencols : true; + if (!att->attgenerated) + return true; + + /* + * Stored generated columns are only published when the user sets + * publish_generated_columns as stored. + */ + if (att->attgenerated == ATTRIBUTE_GENERATED_STORED) + return include_gencols_type == PUBLISH_GENCOLS_STORED; + + return false; } diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 2b7499b34b9..a363c88ffc0 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -128,10 +128,13 @@ typedef struct RelationSyncEntry bool schema_sent; /* - * This is set if the 'publish_generated_columns' parameter is true, and - * the relation contains generated columns. + * This will be PUBLISH_GENCOLS_STORED if the relation contains generated + * columns and the 'publish_generated_columns' parameter is set to + * PUBLISH_GENCOLS_STORED. Otherwise, it will be PUBLISH_GENCOLS_NONE, + * indicating that no generated columns should be published, unless + * explicitly specified in the column list. */ - bool include_gencols; + PublishGencolsType include_gencols_type; List *streamed_txns; /* streamed toplevel transactions with this * schema */ @@ -763,7 +766,7 @@ send_relation_and_attrs(Relation relation, TransactionId xid, { TupleDesc desc = RelationGetDescr(relation); Bitmapset *columns = relentry->columns; - bool include_gencols = relentry->include_gencols; + PublishGencolsType include_gencols_type = relentry->include_gencols_type; int i; /* @@ -778,7 +781,8 @@ send_relation_and_attrs(Relation relation, TransactionId xid, { Form_pg_attribute att = TupleDescAttr(desc, i); - if (!logicalrep_should_publish_column(att, columns, include_gencols)) + if (!logicalrep_should_publish_column(att, columns, + include_gencols_type)) continue; if (att->atttypid < FirstGenbkiObjectId) @@ -790,7 +794,8 @@ send_relation_and_attrs(Relation relation, TransactionId xid, } OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, xid, relation, columns, include_gencols); + logicalrep_write_rel(ctx->out, xid, relation, columns, + include_gencols_type); OutputPluginWrite(ctx, false); } @@ -1044,7 +1049,7 @@ check_and_init_gencol(PGOutputData *data, List *publications, /* There are no generated columns to be published. */ if (!gencolpresent) { - entry->include_gencols = false; + entry->include_gencols_type = PUBLISH_GENCOLS_NONE; return; } @@ -1064,10 +1069,10 @@ check_and_init_gencol(PGOutputData *data, List *publications, if (first) { - entry->include_gencols = pub->pubgencols; + entry->include_gencols_type = pub->pubgencols_type; first = false; } - else if (entry->include_gencols != pub->pubgencols) + else if (entry->include_gencols_type != pub->pubgencols_type) ereport(ERROR, errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications", @@ -1131,7 +1136,8 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, { MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt); - relcols = pub_form_cols_map(relation, entry->include_gencols); + relcols = pub_form_cols_map(relation, + entry->include_gencols_type); MemoryContextSwitchTo(oldcxt); } @@ -1571,17 +1577,17 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_INSERT: logicalrep_write_insert(ctx->out, xid, targetrel, new_slot, data->binary, relentry->columns, - relentry->include_gencols); + relentry->include_gencols_type); break; case REORDER_BUFFER_CHANGE_UPDATE: logicalrep_write_update(ctx->out, xid, targetrel, old_slot, new_slot, data->binary, relentry->columns, - relentry->include_gencols); + relentry->include_gencols_type); break; case REORDER_BUFFER_CHANGE_DELETE: logicalrep_write_delete(ctx->out, xid, targetrel, old_slot, data->binary, relentry->columns, - relentry->include_gencols); + relentry->include_gencols_type); break; default: Assert(false); @@ -2032,7 +2038,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) { entry->replicate_valid = false; entry->schema_sent = false; - entry->include_gencols = false; + entry->include_gencols_type = PUBLISH_GENCOLS_NONE; entry->streamed_txns = NIL; entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; @@ -2082,7 +2088,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) * earlier definition. */ entry->schema_sent = false; - entry->include_gencols = false; + entry->include_gencols_type = PUBLISH_GENCOLS_NONE; list_free(entry->streamed_txns); entry->streamed_txns = NIL; bms_free(entry->columns); diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 43219a9629c..ee39d085ebe 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -5820,7 +5820,7 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) if ((pubform->pubupdate || pubform->pubdelete) && pub_contains_invalid_column(pubid, relation, ancestors, pubform->pubviaroot, - pubform->pubgencols, + pubform->pubgencols_type, &invalid_column_list, &invalid_gen_col)) { |