diff options
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 46 |
1 files changed, 39 insertions, 7 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 40b6377a852..1696454c0bb 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -63,7 +63,8 @@ parse_subscription_options(List *options, bool *copy_data, char **synchronous_commit, bool *refresh, - bool *binary_given, bool *binary) + bool *binary_given, bool *binary, + bool *streaming_given, bool *streaming) { ListCell *lc; bool connect_given = false; @@ -99,6 +100,11 @@ parse_subscription_options(List *options, *binary_given = false; *binary = false; } + if (streaming) + { + *streaming_given = false; + *streaming = false; + } /* Parse options */ foreach(lc, options) @@ -194,6 +200,16 @@ parse_subscription_options(List *options, *binary_given = true; *binary = defGetBoolean(defel); } + else if (strcmp(defel->defname, "streaming") == 0 && streaming) + { + if (*streaming_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + *streaming_given = true; + *streaming = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -337,6 +353,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) bool enabled_given; bool enabled; bool copy_data; + bool streaming; + bool streaming_given; char *synchronous_commit; char *conninfo; char *slotname; @@ -360,7 +378,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) ©_data, &synchronous_commit, NULL, /* no "refresh" */ - &binary_given, &binary); + &binary_given, &binary, + &streaming_given, &streaming); /* * Since creating a replication slot is not transactional, rolling back @@ -427,6 +446,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary); + values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (slotname) @@ -698,6 +718,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) char *synchronous_commit; bool binary_given; bool binary; + bool streaming_given; + bool streaming; parse_subscription_options(stmt->options, NULL, /* no "connect" */ @@ -707,7 +729,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) NULL, /* no "copy_data" */ &synchronous_commit, NULL, /* no "refresh" */ - &binary_given, &binary); + &binary_given, &binary, + &streaming_given, &streaming); if (slotname_given) { @@ -739,6 +762,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt) replaces[Anum_pg_subscription_subbinary - 1] = true; } + if (streaming_given) + { + values[Anum_pg_subscription_substream - 1] = + BoolGetDatum(streaming); + replaces[Anum_pg_subscription_substream - 1] = true; + } + update_tuple = true; break; } @@ -756,7 +786,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) NULL, /* no "copy_data" */ NULL, /* no "synchronous_commit" */ NULL, /* no "refresh" */ - NULL, NULL); /* no "binary" */ + NULL, NULL, /* no "binary" */ + NULL, NULL); /* no streaming */ Assert(enabled_given); if (!sub->slotname && enabled) @@ -800,8 +831,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) ©_data, NULL, /* no "synchronous_commit" */ &refresh, - NULL, NULL); /* no "binary" */ - + NULL, NULL, /* no "binary" */ + NULL, NULL); /* no "streaming" */ values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); replaces[Anum_pg_subscription_subpublications - 1] = true; @@ -843,7 +874,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) ©_data, NULL, /* no "synchronous_commit" */ NULL, /* no "refresh" */ - NULL, NULL); /* no "binary" */ + NULL, NULL, /* no "binary" */ + NULL, NULL); /* no "streaming" */ AlterSubscription_refresh(sub, copy_data); |