diff options
author | Amit Kapila <akapila@postgresql.org> | 2024-10-30 12:36:26 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2024-10-30 12:36:26 +0530 |
commit | 745217a051a9341e8c577ea59a87665d331d4af0 (patch) | |
tree | 84dc2a406947b44ebbe9d71214fd024a83413a32 /src/backend/replication/logical/tablesync.c | |
parent | f22e436bff779fee4e1ce49733ba5791d4634fb1 (diff) | |
download | postgresql-745217a051a9341e8c577ea59a87665d331d4af0.tar.gz postgresql-745217a051a9341e8c577ea59a87665d331d4af0.zip |
Replicate generated columns when specified in the column list.
This commit allows logical replication to publish and replicate generated
columns when explicitly listed in the column list. We also ensured that
the generated columns were copied during the initial tablesync when they
were published.
We will allow to replicate generated columns even when they are not
specified in the column list (via a new publication option) in a separate
commit.
The motivation of this work is to allow replication for cases where the
client doesn't have generated columns. For example, the case where one is
trying to replicate data from Postgres to the non-Postgres database.
Author: Shubham Khanna, Vignesh C, Hou Zhijie
Reviewed-by: Peter Smith, Hayato Kuroda, Shlok Kyal, Amit Kapila
Discussion: https://postgr.es/m/B80D17B2-2C8E-4C7D-87F2-E5B4BE3C069E@gmail.com
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 56 |
1 files changed, 40 insertions, 16 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index d4b5d210e3e..118503fcb76 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -787,23 +787,27 @@ copy_read_data(void *outbuf, int minread, int maxread) /* * Get information about remote relation in similar fashion the RELATION - * message provides during replication. This function also returns the relation - * qualifications to be used in the COPY command. + * message provides during replication. + * + * This function also returns (a) the relation qualifications to be used in + * the COPY command, and (b) whether the remote relation has published any + * generated column. */ static void -fetch_remote_table_info(char *nspname, char *relname, - LogicalRepRelation *lrel, List **qual) +fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, + List **qual, bool *gencol_published) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; Oid tableRow[] = {OIDOID, CHAROID, CHAROID}; - Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID}; + Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID}; Oid qualRow[] = {TEXTOID}; bool isnull; int natt; StringInfo pub_names = NULL; Bitmapset *included_cols = NULL; + int server_version = walrcv_server_version(LogRepWorkerWalRcvConn); lrel->nspname = nspname; lrel->relname = relname; @@ -851,7 +855,7 @@ fetch_remote_table_info(char *nspname, char *relname, * We need to do this before fetching info about column names and types, * so that we can skip columns that should not be replicated. */ - if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000) + if (server_version >= 150000) { WalRcvExecResult *pubres; TupleTableSlot *tslot; @@ -941,7 +945,13 @@ fetch_remote_table_info(char *nspname, char *relname, "SELECT a.attnum," " a.attname," " a.atttypid," - " a.attnum = ANY(i.indkey)" + " a.attnum = ANY(i.indkey)"); + + /* Generated columns can be replicated since version 18. */ + if (server_version >= 180000) + appendStringInfo(&cmd, ", a.attgenerated != ''"); + + appendStringInfo(&cmd, " FROM pg_catalog.pg_attribute a" " LEFT JOIN pg_catalog.pg_index i" " ON (i.indexrelid = pg_get_replica_identity_index(%u))" @@ -950,11 +960,11 @@ fetch_remote_table_info(char *nspname, char *relname, " AND a.attrelid = %u" " ORDER BY a.attnum", lrel->remoteid, - (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ? + (server_version >= 120000 && server_version < 180000 ? "AND a.attgenerated = ''" : ""), lrel->remoteid); res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, - lengthof(attrRow), attrRow); + server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, @@ -998,6 +1008,13 @@ fetch_remote_table_info(char *nspname, char *relname, if (DatumGetBool(slot_getattr(slot, 4, &isnull))) lrel->attkeys = bms_add_member(lrel->attkeys, natt); + /* Remember if the remote table has published any generated column. */ + if (server_version >= 180000 && !(*gencol_published)) + { + *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull)); + Assert(!isnull); + } + /* Should never happen. */ if (++natt >= MaxTupleAttributeNumber) elog(ERROR, "too many columns in remote table \"%s.%s\"", @@ -1030,7 +1047,7 @@ fetch_remote_table_info(char *nspname, char *relname, * 3) one of the subscribed publications is declared as TABLES IN SCHEMA * that includes this relation */ - if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000) + if (server_version >= 150000) { /* Reuse the already-built pub_names. */ Assert(pub_names != NULL); @@ -1106,10 +1123,12 @@ copy_table(Relation rel) List *attnamelist; ParseState *pstate; List *options = NIL; + bool gencol_published = false; /* Get the publisher relation info. */ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), - RelationGetRelationName(rel), &lrel, &qual); + RelationGetRelationName(rel), &lrel, &qual, + &gencol_published); /* Put the relation into relmap. */ logicalrep_relmap_update(&lrel); @@ -1121,8 +1140,8 @@ copy_table(Relation rel) /* Start copy on the publisher. */ initStringInfo(&cmd); - /* Regular table with no row filter */ - if (lrel.relkind == RELKIND_RELATION && qual == NIL) + /* Regular table with no row filter or generated columns */ + if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published) { appendStringInfo(&cmd, "COPY %s", quote_qualified_identifier(lrel.nspname, lrel.relname)); @@ -1153,9 +1172,14 @@ copy_table(Relation rel) { /* * For non-tables and tables with row filters, we need to do COPY - * (SELECT ...), but we can't just do SELECT * because we need to not - * copy generated columns. For tables with any row filters, build a - * SELECT query with OR'ed row filters for COPY. + * (SELECT ...), but we can't just do SELECT * because we may need to + * copy only subset of columns including generated columns. For tables + * with any row filters, build a SELECT query with OR'ed row filters + * for COPY. + * + * We also need to use this same COPY (SELECT ...) syntax when + * generated columns are published, because copy of generated columns + * is not supported by the normal COPY. */ appendStringInfoString(&cmd, "COPY (SELECT "); for (int i = 0; i < lrel.natts; i++) |