aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/src/sgml/ref/pg_receivewal.sgml11
-rw-r--r--src/bin/pg_basebackup/pg_receivewal.c31
-rw-r--r--src/bin/pg_basebackup/streamutil.c97
-rw-r--r--src/bin/pg_basebackup/streamutil.h3
-rw-r--r--src/bin/pg_basebackup/t/020_pg_receivewal.pl53
5 files changed, 191 insertions, 4 deletions
diff --git a/doc/src/sgml/ref/pg_receivewal.sgml b/doc/src/sgml/ref/pg_receivewal.sgml
index 6da8b2be8c0..d3c74882937 100644
--- a/doc/src/sgml/ref/pg_receivewal.sgml
+++ b/doc/src/sgml/ref/pg_receivewal.sgml
@@ -90,6 +90,17 @@ PostgreSQL documentation
<listitem>
<para>
+ If a starting point cannot not be calculated with the previous method,
+ and if a replication slot is used, an extra
+ <command>READ_REPLICATION_SLOT</command> command is issued to retrieve
+ the slot's <literal>restart_lsn</literal> to use as starting point.
+ This option is only available when streaming write-ahead logs from
+ <productname>PostgreSQL</productname> 15 and up.
+ </para>
+ </listitem>
+
+ <listitem>
+ <para>
If a starting point cannot be calculated with the previous method,
the latest WAL flush location is used as reported by the server from
a <literal>IDENTIFY_SYSTEM</literal> command.
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index d5140a79fea..04ba20b1974 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -404,15 +404,40 @@ StreamLog(void)
exit(1);
/*
- * Figure out where to start streaming.
+ * Figure out where to start streaming. First scan the local directory.
*/
stream.startpos = FindStreamingStart(&stream.timeline);
if (stream.startpos == InvalidXLogRecPtr)
{
- stream.startpos = serverpos;
- stream.timeline = servertli;
+ /*
+ * Try to get the starting point from the slot if any. This is
+ * supported in PostgreSQL 15 and newer.
+ */
+ if (replication_slot != NULL &&
+ PQserverVersion(conn) >= 150000)
+ {
+ if (!GetSlotInformation(conn, replication_slot, &stream.startpos,
+ &stream.timeline))
+ {
+ /* Error is logged by GetSlotInformation() */
+ return;
+ }
+ }
+
+ /*
+ * If it the starting point is still not known, use the current WAL
+ * flush value as last resort.
+ */
+ if (stream.startpos == InvalidXLogRecPtr)
+ {
+ stream.startpos = serverpos;
+ stream.timeline = servertli;
+ }
}
+ Assert(stream.startpos != InvalidXLogRecPtr &&
+ stream.timeline != 0);
+
/*
* Always start streaming at the beginning of a segment
*/
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index a9bc1ce2149..2a3e0c688fd 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -480,6 +480,103 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
}
/*
+ * Run READ_REPLICATION_SLOT through a given connection and give back to
+ * caller some result information if requested for this slot:
+ * - Start LSN position, InvalidXLogRecPtr if unknown.
+ * - Current timeline ID, 0 if unknown.
+ * Returns false on failure, and true otherwise.
+ */
+bool
+GetSlotInformation(PGconn *conn, const char *slot_name,
+ XLogRecPtr *restart_lsn, TimeLineID *restart_tli)
+{
+ PGresult *res;
+ PQExpBuffer query;
+ XLogRecPtr lsn_loc = InvalidXLogRecPtr;
+ TimeLineID tli_loc = 0;
+
+ if (restart_lsn)
+ *restart_lsn = lsn_loc;
+ if (restart_tli)
+ *restart_tli = tli_loc;
+
+ query = createPQExpBuffer();
+ appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s", slot_name);
+ res = PQexec(conn, query->data);
+ destroyPQExpBuffer(query);
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not send replication command \"%s\": %s",
+ "READ_REPLICATION_SLOT", PQerrorMessage(conn));
+ PQclear(res);
+ return false;
+ }
+
+ /* The command should always return precisely one tuple and three fields */
+ if (PQntuples(res) != 1 || PQnfields(res) != 3)
+ {
+ pg_log_error("could not read replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
+ slot_name, PQntuples(res), PQnfields(res), 1, 3);
+ PQclear(res);
+ return false;
+ }
+
+ /*
+ * When the slot doesn't exist, the command returns a tuple with NULL
+ * values. This checks only the slot type field.
+ */
+ if (PQgetisnull(res, 0, 0))
+ {
+ pg_log_error("could not find replication slot \"%s\"", slot_name);
+ PQclear(res);
+ return false;
+ }
+
+ /*
+ * Note that this cannot happen as READ_REPLICATION_SLOT supports only
+ * physical slots, but play it safe.
+ */
+ if (strcmp(PQgetvalue(res, 0, 0), "physical") != 0)
+ {
+ pg_log_error("expected a physical replication slot, got type \"%s\" instead",
+ PQgetvalue(res, 0, 0));
+ PQclear(res);
+ return false;
+ }
+
+ /* restart LSN */
+ if (!PQgetisnull(res, 0, 1))
+ {
+ uint32 hi,
+ lo;
+
+ if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
+ {
+ pg_log_error("could not parse restart_lsn \"%s\" for replication slot \"%s\"",
+ PQgetvalue(res, 0, 1), slot_name);
+ PQclear(res);
+ return false;
+ }
+ lsn_loc = ((uint64) hi) << 32 | lo;
+ }
+
+ /* current TLI */
+ if (!PQgetisnull(res, 0, 2))
+ tli_loc = (TimeLineID) atol(PQgetvalue(res, 0, 2));
+
+ PQclear(res);
+
+ /* Assign results if requested */
+ if (restart_lsn)
+ *restart_lsn = lsn_loc;
+ if (restart_tli)
+ *restart_tli = tli_loc;
+
+ return true;
+}
+
+/*
* Create a replication slot for the given connection. This function
* returns true in case of success.
*/
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 65135c79e08..7918935cb31 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -52,6 +52,9 @@ extern void AppendIntegerCommandOption(PQExpBuffer buf,
bool use_new_option_syntax,
char *option_name, int32 option_value);
+extern bool GetSlotInformation(PGconn *conn, const char *slot_name,
+ XLogRecPtr *restart_lsn,
+ TimeLineID *restart_tli);
extern bool RetrieveWalSegSize(PGconn *conn);
extern TimestampTz feGetCurrentTimestamp(void);
extern void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
index b93493b5e9b..092c9b6f258 100644
--- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl
+++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
@@ -5,7 +5,7 @@ use strict;
use warnings;
use PostgreSQL::Test::Utils;
use PostgreSQL::Test::Cluster;
-use Test::More tests => 27;
+use Test::More tests => 31;
program_help_ok('pg_receivewal');
program_version_ok('pg_receivewal');
@@ -72,6 +72,8 @@ $primary->command_ok(
my @partial_wals = glob "$stream_dir/*\.partial";
is(scalar(@partial_wals), 1, "one partial WAL segment was created");
+note "Testing pg_receivewal with compression methods";
+
# Check ZLIB compression if available.
SKIP:
{
@@ -155,3 +157,52 @@ SKIP:
ok(check_mode_recursive($stream_dir, 0700, 0600),
"check stream dir permissions");
}
+
+note "Testing pg_receivewal with slot as starting streaming point";
+
+# When using a replication slot, archiving should be resumed from the slot's
+# restart LSN. Use a new archive location and new slot for this test.
+my $slot_dir = $primary->basedir . '/slot_wal';
+mkdir($slot_dir);
+$slot_name = 'archive_slot';
+
+# Setup the slot, reserving WAL at creation (corresponding to the
+# last redo LSN here, actually).
+$primary->psql('postgres',
+ "SELECT pg_create_physical_replication_slot('$slot_name', true);");
+
+# Get the segment name associated with the slot's restart LSN, that should
+# be archived.
+my $walfile_streamed = $primary->safe_psql(
+ 'postgres',
+ "SELECT pg_walfile_name(restart_lsn)
+ FROM pg_replication_slots
+ WHERE slot_name = '$slot_name';");
+
+# Switch to a new segment, to make sure that the segment retained by the
+# slot is still streamed. This may not be necessary, but play it safe.
+$primary->psql('postgres',
+ 'INSERT INTO test_table VALUES (generate_series(1,100));');
+$primary->psql('postgres', 'SELECT pg_switch_wal();');
+$nextlsn =
+ $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+chomp($nextlsn);
+
+# Check case where the slot does not exist.
+$primary->command_fails_like(
+ [
+ 'pg_receivewal', '-D', $slot_dir, '--slot',
+ 'nonexistentslot', '-n', '--no-sync', '--verbose',
+ '--endpos', $nextlsn
+ ],
+ qr/pg_receivewal: error: could not find replication slot "nonexistentslot"/,
+ 'pg_receivewal fails with non-existing slot');
+$primary->command_ok(
+ [
+ 'pg_receivewal', '-D', $slot_dir, '--slot',
+ $slot_name, '-n', '--no-sync', '--verbose',
+ '--endpos', $nextlsn
+ ],
+ "WAL streamed from the slot's restart_lsn");
+ok(-e "$slot_dir/$walfile_streamed",
+ "WAL from the slot's restart_lsn has been archived");