aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/subscriptioncmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r--src/backend/commands/subscriptioncmds.c46
1 files changed, 39 insertions, 7 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 40b6377a852..1696454c0bb 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -63,7 +63,8 @@ parse_subscription_options(List *options,
bool *copy_data,
char **synchronous_commit,
bool *refresh,
- bool *binary_given, bool *binary)
+ bool *binary_given, bool *binary,
+ bool *streaming_given, bool *streaming)
{
ListCell *lc;
bool connect_given = false;
@@ -99,6 +100,11 @@ parse_subscription_options(List *options,
*binary_given = false;
*binary = false;
}
+ if (streaming)
+ {
+ *streaming_given = false;
+ *streaming = false;
+ }
/* Parse options */
foreach(lc, options)
@@ -194,6 +200,16 @@ parse_subscription_options(List *options,
*binary_given = true;
*binary = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "streaming") == 0 && streaming)
+ {
+ if (*streaming_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+
+ *streaming_given = true;
+ *streaming = defGetBoolean(defel);
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -337,6 +353,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
bool enabled_given;
bool enabled;
bool copy_data;
+ bool streaming;
+ bool streaming_given;
char *synchronous_commit;
char *conninfo;
char *slotname;
@@ -360,7 +378,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
&copy_data,
&synchronous_commit,
NULL, /* no "refresh" */
- &binary_given, &binary);
+ &binary_given, &binary,
+ &streaming_given, &streaming);
/*
* Since creating a replication slot is not transactional, rolling back
@@ -427,6 +446,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
+ values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (slotname)
@@ -698,6 +718,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
char *synchronous_commit;
bool binary_given;
bool binary;
+ bool streaming_given;
+ bool streaming;
parse_subscription_options(stmt->options,
NULL, /* no "connect" */
@@ -707,7 +729,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
NULL, /* no "copy_data" */
&synchronous_commit,
NULL, /* no "refresh" */
- &binary_given, &binary);
+ &binary_given, &binary,
+ &streaming_given, &streaming);
if (slotname_given)
{
@@ -739,6 +762,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
replaces[Anum_pg_subscription_subbinary - 1] = true;
}
+ if (streaming_given)
+ {
+ values[Anum_pg_subscription_substream - 1] =
+ BoolGetDatum(streaming);
+ replaces[Anum_pg_subscription_substream - 1] = true;
+ }
+
update_tuple = true;
break;
}
@@ -756,7 +786,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
NULL, /* no "copy_data" */
NULL, /* no "synchronous_commit" */
NULL, /* no "refresh" */
- NULL, NULL); /* no "binary" */
+ NULL, NULL, /* no "binary" */
+ NULL, NULL); /* no streaming */
Assert(enabled_given);
if (!sub->slotname && enabled)
@@ -800,8 +831,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
&copy_data,
NULL, /* no "synchronous_commit" */
&refresh,
- NULL, NULL); /* no "binary" */
-
+ NULL, NULL, /* no "binary" */
+ NULL, NULL); /* no "streaming" */
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(stmt->publication);
replaces[Anum_pg_subscription_subpublications - 1] = true;
@@ -843,7 +874,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
&copy_data,
NULL, /* no "synchronous_commit" */
NULL, /* no "refresh" */
- NULL, NULL); /* no "binary" */
+ NULL, NULL, /* no "binary" */
+ NULL, NULL); /* no "streaming" */
AlterSubscription_refresh(sub, copy_data);