aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/tablesync.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2024-10-30 12:36:26 +0530
committerAmit Kapila <akapila@postgresql.org>2024-10-30 12:36:26 +0530
commit745217a051a9341e8c577ea59a87665d331d4af0 (patch)
tree84dc2a406947b44ebbe9d71214fd024a83413a32 /src/backend/replication/logical/tablesync.c
parentf22e436bff779fee4e1ce49733ba5791d4634fb1 (diff)
downloadpostgresql-745217a051a9341e8c577ea59a87665d331d4af0.tar.gz
postgresql-745217a051a9341e8c577ea59a87665d331d4af0.zip
Replicate generated columns when specified in the column list.
This commit allows logical replication to publish and replicate generated columns when explicitly listed in the column list. We also ensured that the generated columns were copied during the initial tablesync when they were published. We will allow to replicate generated columns even when they are not specified in the column list (via a new publication option) in a separate commit. The motivation of this work is to allow replication for cases where the client doesn't have generated columns. For example, the case where one is trying to replicate data from Postgres to the non-Postgres database. Author: Shubham Khanna, Vignesh C, Hou Zhijie Reviewed-by: Peter Smith, Hayato Kuroda, Shlok Kyal, Amit Kapila Discussion: https://postgr.es/m/B80D17B2-2C8E-4C7D-87F2-E5B4BE3C069E@gmail.com
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r--src/backend/replication/logical/tablesync.c56
1 files changed, 40 insertions, 16 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index d4b5d210e3e..118503fcb76 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -787,23 +787,27 @@ copy_read_data(void *outbuf, int minread, int maxread)
/*
* Get information about remote relation in similar fashion the RELATION
- * message provides during replication. This function also returns the relation
- * qualifications to be used in the COPY command.
+ * message provides during replication.
+ *
+ * This function also returns (a) the relation qualifications to be used in
+ * the COPY command, and (b) whether the remote relation has published any
+ * generated column.
*/
static void
-fetch_remote_table_info(char *nspname, char *relname,
- LogicalRepRelation *lrel, List **qual)
+fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
+ List **qual, bool *gencol_published)
{
WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
- Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
+ Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
Oid qualRow[] = {TEXTOID};
bool isnull;
int natt;
StringInfo pub_names = NULL;
Bitmapset *included_cols = NULL;
+ int server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
lrel->nspname = nspname;
lrel->relname = relname;
@@ -851,7 +855,7 @@ fetch_remote_table_info(char *nspname, char *relname,
* We need to do this before fetching info about column names and types,
* so that we can skip columns that should not be replicated.
*/
- if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+ if (server_version >= 150000)
{
WalRcvExecResult *pubres;
TupleTableSlot *tslot;
@@ -941,7 +945,13 @@ fetch_remote_table_info(char *nspname, char *relname,
"SELECT a.attnum,"
" a.attname,"
" a.atttypid,"
- " a.attnum = ANY(i.indkey)"
+ " a.attnum = ANY(i.indkey)");
+
+ /* Generated columns can be replicated since version 18. */
+ if (server_version >= 180000)
+ appendStringInfo(&cmd, ", a.attgenerated != ''");
+
+ appendStringInfo(&cmd,
" FROM pg_catalog.pg_attribute a"
" LEFT JOIN pg_catalog.pg_index i"
" ON (i.indexrelid = pg_get_replica_identity_index(%u))"
@@ -950,11 +960,11 @@ fetch_remote_table_info(char *nspname, char *relname,
" AND a.attrelid = %u"
" ORDER BY a.attnum",
lrel->remoteid,
- (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
+ (server_version >= 120000 && server_version < 180000 ?
"AND a.attgenerated = ''" : ""),
lrel->remoteid);
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
- lengthof(attrRow), attrRow);
+ server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
@@ -998,6 +1008,13 @@ fetch_remote_table_info(char *nspname, char *relname,
if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
lrel->attkeys = bms_add_member(lrel->attkeys, natt);
+ /* Remember if the remote table has published any generated column. */
+ if (server_version >= 180000 && !(*gencol_published))
+ {
+ *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
+ Assert(!isnull);
+ }
+
/* Should never happen. */
if (++natt >= MaxTupleAttributeNumber)
elog(ERROR, "too many columns in remote table \"%s.%s\"",
@@ -1030,7 +1047,7 @@ fetch_remote_table_info(char *nspname, char *relname,
* 3) one of the subscribed publications is declared as TABLES IN SCHEMA
* that includes this relation
*/
- if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+ if (server_version >= 150000)
{
/* Reuse the already-built pub_names. */
Assert(pub_names != NULL);
@@ -1106,10 +1123,12 @@ copy_table(Relation rel)
List *attnamelist;
ParseState *pstate;
List *options = NIL;
+ bool gencol_published = false;
/* Get the publisher relation info. */
fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
- RelationGetRelationName(rel), &lrel, &qual);
+ RelationGetRelationName(rel), &lrel, &qual,
+ &gencol_published);
/* Put the relation into relmap. */
logicalrep_relmap_update(&lrel);
@@ -1121,8 +1140,8 @@ copy_table(Relation rel)
/* Start copy on the publisher. */
initStringInfo(&cmd);
- /* Regular table with no row filter */
- if (lrel.relkind == RELKIND_RELATION && qual == NIL)
+ /* Regular table with no row filter or generated columns */
+ if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published)
{
appendStringInfo(&cmd, "COPY %s",
quote_qualified_identifier(lrel.nspname, lrel.relname));
@@ -1153,9 +1172,14 @@ copy_table(Relation rel)
{
/*
* For non-tables and tables with row filters, we need to do COPY
- * (SELECT ...), but we can't just do SELECT * because we need to not
- * copy generated columns. For tables with any row filters, build a
- * SELECT query with OR'ed row filters for COPY.
+ * (SELECT ...), but we can't just do SELECT * because we may need to
+ * copy only subset of columns including generated columns. For tables
+ * with any row filters, build a SELECT query with OR'ed row filters
+ * for COPY.
+ *
+ * We also need to use this same COPY (SELECT ...) syntax when
+ * generated columns are published, because copy of generated columns
+ * is not supported by the normal COPY.
*/
appendStringInfoString(&cmd, "COPY (SELECT ");
for (int i = 0; i < lrel.natts; i++)