aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/tablesync.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/tablesync.c')
-rw-r--r--src/backend/replication/logical/tablesync.c56
1 files changed, 40 insertions, 16 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index d4b5d210e3e..118503fcb76 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -787,23 +787,27 @@ copy_read_data(void *outbuf, int minread, int maxread)
/*
* Get information about remote relation in similar fashion the RELATION
- * message provides during replication. This function also returns the relation
- * qualifications to be used in the COPY command.
+ * message provides during replication.
+ *
+ * This function also returns (a) the relation qualifications to be used in
+ * the COPY command, and (b) whether the remote relation has published any
+ * generated column.
*/
static void
-fetch_remote_table_info(char *nspname, char *relname,
- LogicalRepRelation *lrel, List **qual)
+fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
+ List **qual, bool *gencol_published)
{
WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
- Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
+ Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
Oid qualRow[] = {TEXTOID};
bool isnull;
int natt;
StringInfo pub_names = NULL;
Bitmapset *included_cols = NULL;
+ int server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
lrel->nspname = nspname;
lrel->relname = relname;
@@ -851,7 +855,7 @@ fetch_remote_table_info(char *nspname, char *relname,
* We need to do this before fetching info about column names and types,
* so that we can skip columns that should not be replicated.
*/
- if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+ if (server_version >= 150000)
{
WalRcvExecResult *pubres;
TupleTableSlot *tslot;
@@ -941,7 +945,13 @@ fetch_remote_table_info(char *nspname, char *relname,
"SELECT a.attnum,"
" a.attname,"
" a.atttypid,"
- " a.attnum = ANY(i.indkey)"
+ " a.attnum = ANY(i.indkey)");
+
+ /* Generated columns can be replicated since version 18. */
+ if (server_version >= 180000)
+ appendStringInfo(&cmd, ", a.attgenerated != ''");
+
+ appendStringInfo(&cmd,
" FROM pg_catalog.pg_attribute a"
" LEFT JOIN pg_catalog.pg_index i"
" ON (i.indexrelid = pg_get_replica_identity_index(%u))"
@@ -950,11 +960,11 @@ fetch_remote_table_info(char *nspname, char *relname,
" AND a.attrelid = %u"
" ORDER BY a.attnum",
lrel->remoteid,
- (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
+ (server_version >= 120000 && server_version < 180000 ?
"AND a.attgenerated = ''" : ""),
lrel->remoteid);
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
- lengthof(attrRow), attrRow);
+ server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
@@ -998,6 +1008,13 @@ fetch_remote_table_info(char *nspname, char *relname,
if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
lrel->attkeys = bms_add_member(lrel->attkeys, natt);
+ /* Remember if the remote table has published any generated column. */
+ if (server_version >= 180000 && !(*gencol_published))
+ {
+ *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
+ Assert(!isnull);
+ }
+
/* Should never happen. */
if (++natt >= MaxTupleAttributeNumber)
elog(ERROR, "too many columns in remote table \"%s.%s\"",
@@ -1030,7 +1047,7 @@ fetch_remote_table_info(char *nspname, char *relname,
* 3) one of the subscribed publications is declared as TABLES IN SCHEMA
* that includes this relation
*/
- if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+ if (server_version >= 150000)
{
/* Reuse the already-built pub_names. */
Assert(pub_names != NULL);
@@ -1106,10 +1123,12 @@ copy_table(Relation rel)
List *attnamelist;
ParseState *pstate;
List *options = NIL;
+ bool gencol_published = false;
/* Get the publisher relation info. */
fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
- RelationGetRelationName(rel), &lrel, &qual);
+ RelationGetRelationName(rel), &lrel, &qual,
+ &gencol_published);
/* Put the relation into relmap. */
logicalrep_relmap_update(&lrel);
@@ -1121,8 +1140,8 @@ copy_table(Relation rel)
/* Start copy on the publisher. */
initStringInfo(&cmd);
- /* Regular table with no row filter */
- if (lrel.relkind == RELKIND_RELATION && qual == NIL)
+ /* Regular table with no row filter or generated columns */
+ if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published)
{
appendStringInfo(&cmd, "COPY %s",
quote_qualified_identifier(lrel.nspname, lrel.relname));
@@ -1153,9 +1172,14 @@ copy_table(Relation rel)
{
/*
* For non-tables and tables with row filters, we need to do COPY
- * (SELECT ...), but we can't just do SELECT * because we need to not
- * copy generated columns. For tables with any row filters, build a
- * SELECT query with OR'ed row filters for COPY.
+ * (SELECT ...), but we can't just do SELECT * because we may need to
+ * copy only subset of columns including generated columns. For tables
+ * with any row filters, build a SELECT query with OR'ed row filters
+ * for COPY.
+ *
+ * We also need to use this same COPY (SELECT ...) syntax when
+ * generated columns are published, because copy of generated columns
+ * is not supported by the normal COPY.
*/
appendStringInfoString(&cmd, "COPY (SELECT ");
for (int i = 0; i < lrel.natts; i++)