diff options
-rw-r--r-- | doc/src/sgml/ref/pg_receivewal.sgml | 11 | ||||
-rw-r--r-- | src/bin/pg_basebackup/pg_receivewal.c | 31 | ||||
-rw-r--r-- | src/bin/pg_basebackup/streamutil.c | 97 | ||||
-rw-r--r-- | src/bin/pg_basebackup/streamutil.h | 3 | ||||
-rw-r--r-- | src/bin/pg_basebackup/t/020_pg_receivewal.pl | 53 |
5 files changed, 191 insertions, 4 deletions
diff --git a/doc/src/sgml/ref/pg_receivewal.sgml b/doc/src/sgml/ref/pg_receivewal.sgml index 6da8b2be8c0..d3c74882937 100644 --- a/doc/src/sgml/ref/pg_receivewal.sgml +++ b/doc/src/sgml/ref/pg_receivewal.sgml @@ -90,6 +90,17 @@ PostgreSQL documentation <listitem> <para> + If a starting point cannot be calculated with the previous method, + and if a replication slot is used, an extra + <command>READ_REPLICATION_SLOT</command> command is issued to retrieve + the slot's <literal>restart_lsn</literal> to use as starting point. + This option is only available when streaming write-ahead logs from + <productname>PostgreSQL</productname> 15 and up. + </para> + </listitem> + + <listitem> + <para> If a starting point cannot be calculated with the previous method, the latest WAL flush location is used as reported by the server from a <literal>IDENTIFY_SYSTEM</literal> command. diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c index d5140a79fea..04ba20b1974 100644 --- a/src/bin/pg_basebackup/pg_receivewal.c +++ b/src/bin/pg_basebackup/pg_receivewal.c @@ -404,15 +404,40 @@ StreamLog(void) exit(1); /* - * Figure out where to start streaming. + * Figure out where to start streaming. First scan the local directory. */ stream.startpos = FindStreamingStart(&stream.timeline); if (stream.startpos == InvalidXLogRecPtr) { - stream.startpos = serverpos; - stream.timeline = servertli; + /* + * Try to get the starting point from the slot if any. This is + * supported in PostgreSQL 15 and newer. + */ + if (replication_slot != NULL && + PQserverVersion(conn) >= 150000) + { + if (!GetSlotInformation(conn, replication_slot, &stream.startpos, + &stream.timeline)) + { + /* Error is logged by GetSlotInformation() */ + return; + } + } + + /* + * If the starting point is still not known, use the current WAL + * flush value as last resort. 
+ */ + if (stream.startpos == InvalidXLogRecPtr) + { + stream.startpos = serverpos; + stream.timeline = servertli; + } } + Assert(stream.startpos != InvalidXLogRecPtr && + stream.timeline != 0); + /* * Always start streaming at the beginning of a segment */ diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c index a9bc1ce2149..2a3e0c688fd 100644 --- a/src/bin/pg_basebackup/streamutil.c +++ b/src/bin/pg_basebackup/streamutil.c @@ -480,6 +480,103 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, } /* + * Run READ_REPLICATION_SLOT through a given connection and give back to + * caller some result information if requested for this slot: + * - Start LSN position, InvalidXLogRecPtr if unknown. + * - Current timeline ID, 0 if unknown. + * Returns false on failure, and true otherwise. + */ +bool +GetSlotInformation(PGconn *conn, const char *slot_name, + XLogRecPtr *restart_lsn, TimeLineID *restart_tli) +{ + PGresult *res; + PQExpBuffer query; + XLogRecPtr lsn_loc = InvalidXLogRecPtr; + TimeLineID tli_loc = 0; + + if (restart_lsn) + *restart_lsn = lsn_loc; + if (restart_tli) + *restart_tli = tli_loc; + + query = createPQExpBuffer(); + appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s", slot_name); + res = PQexec(conn, query->data); + destroyPQExpBuffer(query); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not send replication command \"%s\": %s", + "READ_REPLICATION_SLOT", PQerrorMessage(conn)); + PQclear(res); + return false; + } + + /* The command should always return precisely one tuple and three fields */ + if (PQntuples(res) != 1 || PQnfields(res) != 3) + { + pg_log_error("could not read replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields", + slot_name, PQntuples(res), PQnfields(res), 1, 3); + PQclear(res); + return false; + } + + /* + * When the slot doesn't exist, the command returns a tuple with NULL + * values. This checks only the slot type field. 
+ */ + if (PQgetisnull(res, 0, 0)) + { + pg_log_error("could not find replication slot \"%s\"", slot_name); + PQclear(res); + return false; + } + + /* + * Note that this cannot happen as READ_REPLICATION_SLOT supports only + * physical slots, but play it safe. + */ + if (strcmp(PQgetvalue(res, 0, 0), "physical") != 0) + { + pg_log_error("expected a physical replication slot, got type \"%s\" instead", + PQgetvalue(res, 0, 0)); + PQclear(res); + return false; + } + + /* restart LSN */ + if (!PQgetisnull(res, 0, 1)) + { + uint32 hi, + lo; + + if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2) + { + pg_log_error("could not parse restart_lsn \"%s\" for replication slot \"%s\"", + PQgetvalue(res, 0, 1), slot_name); + PQclear(res); + return false; + } + lsn_loc = ((uint64) hi) << 32 | lo; + } + + /* current TLI */ + if (!PQgetisnull(res, 0, 2)) + tli_loc = (TimeLineID) atol(PQgetvalue(res, 0, 2)); + + PQclear(res); + + /* Assign results if requested */ + if (restart_lsn) + *restart_lsn = lsn_loc; + if (restart_tli) + *restart_tli = tli_loc; + + return true; +} + +/* * Create a replication slot for the given connection. This function * returns true in case of success. 
*/ diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h index 65135c79e08..7918935cb31 100644 --- a/src/bin/pg_basebackup/streamutil.h +++ b/src/bin/pg_basebackup/streamutil.h @@ -52,6 +52,9 @@ extern void AppendIntegerCommandOption(PQExpBuffer buf, bool use_new_option_syntax, char *option_name, int32 option_value); +extern bool GetSlotInformation(PGconn *conn, const char *slot_name, + XLogRecPtr *restart_lsn, + TimeLineID *restart_tli); extern bool RetrieveWalSegSize(PGconn *conn); extern TimestampTz feGetCurrentTimestamp(void); extern void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl index b93493b5e9b..092c9b6f258 100644 --- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl +++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl @@ -5,7 +5,7 @@ use strict; use warnings; use PostgreSQL::Test::Utils; use PostgreSQL::Test::Cluster; -use Test::More tests => 27; +use Test::More tests => 31; program_help_ok('pg_receivewal'); program_version_ok('pg_receivewal'); @@ -72,6 +72,8 @@ $primary->command_ok( my @partial_wals = glob "$stream_dir/*\.partial"; is(scalar(@partial_wals), 1, "one partial WAL segment was created"); +note "Testing pg_receivewal with compression methods"; + # Check ZLIB compression if available. SKIP: { @@ -155,3 +157,52 @@ SKIP: ok(check_mode_recursive($stream_dir, 0700, 0600), "check stream dir permissions"); } + +note "Testing pg_receivewal with slot as starting streaming point"; + +# When using a replication slot, archiving should be resumed from the slot's +# restart LSN. Use a new archive location and new slot for this test. +my $slot_dir = $primary->basedir . '/slot_wal'; +mkdir($slot_dir); +$slot_name = 'archive_slot'; + +# Setup the slot, reserving WAL at creation (corresponding to the +# last redo LSN here, actually). 
+$primary->psql('postgres', + "SELECT pg_create_physical_replication_slot('$slot_name', true);"); + +# Get the segment name associated with the slot's restart LSN, that should +# be archived. +my $walfile_streamed = $primary->safe_psql( + 'postgres', + "SELECT pg_walfile_name(restart_lsn) + FROM pg_replication_slots + WHERE slot_name = '$slot_name';"); + +# Switch to a new segment, to make sure that the segment retained by the +# slot is still streamed. This may not be necessary, but play it safe. +$primary->psql('postgres', + 'INSERT INTO test_table VALUES (generate_series(1,100));'); +$primary->psql('postgres', 'SELECT pg_switch_wal();'); +$nextlsn = + $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();'); +chomp($nextlsn); + +# Check case where the slot does not exist. +$primary->command_fails_like( + [ + 'pg_receivewal', '-D', $slot_dir, '--slot', + 'nonexistentslot', '-n', '--no-sync', '--verbose', + '--endpos', $nextlsn + ], + qr/pg_receivewal: error: could not find replication slot "nonexistentslot"/, + 'pg_receivewal fails with non-existing slot'); +$primary->command_ok( + [ + 'pg_receivewal', '-D', $slot_dir, '--slot', + $slot_name, '-n', '--no-sync', '--verbose', + '--endpos', $nextlsn + ], + "WAL streamed from the slot's restart_lsn"); +ok(-e "$slot_dir/$walfile_streamed", + "WAL from the slot's restart_lsn has been archived"); |