diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 61 | ||||
-rw-r--r-- | src/backend/replication/repl_gram.y | 35 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 40 |
3 files changed, 90 insertions, 46 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 19ea159af4f..5c6e56a5b24 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -862,6 +862,9 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, PGresult *res; StringInfoData cmd; char *snapshot; + int use_new_options_syntax; + + use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000); initStringInfo(&cmd); @@ -872,26 +875,58 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, if (conn->logical) { - appendStringInfoString(&cmd, " LOGICAL pgoutput"); + appendStringInfoString(&cmd, " LOGICAL pgoutput "); + if (use_new_options_syntax) + appendStringInfoChar(&cmd, '('); if (two_phase) - appendStringInfoString(&cmd, " TWO_PHASE"); + { + appendStringInfoString(&cmd, "TWO_PHASE"); + if (use_new_options_syntax) + appendStringInfoString(&cmd, ", "); + else + appendStringInfoChar(&cmd, ' '); + } - switch (snapshot_action) + if (use_new_options_syntax) { - case CRS_EXPORT_SNAPSHOT: - appendStringInfoString(&cmd, " EXPORT_SNAPSHOT"); - break; - case CRS_NOEXPORT_SNAPSHOT: - appendStringInfoString(&cmd, " NOEXPORT_SNAPSHOT"); - break; - case CRS_USE_SNAPSHOT: - appendStringInfoString(&cmd, " USE_SNAPSHOT"); - break; + switch (snapshot_action) + { + case CRS_EXPORT_SNAPSHOT: + appendStringInfoString(&cmd, "SNAPSHOT 'export'"); + break; + case CRS_NOEXPORT_SNAPSHOT: + appendStringInfoString(&cmd, "SNAPSHOT 'nothing'"); + break; + case CRS_USE_SNAPSHOT: + appendStringInfoString(&cmd, "SNAPSHOT 'use'"); + break; + } } + else + { + switch (snapshot_action) + { + case CRS_EXPORT_SNAPSHOT: + appendStringInfoString(&cmd, "EXPORT_SNAPSHOT"); + break; + case CRS_NOEXPORT_SNAPSHOT: + appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT"); + break; + case CRS_USE_SNAPSHOT: + appendStringInfoString(&cmd, "USE_SNAPSHOT"); + break; + } + } + + if (use_new_options_syntax) + appendStringInfoChar(&cmd, ')'); } else { - appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL"); + if (use_new_options_syntax) + appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)"); + else + appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL"); } res = libpqrcv_PQexec(conn->streamConn, cmd.data); diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index 3b59d62ed86..126380e2df7 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -103,8 +103,8 @@ static SQLCmd *make_sqlcmd(void); %type <node> plugin_opt_arg %type <str> opt_slot var_name ident_or_keyword %type <boolval> opt_temporary -%type <list> create_slot_opt_list -%type <defelt> create_slot_opt +%type <list> create_slot_options create_slot_legacy_opt_list +%type <defelt> create_slot_legacy_opt %% @@ -243,8 +243,8 @@ base_backup_legacy_opt: ; create_replication_slot: - /* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */ - K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list + /* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL [options] */ + K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_options { CreateReplicationSlotCmd *cmd; cmd = makeNode(CreateReplicationSlotCmd); @@ -254,8 +254,8 @@ create_replication_slot: cmd->options = $5; $$ = (Node *) cmd; } - /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */ - | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list + /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin [options] */ + | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_options { CreateReplicationSlotCmd *cmd; cmd = makeNode(CreateReplicationSlotCmd); @@ -268,28 +268,33 @@ create_replication_slot: } ; -create_slot_opt_list: - create_slot_opt_list create_slot_opt +create_slot_options: + '(' generic_option_list ')' { $$ = $2; } + | create_slot_legacy_opt_list { $$ = $1; } + ; + +create_slot_legacy_opt_list: + create_slot_legacy_opt_list create_slot_legacy_opt { $$ = lappend($1, $2); } | /* EMPTY */ { $$ = NIL; } ; -create_slot_opt: +create_slot_legacy_opt: K_EXPORT_SNAPSHOT { - $$ = makeDefElem("export_snapshot", - (Node *)makeInteger(true), -1); + $$ = makeDefElem("snapshot", + (Node *)makeString("export"), -1); } | K_NOEXPORT_SNAPSHOT { - $$ = makeDefElem("export_snapshot", - (Node *)makeInteger(false), -1); + $$ = makeDefElem("snapshot", + (Node *)makeString("nothing"), -1); } | K_USE_SNAPSHOT { - $$ = makeDefElem("use_snapshot", - (Node *)makeInteger(true), -1); + $$ = makeDefElem("snapshot", + (Node *)makeString("use"), -1); } | K_RESERVE_WAL { diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3ca2a11389d..b811a5c0ef2 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -872,26 +872,30 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, { DefElem *defel = (DefElem *) lfirst(lc); - if (strcmp(defel->defname, "export_snapshot") == 0) + if (strcmp(defel->defname, "snapshot") == 0) { + char *action; + if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); + action = defGetString(defel); snapshot_action_given = true; - *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT : - CRS_NOEXPORT_SNAPSHOT; - } - else if (strcmp(defel->defname, "use_snapshot") == 0) - { - if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL) + + if (strcmp(action, "export") == 0) + *snapshot_action = CRS_EXPORT_SNAPSHOT; + else if (strcmp(action, "nothing") == 0) + *snapshot_action = CRS_NOEXPORT_SNAPSHOT; + else if (strcmp(action, "use") == 0) + *snapshot_action = CRS_USE_SNAPSHOT; + else ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"", + defel->defname, action))); - snapshot_action_given = true; - *snapshot_action = CRS_USE_SNAPSHOT; } else if (strcmp(defel->defname, "reserve_wal") == 0) { @@ -901,7 +905,7 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, errmsg("conflicting or redundant options"))); reserve_wal_given = true; - *reserve_wal = true; + *reserve_wal = defGetBoolean(defel); } else if (strcmp(defel->defname, "two_phase") == 0) { @@ -910,7 +914,7 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); two_phase_given = true; - *two_phase = true; + *two_phase = defGetBoolean(defel); } else elog(ERROR, "unrecognized option: %s", defel->defname); @@ -980,7 +984,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ereport(ERROR, /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ (errmsg("%s must not be called inside a transaction", - "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT"))); + "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')"))); need_full_snapshot = true; } @@ -990,25 +994,25 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ereport(ERROR, /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ (errmsg("%s must be called inside a transaction", - "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); + "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')"))); if (XactIsoLevel != XACT_REPEATABLE_READ) ereport(ERROR, /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ (errmsg("%s must be called in REPEATABLE READ isolation mode transaction", - "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); + "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')"))); if (FirstSnapshotSet) ereport(ERROR, /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ (errmsg("%s must be called before any query", - "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); + "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')"))); if (IsSubTransaction()) ereport(ERROR, /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ (errmsg("%s must not be called in a subtransaction", - "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); + "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')"))); need_full_snapshot = true; } |