aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/bin/pg_rewind/Makefile5
-rw-r--r--src/bin/pg_rewind/copy_fetch.c266
-rw-r--r--src/bin/pg_rewind/fetch.c60
-rw-r--r--src/bin/pg_rewind/fetch.h44
-rw-r--r--src/bin/pg_rewind/file_ops.c133
-rw-r--r--src/bin/pg_rewind/file_ops.h3
-rw-r--r--src/bin/pg_rewind/libpq_source.c (renamed from src/bin/pg_rewind/libpq_fetch.c)395
-rw-r--r--src/bin/pg_rewind/local_source.c131
-rw-r--r--src/bin/pg_rewind/pg_rewind.c208
-rw-r--r--src/bin/pg_rewind/pg_rewind.h5
-rw-r--r--src/bin/pg_rewind/rewind_source.h73
-rw-r--r--src/tools/pgindent/typedefs.list5
12 files changed, 697 insertions, 631 deletions
diff --git a/src/bin/pg_rewind/Makefile b/src/bin/pg_rewind/Makefile
index f398c3d8488..9bfde5c087b 100644
--- a/src/bin/pg_rewind/Makefile
+++ b/src/bin/pg_rewind/Makefile
@@ -20,12 +20,11 @@ LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils $(libpq_pgport)
OBJS = \
$(WIN32RES) \
- copy_fetch.o \
datapagemap.o \
- fetch.o \
file_ops.o \
filemap.o \
- libpq_fetch.o \
+ libpq_source.o \
+ local_source.o \
parsexlog.o \
pg_rewind.o \
timeline.o \
diff --git a/src/bin/pg_rewind/copy_fetch.c b/src/bin/pg_rewind/copy_fetch.c
deleted file mode 100644
index 1cd4449314d..00000000000
--- a/src/bin/pg_rewind/copy_fetch.c
+++ /dev/null
@@ -1,266 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * copy_fetch.c
- * Functions for using a data directory as the source.
- *
- * Portions Copyright (c) 2013-2020, PostgreSQL Global Development Group
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres_fe.h"
-
-#include <sys/stat.h>
-#include <dirent.h>
-#include <fcntl.h>
-#include <unistd.h>
-
-#include "datapagemap.h"
-#include "fetch.h"
-#include "file_ops.h"
-#include "filemap.h"
-#include "pg_rewind.h"
-
-static void recurse_dir(const char *datadir, const char *path,
- process_file_callback_t callback);
-
-static void execute_pagemap(datapagemap_t *pagemap, const char *path);
-
-/*
- * Traverse through all files in a data directory, calling 'callback'
- * for each file.
- */
-void
-traverse_datadir(const char *datadir, process_file_callback_t callback)
-{
- recurse_dir(datadir, NULL, callback);
-}
-
-/*
- * recursive part of traverse_datadir
- *
- * parentpath is the current subdirectory's path relative to datadir,
- * or NULL at the top level.
- */
-static void
-recurse_dir(const char *datadir, const char *parentpath,
- process_file_callback_t callback)
-{
- DIR *xldir;
- struct dirent *xlde;
- char fullparentpath[MAXPGPATH];
-
- if (parentpath)
- snprintf(fullparentpath, MAXPGPATH, "%s/%s", datadir, parentpath);
- else
- snprintf(fullparentpath, MAXPGPATH, "%s", datadir);
-
- xldir = opendir(fullparentpath);
- if (xldir == NULL)
- pg_fatal("could not open directory \"%s\": %m",
- fullparentpath);
-
- while (errno = 0, (xlde = readdir(xldir)) != NULL)
- {
- struct stat fst;
- char fullpath[MAXPGPATH * 2];
- char path[MAXPGPATH * 2];
-
- if (strcmp(xlde->d_name, ".") == 0 ||
- strcmp(xlde->d_name, "..") == 0)
- continue;
-
- snprintf(fullpath, sizeof(fullpath), "%s/%s", fullparentpath, xlde->d_name);
-
- if (lstat(fullpath, &fst) < 0)
- {
- if (errno == ENOENT)
- {
- /*
- * File doesn't exist anymore. This is ok, if the new primary
- * is running and the file was just removed. If it was a data
- * file, there should be a WAL record of the removal. If it
- * was something else, it couldn't have been anyway.
- *
- * TODO: But complain if we're processing the target dir!
- */
- }
- else
- pg_fatal("could not stat file \"%s\": %m",
- fullpath);
- }
-
- if (parentpath)
- snprintf(path, sizeof(path), "%s/%s", parentpath, xlde->d_name);
- else
- snprintf(path, sizeof(path), "%s", xlde->d_name);
-
- if (S_ISREG(fst.st_mode))
- callback(path, FILE_TYPE_REGULAR, fst.st_size, NULL);
- else if (S_ISDIR(fst.st_mode))
- {
- callback(path, FILE_TYPE_DIRECTORY, 0, NULL);
- /* recurse to handle subdirectories */
- recurse_dir(datadir, path, callback);
- }
-#ifndef WIN32
- else if (S_ISLNK(fst.st_mode))
-#else
- else if (pgwin32_is_junction(fullpath))
-#endif
- {
-#if defined(HAVE_READLINK) || defined(WIN32)
- char link_target[MAXPGPATH];
- int len;
-
- len = readlink(fullpath, link_target, sizeof(link_target));
- if (len < 0)
- pg_fatal("could not read symbolic link \"%s\": %m",
- fullpath);
- if (len >= sizeof(link_target))
- pg_fatal("symbolic link \"%s\" target is too long",
- fullpath);
- link_target[len] = '\0';
-
- callback(path, FILE_TYPE_SYMLINK, 0, link_target);
-
- /*
- * If it's a symlink within pg_tblspc, we need to recurse into it,
- * to process all the tablespaces. We also follow a symlink if
- * it's for pg_wal. Symlinks elsewhere are ignored.
- */
- if ((parentpath && strcmp(parentpath, "pg_tblspc") == 0) ||
- strcmp(path, "pg_wal") == 0)
- recurse_dir(datadir, path, callback);
-#else
- pg_fatal("\"%s\" is a symbolic link, but symbolic links are not supported on this platform",
- fullpath);
-#endif /* HAVE_READLINK */
- }
- }
-
- if (errno)
- pg_fatal("could not read directory \"%s\": %m",
- fullparentpath);
-
- if (closedir(xldir))
- pg_fatal("could not close directory \"%s\": %m",
- fullparentpath);
-}
-
-/*
- * Copy a file from source to target, between 'begin' and 'end' offsets.
- *
- * If 'trunc' is true, any existing file with the same name is truncated.
- */
-static void
-rewind_copy_file_range(const char *path, off_t begin, off_t end, bool trunc)
-{
- PGAlignedBlock buf;
- char srcpath[MAXPGPATH];
- int srcfd;
-
- snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir_source, path);
-
- srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
- if (srcfd < 0)
- pg_fatal("could not open source file \"%s\": %m",
- srcpath);
-
- if (lseek(srcfd, begin, SEEK_SET) == -1)
- pg_fatal("could not seek in source file: %m");
-
- open_target_file(path, trunc);
-
- while (end - begin > 0)
- {
- int readlen;
- int len;
-
- if (end - begin > sizeof(buf))
- len = sizeof(buf);
- else
- len = end - begin;
-
- readlen = read(srcfd, buf.data, len);
-
- if (readlen < 0)
- pg_fatal("could not read file \"%s\": %m",
- srcpath);
- else if (readlen == 0)
- pg_fatal("unexpected EOF while reading file \"%s\"", srcpath);
-
- write_target_range(buf.data, begin, readlen);
- begin += readlen;
- }
-
- if (close(srcfd) != 0)
- pg_fatal("could not close file \"%s\": %m", srcpath);
-}
-
-/*
- * Copy all relation data files from datadir_source to datadir_target, which
- * are marked in the given data page map.
- */
-void
-copy_executeFileMap(filemap_t *map)
-{
- file_entry_t *entry;
- int i;
-
- for (i = 0; i < map->nentries; i++)
- {
- entry = map->entries[i];
- execute_pagemap(&entry->target_pages_to_overwrite, entry->path);
-
- switch (entry->action)
- {
- case FILE_ACTION_NONE:
- /* ok, do nothing.. */
- break;
-
- case FILE_ACTION_COPY:
- rewind_copy_file_range(entry->path, 0, entry->source_size, true);
- break;
-
- case FILE_ACTION_TRUNCATE:
- truncate_target_file(entry->path, entry->source_size);
- break;
-
- case FILE_ACTION_COPY_TAIL:
- rewind_copy_file_range(entry->path, entry->target_size,
- entry->source_size, false);
- break;
-
- case FILE_ACTION_CREATE:
- create_target(entry);
- break;
-
- case FILE_ACTION_REMOVE:
- remove_target(entry);
- break;
-
- case FILE_ACTION_UNDECIDED:
- pg_fatal("no action decided for \"%s\"", entry->path);
- break;
- }
- }
-
- close_target_file();
-}
-
-static void
-execute_pagemap(datapagemap_t *pagemap, const char *path)
-{
- datapagemap_iterator_t *iter;
- BlockNumber blkno;
- off_t offset;
-
- iter = datapagemap_iterate(pagemap);
- while (datapagemap_next(iter, &blkno))
- {
- offset = blkno * BLCKSZ;
- rewind_copy_file_range(path, offset, offset + BLCKSZ, false);
- /* Ok, this block has now been copied from new data dir to old */
- }
- pg_free(iter);
-}
diff --git a/src/bin/pg_rewind/fetch.c b/src/bin/pg_rewind/fetch.c
deleted file mode 100644
index f41d0f295ea..00000000000
--- a/src/bin/pg_rewind/fetch.c
+++ /dev/null
@@ -1,60 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * fetch.c
- * Functions for fetching files from a local or remote data dir
- *
- * This file forms an abstraction of getting files from the "source".
- * There are two implementations of this interface: one for copying files
- * from a data directory via normal filesystem operations (copy_fetch.c),
- * and another for fetching files from a remote server via a libpq
- * connection (libpq_fetch.c)
- *
- *
- * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres_fe.h"
-
-#include <sys/stat.h>
-#include <unistd.h>
-
-#include "fetch.h"
-#include "file_ops.h"
-#include "filemap.h"
-#include "pg_rewind.h"
-
-void
-fetchSourceFileList(void)
-{
- if (datadir_source)
- traverse_datadir(datadir_source, &process_source_file);
- else
- libpqProcessFileList();
-}
-
-/*
- * Fetch all relation data files that are marked in the given data page map.
- */
-void
-execute_file_actions(filemap_t *filemap)
-{
- if (datadir_source)
- copy_executeFileMap(filemap);
- else
- libpq_executeFileMap(filemap);
-}
-
-/*
- * Fetch a single file into a malloc'd buffer. The file size is returned
- * in *filesize. The returned buffer is always zero-terminated, which is
- * handy for text files.
- */
-char *
-fetchFile(const char *filename, size_t *filesize)
-{
- if (datadir_source)
- return slurpFile(datadir_source, filename, filesize);
- else
- return libpqGetFile(filename, filesize);
-}
diff --git a/src/bin/pg_rewind/fetch.h b/src/bin/pg_rewind/fetch.h
deleted file mode 100644
index b20df8b1537..00000000000
--- a/src/bin/pg_rewind/fetch.h
+++ /dev/null
@@ -1,44 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * fetch.h
- * Fetching data from a local or remote data directory.
- *
- * This file includes the prototypes for functions used to copy files from
- * one data directory to another. The source to copy from can be a local
- * directory (copy method), or a remote PostgreSQL server (libpq fetch
- * method).
- *
- * Copyright (c) 2013-2020, PostgreSQL Global Development Group
- *
- *-------------------------------------------------------------------------
- */
-#ifndef FETCH_H
-#define FETCH_H
-
-#include "access/xlogdefs.h"
-
-#include "filemap.h"
-
-/*
- * Common interface. Calls the copy or libpq method depending on global
- * config options.
- */
-extern void fetchSourceFileList(void);
-extern char *fetchFile(const char *filename, size_t *filesize);
-extern void execute_file_actions(filemap_t *filemap);
-
-/* in libpq_fetch.c */
-extern void libpqProcessFileList(void);
-extern char *libpqGetFile(const char *filename, size_t *filesize);
-extern void libpq_executeFileMap(filemap_t *map);
-
-extern void libpqConnect(const char *connstr);
-extern XLogRecPtr libpqGetCurrentXlogInsertLocation(void);
-
-/* in copy_fetch.c */
-extern void copy_executeFileMap(filemap_t *map);
-
-typedef void (*process_file_callback_t) (const char *path, file_type_t type, size_t size, const char *link_target);
-extern void traverse_datadir(const char *datadir, process_file_callback_t callback);
-
-#endif /* FETCH_H */
diff --git a/src/bin/pg_rewind/file_ops.c b/src/bin/pg_rewind/file_ops.c
index ec37d0b2e0d..065368a2208 100644
--- a/src/bin/pg_rewind/file_ops.c
+++ b/src/bin/pg_rewind/file_ops.c
@@ -15,6 +15,7 @@
#include "postgres_fe.h"
#include <sys/stat.h>
+#include <dirent.h>
#include <fcntl.h>
#include <unistd.h>
@@ -35,6 +36,9 @@ static void remove_target_dir(const char *path);
static void create_target_symlink(const char *path, const char *link);
static void remove_target_symlink(const char *path);
+static void recurse_dir(const char *datadir, const char *parentpath,
+ process_file_callback_t callback);
+
/*
* Open a target file for writing. If 'trunc' is true and the file already
* exists, it will be truncated.
@@ -83,7 +87,7 @@ close_target_file(void)
void
write_target_range(char *buf, off_t begin, size_t size)
{
- int writeleft;
+ size_t writeleft;
char *p;
/* update progress report */
@@ -101,7 +105,7 @@ write_target_range(char *buf, off_t begin, size_t size)
p = buf;
while (writeleft > 0)
{
- int writelen;
+ ssize_t writelen;
errno = 0;
writelen = write(dstfd, p, writeleft);
@@ -305,9 +309,6 @@ sync_target_dir(void)
* buffer is actually *filesize + 1. That's handy when reading a text file.
* This function can be used to read binary files as well, you can just
* ignore the zero-terminator in that case.
- *
- * This function is used to implement the fetchFile function in the "fetch"
- * interface (see fetch.c), but is also called directly.
*/
char *
slurpFile(const char *datadir, const char *path, size_t *filesize)
@@ -352,3 +353,125 @@ slurpFile(const char *datadir, const char *path, size_t *filesize)
*filesize = len;
return buffer;
}
+
+/*
+ * Traverse through all files in a data directory, calling 'callback'
+ * for each file.
+ */
+void
+traverse_datadir(const char *datadir, process_file_callback_t callback)
+{
+ recurse_dir(datadir, NULL, callback);
+}
+
+/*
+ * recursive part of traverse_datadir
+ *
+ * parentpath is the current subdirectory's path relative to datadir,
+ * or NULL at the top level.
+ */
+static void
+recurse_dir(const char *datadir, const char *parentpath,
+ process_file_callback_t callback)
+{
+ DIR *xldir;
+ struct dirent *xlde;
+ char fullparentpath[MAXPGPATH];
+
+ if (parentpath)
+ snprintf(fullparentpath, MAXPGPATH, "%s/%s", datadir, parentpath);
+ else
+ snprintf(fullparentpath, MAXPGPATH, "%s", datadir);
+
+ xldir = opendir(fullparentpath);
+ if (xldir == NULL)
+ pg_fatal("could not open directory \"%s\": %m",
+ fullparentpath);
+
+ while (errno = 0, (xlde = readdir(xldir)) != NULL)
+ {
+ struct stat fst;
+ char fullpath[MAXPGPATH * 2];
+ char path[MAXPGPATH * 2];
+
+ if (strcmp(xlde->d_name, ".") == 0 ||
+ strcmp(xlde->d_name, "..") == 0)
+ continue;
+
+ snprintf(fullpath, sizeof(fullpath), "%s/%s", fullparentpath, xlde->d_name);
+
+ if (lstat(fullpath, &fst) < 0)
+ {
+ if (errno == ENOENT)
+ {
+ /*
+ * File doesn't exist anymore. This is ok, if the new primary
+ * is running and the file was just removed. If it was a data
+ * file, there should be a WAL record of the removal. If it
+ * was something else, it couldn't have been anyway.
+ *
+ * TODO: But complain if we're processing the target dir!
+ */
+ }
+ else
+ pg_fatal("could not stat file \"%s\": %m",
+ fullpath);
+ }
+
+ if (parentpath)
+ snprintf(path, sizeof(path), "%s/%s", parentpath, xlde->d_name);
+ else
+ snprintf(path, sizeof(path), "%s", xlde->d_name);
+
+ if (S_ISREG(fst.st_mode))
+ callback(path, FILE_TYPE_REGULAR, fst.st_size, NULL);
+ else if (S_ISDIR(fst.st_mode))
+ {
+ callback(path, FILE_TYPE_DIRECTORY, 0, NULL);
+ /* recurse to handle subdirectories */
+ recurse_dir(datadir, path, callback);
+ }
+#ifndef WIN32
+ else if (S_ISLNK(fst.st_mode))
+#else
+ else if (pgwin32_is_junction(fullpath))
+#endif
+ {
+#if defined(HAVE_READLINK) || defined(WIN32)
+ char link_target[MAXPGPATH];
+ int len;
+
+ len = readlink(fullpath, link_target, sizeof(link_target));
+ if (len < 0)
+ pg_fatal("could not read symbolic link \"%s\": %m",
+ fullpath);
+ if (len >= sizeof(link_target))
+ pg_fatal("symbolic link \"%s\" target is too long",
+ fullpath);
+ link_target[len] = '\0';
+
+ callback(path, FILE_TYPE_SYMLINK, 0, link_target);
+
+ /*
+ * If it's a symlink within pg_tblspc, we need to recurse into it,
+ * to process all the tablespaces. We also follow a symlink if
+ * it's for pg_wal. Symlinks elsewhere are ignored.
+ */
+ if ((parentpath && strcmp(parentpath, "pg_tblspc") == 0) ||
+ strcmp(path, "pg_wal") == 0)
+ recurse_dir(datadir, path, callback);
+#else
+ pg_fatal("\"%s\" is a symbolic link, but symbolic links are not supported on this platform",
+ fullpath);
+#endif /* HAVE_READLINK */
+ }
+ }
+
+ if (errno)
+ pg_fatal("could not read directory \"%s\": %m",
+ fullparentpath);
+
+ if (closedir(xldir))
+ pg_fatal("could not close directory \"%s\": %m",
+ fullparentpath);
+}
diff --git a/src/bin/pg_rewind/file_ops.h b/src/bin/pg_rewind/file_ops.h
index d8466385cf5..c7630859768 100644
--- a/src/bin/pg_rewind/file_ops.h
+++ b/src/bin/pg_rewind/file_ops.h
@@ -23,4 +23,7 @@ extern void sync_target_dir(void);
extern char *slurpFile(const char *datadir, const char *path, size_t *filesize);
+typedef void (*process_file_callback_t) (const char *path, file_type_t type, size_t size, const char *link_target);
+extern void traverse_datadir(const char *datadir, process_file_callback_t callback);
+
#endif /* FILE_OPS_H */
diff --git a/src/bin/pg_rewind/libpq_fetch.c b/src/bin/pg_rewind/libpq_source.c
index 16d451ae167..c73e8bf4704 100644
--- a/src/bin/pg_rewind/libpq_fetch.c
+++ b/src/bin/pg_rewind/libpq_source.c
@@ -1,7 +1,7 @@
/*-------------------------------------------------------------------------
*
- * libpq_fetch.c
- * Functions for fetching files from a remote server.
+ * libpq_source.c
+ * Functions for fetching files from a remote server via libpq.
*
* Copyright (c) 2013-2020, PostgreSQL Global Development Group
*
@@ -9,21 +9,14 @@
*/
#include "postgres_fe.h"
-#include <sys/stat.h>
-#include <dirent.h>
-#include <fcntl.h>
-#include <unistd.h>
-
#include "catalog/pg_type_d.h"
#include "common/connect.h"
#include "datapagemap.h"
-#include "fetch.h"
#include "file_ops.h"
#include "filemap.h"
#include "pg_rewind.h"
#include "port/pg_bswap.h"
-
-PGconn *conn = NULL;
+#include "rewind_source.h"
/*
* Files are fetched max CHUNKSIZE bytes at a time.
@@ -34,30 +27,71 @@ PGconn *conn = NULL;
*/
#define CHUNKSIZE 1000000
-static void receiveFileChunks(const char *sql);
-static void execute_pagemap(datapagemap_t *pagemap, const char *path);
-static char *run_simple_query(const char *sql);
-static void run_simple_command(const char *sql);
+typedef struct
+{
+ rewind_source common; /* common interface functions */
+
+ PGconn *conn;
+ bool copy_started;
+} libpq_source;
+
+static void init_libpq_conn(PGconn *conn);
+static char *run_simple_query(PGconn *conn, const char *sql);
+static void run_simple_command(PGconn *conn, const char *sql);
+
+/* public interface functions */
+static void libpq_traverse_files(rewind_source *source,
+ process_file_callback_t callback);
+static void libpq_queue_fetch_range(rewind_source *source, const char *path,
+ off_t off, size_t len);
+static void libpq_finish_fetch(rewind_source *source);
+static char *libpq_fetch_file(rewind_source *source, const char *path,
+ size_t *filesize);
+static XLogRecPtr libpq_get_current_wal_insert_lsn(rewind_source *source);
+static void libpq_destroy(rewind_source *source);
-void
-libpqConnect(const char *connstr)
+/*
+ * Create a new libpq source.
+ *
+ * The caller has already established the connection, but should not try
+ * to use it while the source is active.
+ */
+rewind_source *
+init_libpq_source(PGconn *conn)
{
- char *str;
- PGresult *res;
+ libpq_source *src;
+
+ init_libpq_conn(conn);
+
+ src = pg_malloc0(sizeof(libpq_source));
+
+ src->common.traverse_files = libpq_traverse_files;
+ src->common.fetch_file = libpq_fetch_file;
+ src->common.queue_fetch_range = libpq_queue_fetch_range;
+ src->common.finish_fetch = libpq_finish_fetch;
+ src->common.get_current_wal_insert_lsn = libpq_get_current_wal_insert_lsn;
+ src->common.destroy = libpq_destroy;
- conn = PQconnectdb(connstr);
- if (PQstatus(conn) == CONNECTION_BAD)
- pg_fatal("could not connect to server: %s",
- PQerrorMessage(conn));
+ src->conn = conn;
- if (showprogress)
- pg_log_info("connected to server");
+ return &src->common;
+}
+
+/*
+ * Initialize a libpq connection for use.
+ */
+static void
+init_libpq_conn(PGconn *conn)
+{
+ PGresult *res;
+ char *str;
/* disable all types of timeouts */
- run_simple_command("SET statement_timeout = 0");
- run_simple_command("SET lock_timeout = 0");
- run_simple_command("SET idle_in_transaction_session_timeout = 0");
+ run_simple_command(conn, "SET statement_timeout = 0");
+ run_simple_command(conn, "SET lock_timeout = 0");
+ run_simple_command(conn, "SET idle_in_transaction_session_timeout = 0");
+ /* secure search_path */
res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal("could not clear search_path: %s",
@@ -70,7 +104,7 @@ libpqConnect(const char *connstr)
* currently because we use a temporary table. Better to check for it
* explicitly than error out, for a better error message.
*/
- str = run_simple_query("SELECT pg_is_in_recovery()");
+ str = run_simple_query(conn, "SELECT pg_is_in_recovery()");
if (strcmp(str, "f") != 0)
pg_fatal("source server must not be in recovery mode");
pg_free(str);
@@ -80,27 +114,19 @@ libpqConnect(const char *connstr)
* a page is modified while we read it with pg_read_binary_file(), and we
* rely on full page images to fix them.
*/
- str = run_simple_query("SHOW full_page_writes");
+ str = run_simple_query(conn, "SHOW full_page_writes");
if (strcmp(str, "on") != 0)
pg_fatal("full_page_writes must be enabled in the source server");
pg_free(str);
-
- /*
- * Although we don't do any "real" updates, we do work with a temporary
- * table. We don't care about synchronous commit for that. It doesn't
- * otherwise matter much, but if the server is using synchronous
- * replication, and replication isn't working for some reason, we don't
- * want to get stuck, waiting for it to start working again.
- */
- run_simple_command("SET synchronous_commit = off");
}
/*
- * Runs a query that returns a single value.
+ * Run a query that returns a single value.
+ *
* The result should be pg_free'd after use.
*/
static char *
-run_simple_query(const char *sql)
+run_simple_query(PGconn *conn, const char *sql)
{
PGresult *res;
char *result;
@@ -123,11 +149,12 @@ run_simple_query(const char *sql)
}
/*
- * Runs a command.
+ * Run a command.
+ *
* In the event of a failure, exit immediately.
*/
static void
-run_simple_command(const char *sql)
+run_simple_command(PGconn *conn, const char *sql)
{
PGresult *res;
@@ -141,17 +168,18 @@ run_simple_command(const char *sql)
}
/*
- * Calls pg_current_wal_insert_lsn() function
+ * Call the pg_current_wal_insert_lsn() function in the remote system.
*/
-XLogRecPtr
-libpqGetCurrentXlogInsertLocation(void)
+static XLogRecPtr
+libpq_get_current_wal_insert_lsn(rewind_source *source)
{
+ PGconn *conn = ((libpq_source *) source)->conn;
XLogRecPtr result;
uint32 hi;
uint32 lo;
char *val;
- val = run_simple_query("SELECT pg_current_wal_insert_lsn()");
+ val = run_simple_query(conn, "SELECT pg_current_wal_insert_lsn()");
if (sscanf(val, "%X/%X", &hi, &lo) != 2)
pg_fatal("unrecognized result \"%s\" for current WAL insert location", val);
@@ -166,9 +194,10 @@ libpqGetCurrentXlogInsertLocation(void)
/*
* Get a list of all files in the data directory.
*/
-void
-libpqProcessFileList(void)
+static void
+libpq_traverse_files(rewind_source *source, process_file_callback_t callback)
{
+ PGconn *conn = ((libpq_source *) source)->conn;
PGresult *res;
const char *sql;
int i;
@@ -246,30 +275,114 @@ libpqProcessFileList(void)
PQclear(res);
}
-/*----
- * Runs a query, which returns pieces of files from the remote source data
- * directory, and overwrites the corresponding parts of target files with
- * the received parts. The result set is expected to be of format:
- *
- * path text -- path in the data directory, e.g "base/1/123"
- * begin int8 -- offset within the file
- * chunk bytea -- file content
- *----
+/*
+ * Queue up a request to fetch a piece of a file from remote system.
*/
static void
-receiveFileChunks(const char *sql)
+libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off,
+ size_t len)
{
+ libpq_source *src = (libpq_source *) source;
+ uint64 begin = off;
+ uint64 end = off + len;
+
+ /*
+ * On first call, create a temporary table, and start COPYing to it.
+ * We will load it with the list of blocks that we need to fetch.
+ */
+ if (!src->copy_started)
+ {
+ PGresult *res;
+
+ run_simple_command(src->conn, "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4)");
+
+ res = PQexec(src->conn, "COPY fetchchunks FROM STDIN");
+ if (PQresultStatus(res) != PGRES_COPY_IN)
+ pg_fatal("could not send file list: %s",
+ PQresultErrorMessage(res));
+ PQclear(res);
+
+ src->copy_started = true;
+ }
+
+ /*
+ * Write the file range to a temporary table in the server.
+ *
+ * The range is sent to the server as a COPY formatted line, to be inserted
+ * into the 'fetchchunks' temporary table. The libpq_finish_fetch() uses
+ * the temporary table to actually fetch the data.
+ */
+
+ /* Split the range into CHUNKSIZE chunks */
+ while (end - begin > 0)
+ {
+ char linebuf[MAXPGPATH + 23];
+ unsigned int len;
+
+ /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
+ if (end - begin > CHUNKSIZE)
+ len = CHUNKSIZE;
+ else
+ len = (unsigned int) (end - begin);
+
+ snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
+
+ if (PQputCopyData(src->conn, linebuf, strlen(linebuf)) != 1)
+ pg_fatal("could not send COPY data: %s",
+ PQerrorMessage(src->conn));
+
+ begin += len;
+ }
+}
+
+/*
+ * Receive all the queued chunks and write them to the target data directory.
+ */
+static void
+libpq_finish_fetch(rewind_source *source)
+{
+ libpq_source *src = (libpq_source *) source;
PGresult *res;
+ const char *sql;
- if (PQsendQueryParams(conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
- pg_fatal("could not send query: %s", PQerrorMessage(conn));
+ if (PQputCopyEnd(src->conn, NULL) != 1)
+ pg_fatal("could not send end-of-COPY: %s",
+ PQerrorMessage(src->conn));
+
+ while ((res = PQgetResult(src->conn)) != NULL)
+ {
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("unexpected result while sending file list: %s",
+ PQresultErrorMessage(res));
+ PQclear(res);
+ }
+
+ /*
+ * We've now copied the list of file ranges that we need to fetch to the
+ * temporary table. Now, actually fetch all of those ranges.
+ */
+ sql =
+ "SELECT path, begin,\n"
+ " pg_read_binary_file(path, begin, len, true) AS chunk\n"
+ "FROM fetchchunks\n";
+
+ if (PQsendQueryParams(src->conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
+ pg_fatal("could not send query: %s", PQerrorMessage(src->conn));
pg_log_debug("getting file chunks");
- if (PQsetSingleRowMode(conn) != 1)
+ if (PQsetSingleRowMode(src->conn) != 1)
pg_fatal("could not set libpq connection to single row mode");
- while ((res = PQgetResult(conn)) != NULL)
+ /*----
+ * The result set is of format:
+ *
+ * path text -- path in the data directory, e.g "base/1/123"
+ * begin int8 -- offset within the file
+ * chunk bytea -- file content
+ *----
+ */
+ while ((res = PQgetResult(src->conn)) != NULL)
{
char *filename;
int filenamelen;
@@ -349,8 +462,8 @@ receiveFileChunks(const char *sql)
continue;
}
- pg_log_debug("received chunk for file \"%s\", offset %lld, size %d",
- filename, (long long int) chunkoff, chunksize);
+ pg_log_debug("received chunk for file \"%s\", offset " INT64_FORMAT ", size %d",
+ filename, chunkoff, chunksize);
open_target_file(filename, false);
@@ -363,28 +476,29 @@ receiveFileChunks(const char *sql)
}
/*
- * Receive a single file as a malloc'd buffer.
+ * Fetch a single file as a malloc'd buffer.
*/
-char *
-libpqGetFile(const char *filename, size_t *filesize)
+static char *
+libpq_fetch_file(rewind_source *source, const char *path, size_t *filesize)
{
+ PGconn *conn = ((libpq_source *) source)->conn;
PGresult *res;
char *result;
int len;
const char *paramValues[1];
- paramValues[0] = filename;
+ paramValues[0] = path;
res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
1, NULL, paramValues, NULL, NULL, 1);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal("could not fetch remote file \"%s\": %s",
- filename, PQresultErrorMessage(res));
+ path, PQresultErrorMessage(res));
/* sanity check the result set */
if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
pg_fatal("unexpected result set while fetching remote file \"%s\"",
- filename);
+ path);
/* Read result to local variables */
len = PQgetlength(res, 0, 0);
@@ -394,7 +508,7 @@ libpqGetFile(const char *filename, size_t *filesize)
PQclear(res);
- pg_log_debug("fetched file \"%s\", length %d", filename, len);
+ pg_log_debug("fetched file \"%s\", length %d", path, len);
if (filesize)
*filesize = len;
@@ -402,142 +516,11 @@ libpqGetFile(const char *filename, size_t *filesize)
}
/*
- * Write a file range to a temporary table in the server.
- *
- * The range is sent to the server as a COPY formatted line, to be inserted
- * into the 'fetchchunks' temporary table. It is used in receiveFileChunks()
- * function to actually fetch the data.
- */
-static void
-fetch_file_range(const char *path, uint64 begin, uint64 end)
-{
- char linebuf[MAXPGPATH + 23];
-
- /* Split the range into CHUNKSIZE chunks */
- while (end - begin > 0)
- {
- unsigned int len;
-
- /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
- if (end - begin > CHUNKSIZE)
- len = CHUNKSIZE;
- else
- len = (unsigned int) (end - begin);
-
- snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
-
- if (PQputCopyData(conn, linebuf, strlen(linebuf)) != 1)
- pg_fatal("could not send COPY data: %s",
- PQerrorMessage(conn));
-
- begin += len;
- }
-}
-
-/*
- * Fetch all changed blocks from remote source data directory.
+ * Close a libpq source.
*/
-void
-libpq_executeFileMap(filemap_t *map)
-{
- file_entry_t *entry;
- const char *sql;
- PGresult *res;
- int i;
-
- /*
- * First create a temporary table, and load it with the blocks that we
- * need to fetch.
- */
- sql = "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4);";
- run_simple_command(sql);
-
- sql = "COPY fetchchunks FROM STDIN";
- res = PQexec(conn, sql);
-
- if (PQresultStatus(res) != PGRES_COPY_IN)
- pg_fatal("could not send file list: %s",
- PQresultErrorMessage(res));
- PQclear(res);
-
- for (i = 0; i < map->nentries; i++)
- {
- entry = map->entries[i];
-
- /* If this is a relation file, copy the modified blocks */
- execute_pagemap(&entry->target_pages_to_overwrite, entry->path);
-
- switch (entry->action)
- {
- case FILE_ACTION_NONE:
- /* nothing else to do */
- break;
-
- case FILE_ACTION_COPY:
- /* Truncate the old file out of the way, if any */
- open_target_file(entry->path, true);
- fetch_file_range(entry->path, 0, entry->source_size);
- break;
-
- case FILE_ACTION_TRUNCATE:
- truncate_target_file(entry->path, entry->source_size);
- break;
-
- case FILE_ACTION_COPY_TAIL:
- fetch_file_range(entry->path, entry->target_size, entry->source_size);
- break;
-
- case FILE_ACTION_REMOVE:
- remove_target(entry);
- break;
-
- case FILE_ACTION_CREATE:
- create_target(entry);
- break;
-
- case FILE_ACTION_UNDECIDED:
- pg_fatal("no action decided for \"%s\"", entry->path);
- break;
- }
- }
-
- if (PQputCopyEnd(conn, NULL) != 1)
- pg_fatal("could not send end-of-COPY: %s",
- PQerrorMessage(conn));
-
- while ((res = PQgetResult(conn)) != NULL)
- {
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pg_fatal("unexpected result while sending file list: %s",
- PQresultErrorMessage(res));
- PQclear(res);
- }
-
- /*
- * We've now copied the list of file ranges that we need to fetch to the
- * temporary table. Now, actually fetch all of those ranges.
- */
- sql =
- "SELECT path, begin,\n"
- " pg_read_binary_file(path, begin, len, true) AS chunk\n"
- "FROM fetchchunks\n";
-
- receiveFileChunks(sql);
-}
-
static void
-execute_pagemap(datapagemap_t *pagemap, const char *path)
+libpq_destroy(rewind_source *source)
{
- datapagemap_iterator_t *iter;
- BlockNumber blkno;
- off_t offset;
-
- iter = datapagemap_iterate(pagemap);
- while (datapagemap_next(iter, &blkno))
- {
- offset = blkno * BLCKSZ;
-
- fetch_file_range(path, offset, offset + BLCKSZ);
- }
- pg_free(iter);
+ pfree(source);
+ /* NOTE: we don't close the connection here, as it was not opened by us. */
}
diff --git a/src/bin/pg_rewind/local_source.c b/src/bin/pg_rewind/local_source.c
new file mode 100644
index 00000000000..fa1b6e80ec3
--- /dev/null
+++ b/src/bin/pg_rewind/local_source.c
@@ -0,0 +1,131 @@
+/*-------------------------------------------------------------------------
+ *
+ * local_source.c
+ * Functions for using a local data directory as the source.
+ *
+ * Portions Copyright (c) 2013-2020, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres_fe.h"
+
+#include <fcntl.h>
+#include <unistd.h>
+
+#include "datapagemap.h"
+#include "file_ops.h"
+#include "filemap.h"
+#include "pg_rewind.h"
+#include "rewind_source.h"
+
+typedef struct
+{
+ rewind_source common; /* common interface functions */
+
+ const char *datadir; /* path to the source data directory */
+} local_source;
+
+static void local_traverse_files(rewind_source *source,
+ process_file_callback_t callback);
+static char *local_fetch_file(rewind_source *source, const char *path,
+ size_t *filesize);
+static void local_fetch_file_range(rewind_source *source, const char *path,
+ off_t off, size_t len);
+static void local_finish_fetch(rewind_source *source);
+static void local_destroy(rewind_source *source);
+
+rewind_source *
+init_local_source(const char *datadir)
+{
+ local_source *src;
+
+ src = pg_malloc0(sizeof(local_source));
+
+ src->common.traverse_files = local_traverse_files;
+ src->common.fetch_file = local_fetch_file;
+ src->common.queue_fetch_range = local_fetch_file_range;
+ src->common.finish_fetch = local_finish_fetch;
+ src->common.get_current_wal_insert_lsn = NULL;
+ src->common.destroy = local_destroy;
+
+ src->datadir = datadir;
+
+ return &src->common;
+}
+
+static void
+local_traverse_files(rewind_source *source, process_file_callback_t callback)
+{
+ traverse_datadir(((local_source *) source)->datadir, &process_source_file);
+}
+
+static char *
+local_fetch_file(rewind_source *source, const char *path, size_t *filesize)
+{
+ return slurpFile(((local_source *) source)->datadir, path, filesize);
+}
+
+/*
+ * Copy a file from source to target, starting at 'off', for 'len' bytes.
+ */
+static void
+local_fetch_file_range(rewind_source *source, const char *path, off_t off,
+ size_t len)
+{
+ const char *datadir = ((local_source *) source)->datadir;
+ PGAlignedBlock buf;
+ char srcpath[MAXPGPATH];
+ int srcfd;
+ off_t begin = off;
+ off_t end = off + len;
+
+ snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
+
+ srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
+ if (srcfd < 0)
+ pg_fatal("could not open source file \"%s\": %m",
+ srcpath);
+
+ if (lseek(srcfd, begin, SEEK_SET) == -1)
+ pg_fatal("could not seek in source file: %m");
+
+ open_target_file(path, false);
+
+ while (end - begin > 0)
+ {
+ ssize_t readlen;
+ size_t len;
+
+ if (end - begin > sizeof(buf))
+ len = sizeof(buf);
+ else
+ len = end - begin;
+
+ readlen = read(srcfd, buf.data, len);
+
+ if (readlen < 0)
+ pg_fatal("could not read file \"%s\": %m", srcpath);
+ else if (readlen == 0)
+ pg_fatal("unexpected EOF while reading file \"%s\"", srcpath);
+
+ write_target_range(buf.data, begin, readlen);
+ begin += readlen;
+ }
+
+ if (close(srcfd) != 0)
+ pg_fatal("could not close file \"%s\": %m", srcpath);
+}
+
+static void
+local_finish_fetch(rewind_source *source)
+{
+ /*
+ * Nothing to do, local_fetch_file_range() copies the ranges immediately.
+ */
+}
+
+static void
+local_destroy(rewind_source *source)
+{
+ pfree(source);
+}
diff --git a/src/bin/pg_rewind/pg_rewind.c b/src/bin/pg_rewind/pg_rewind.c
index 574d7f7163b..421a45ef5b1 100644
--- a/src/bin/pg_rewind/pg_rewind.c
+++ b/src/bin/pg_rewind/pg_rewind.c
@@ -23,20 +23,25 @@
#include "common/restricted_token.h"
#include "common/string.h"
#include "fe_utils/recovery_gen.h"
-#include "fetch.h"
#include "file_ops.h"
#include "filemap.h"
#include "getopt_long.h"
#include "pg_rewind.h"
+#include "rewind_source.h"
#include "storage/bufpage.h"
static void usage(const char *progname);
+static void perform_rewind(filemap_t *filemap, rewind_source *source,
+ XLogRecPtr chkptrec,
+ TimeLineID chkpttli,
+ XLogRecPtr chkptredo);
+
static void createBackupLabel(XLogRecPtr startpoint, TimeLineID starttli,
XLogRecPtr checkpointloc);
-static void digestControlFile(ControlFileData *ControlFile, char *source,
- size_t size);
+static void digestControlFile(ControlFileData *ControlFile,
+ const char *content, size_t size);
static void getRestoreCommand(const char *argv0);
static void sanityChecks(void);
static void findCommonAncestorTimeline(XLogRecPtr *recptr, int *tliIndex);
@@ -69,6 +74,8 @@ int targetNentries;
uint64 fetch_size;
uint64 fetch_done;
+static PGconn *conn;
+static rewind_source *source;
static void
usage(const char *progname)
@@ -125,9 +132,6 @@ main(int argc, char **argv)
char *buffer;
bool no_ensure_shutdown = false;
bool rewind_needed;
- XLogRecPtr endrec;
- TimeLineID endtli;
- ControlFileData ControlFile_new;
bool writerecoveryconf = false;
filemap_t *filemap;
@@ -269,19 +273,29 @@ main(int argc, char **argv)
atexit(disconnect_atexit);
- /* Connect to remote server */
- if (connstr_source)
- libpqConnect(connstr_source);
-
/*
- * Ok, we have all the options and we're ready to start. Read in all the
- * information we need from both clusters.
+ * Ok, we have all the options and we're ready to start. First, connect to
+ * remote server.
*/
- buffer = slurpFile(datadir_target, "global/pg_control", &size);
- digestControlFile(&ControlFile_target, buffer, size);
- pg_free(buffer);
+ if (connstr_source)
+ {
+ conn = PQconnectdb(connstr_source);
+
+ if (PQstatus(conn) == CONNECTION_BAD)
+ pg_fatal("could not connect to server: %s",
+ PQerrorMessage(conn));
+
+ if (showprogress)
+ pg_log_info("connected to server");
+
+ source = init_libpq_source(conn);
+ }
+ else
+ source = init_local_source(datadir_source);
/*
+ * Check the status of the target instance.
+ *
* If the target instance was not cleanly shut down, start and stop the
* target cluster once in single-user mode to enforce recovery to finish,
* ensuring that the cluster can be used by pg_rewind. Note that if
@@ -289,6 +303,10 @@ main(int argc, char **argv)
* need to make sure by themselves that the target cluster is in a clean
* state.
*/
+ buffer = slurpFile(datadir_target, "global/pg_control", &size);
+ digestControlFile(&ControlFile_target, buffer, size);
+ pg_free(buffer);
+
if (!no_ensure_shutdown &&
ControlFile_target.state != DB_SHUTDOWNED &&
ControlFile_target.state != DB_SHUTDOWNED_IN_RECOVERY)
@@ -300,17 +318,20 @@ main(int argc, char **argv)
pg_free(buffer);
}
- buffer = fetchFile("global/pg_control", &size);
+ buffer = source->fetch_file(source, "global/pg_control", &size);
digestControlFile(&ControlFile_source, buffer, size);
pg_free(buffer);
sanityChecks();
/*
+ * Find the common ancestor timeline between the clusters.
+ *
* If both clusters are already on the same timeline, there's nothing to
* do.
*/
- if (ControlFile_target.checkPointCopy.ThisTimeLineID == ControlFile_source.checkPointCopy.ThisTimeLineID)
+ if (ControlFile_target.checkPointCopy.ThisTimeLineID ==
+ ControlFile_source.checkPointCopy.ThisTimeLineID)
{
pg_log_info("source and target cluster are on the same timeline");
rewind_needed = false;
@@ -373,11 +394,11 @@ main(int argc, char **argv)
filehash_init();
/*
- * Collect information about all files in the target and source systems.
+	 * Collect information about all files in both data directories.
*/
if (showprogress)
pg_log_info("reading source file list");
- fetchSourceFileList();
+ source->traverse_files(source, &process_source_file);
if (showprogress)
pg_log_info("reading target file list");
@@ -421,11 +442,124 @@ main(int argc, char **argv)
}
/*
- * This is the point of no return. Once we start copying things, we have
- * modified the target directory and there is no turning back!
+ * We have now collected all the information we need from both systems,
+ * and we are ready to start modifying the target directory.
+ *
+ * This is the point of no return. Once we start copying things, there is
+ * no turning back!
*/
+ perform_rewind(filemap, source, chkptrec, chkpttli, chkptredo);
- execute_file_actions(filemap);
+ if (showprogress)
+ pg_log_info("syncing target data directory");
+ sync_target_dir();
+
+ /* Also update the standby configuration, if requested. */
+ if (writerecoveryconf && !dry_run)
+ WriteRecoveryConfig(conn, datadir_target,
+ GenerateRecoveryConfig(conn, NULL));
+
+ /* don't need the source connection anymore */
+ source->destroy(source);
+ if (conn)
+ {
+ PQfinish(conn);
+ conn = NULL;
+ }
+
+ pg_log_info("Done!");
+
+ return 0;
+}
+
+/*
+ * Perform the rewind.
+ *
+ * We have already collected all the information we need from the
+ * target and the source.
+ */
+static void
+perform_rewind(filemap_t *filemap, rewind_source *source,
+ XLogRecPtr chkptrec,
+ TimeLineID chkpttli,
+ XLogRecPtr chkptredo)
+{
+ XLogRecPtr endrec;
+ TimeLineID endtli;
+ ControlFileData ControlFile_new;
+
+ /*
+ * Execute the actions in the file map, fetching data from the source
+ * system as needed.
+ */
+ for (int i = 0; i < filemap->nentries; i++)
+ {
+ file_entry_t *entry = filemap->entries[i];
+
+ /*
+ * If this is a relation file, copy the modified blocks.
+ *
+ * This is in addition to any other changes.
+ */
+ if (entry->target_pages_to_overwrite.bitmapsize > 0)
+ {
+ datapagemap_iterator_t *iter;
+ BlockNumber blkno;
+ off_t offset;
+
+ iter = datapagemap_iterate(&entry->target_pages_to_overwrite);
+ while (datapagemap_next(iter, &blkno))
+ {
+ offset = blkno * BLCKSZ;
+ source->queue_fetch_range(source, entry->path, offset, BLCKSZ);
+ }
+ pg_free(iter);
+ }
+
+ switch (entry->action)
+ {
+ case FILE_ACTION_NONE:
+ /* nothing else to do */
+ break;
+
+ case FILE_ACTION_COPY:
+ /* Truncate the old file out of the way, if any */
+ open_target_file(entry->path, true);
+ source->queue_fetch_range(source, entry->path,
+ 0, entry->source_size);
+ break;
+
+ case FILE_ACTION_TRUNCATE:
+ truncate_target_file(entry->path, entry->source_size);
+ break;
+
+ case FILE_ACTION_COPY_TAIL:
+ source->queue_fetch_range(source, entry->path,
+ entry->target_size,
+ entry->source_size - entry->target_size);
+ break;
+
+ case FILE_ACTION_REMOVE:
+ remove_target(entry);
+ break;
+
+ case FILE_ACTION_CREATE:
+ create_target(entry);
+ break;
+
+ case FILE_ACTION_UNDECIDED:
+ pg_fatal("no action decided for \"%s\"", entry->path);
+ break;
+ }
+ }
+
+ /*
+ * We've now copied the list of file ranges that we need to fetch to the
+ * temporary table. Now, actually fetch all of those ranges.
+ */
+ source->finish_fetch(source);
+
+ close_target_file();
progress_report(true);
@@ -437,15 +571,15 @@ main(int argc, char **argv)
* Update control file of target. Make it ready to perform archive
* recovery when restarting.
*
- * minRecoveryPoint is set to the current WAL insert location in the
- * source server. Like in an online backup, it's important that we recover
- * all the WAL that was generated while we copied the files over.
+ * Like in an online backup, it's important that we replay all the WAL
+ * that was generated while we copied the files over. To enforce that, set
+ * 'minRecoveryPoint' in the control file.
*/
memcpy(&ControlFile_new, &ControlFile_source, sizeof(ControlFileData));
if (connstr_source)
{
- endrec = libpqGetCurrentXlogInsertLocation();
+ endrec = source->get_current_wal_insert_lsn(source);
endtli = ControlFile_source.checkPointCopy.ThisTimeLineID;
}
else
@@ -458,18 +592,6 @@ main(int argc, char **argv)
ControlFile_new.state = DB_IN_ARCHIVE_RECOVERY;
if (!dry_run)
update_controlfile(datadir_target, &ControlFile_new, do_sync);
-
- if (showprogress)
- pg_log_info("syncing target data directory");
- sync_target_dir();
-
- if (writerecoveryconf && !dry_run)
- WriteRecoveryConfig(conn, datadir_target,
- GenerateRecoveryConfig(conn, NULL));
-
- pg_log_info("Done!");
-
- return 0;
}
static void
@@ -629,7 +751,7 @@ getTimelineHistory(ControlFileData *controlFile, int *nentries)
/* Get history file from appropriate source */
if (controlFile == &ControlFile_source)
- histfile = fetchFile(path, NULL);
+ histfile = source->fetch_file(source, path, NULL);
else if (controlFile == &ControlFile_target)
histfile = slurpFile(datadir_target, path, NULL);
else
@@ -785,16 +907,18 @@ checkControlFile(ControlFileData *ControlFile)
}
/*
- * Verify control file contents in the buffer src, and copy it to *ControlFile.
+ * Verify control file contents in the buffer 'content', and copy it to
+ * *ControlFile.
*/
static void
-digestControlFile(ControlFileData *ControlFile, char *src, size_t size)
+digestControlFile(ControlFileData *ControlFile, const char *content,
+ size_t size)
{
if (size != PG_CONTROL_FILE_SIZE)
pg_fatal("unexpected control file size %d, expected %d",
(int) size, PG_CONTROL_FILE_SIZE);
- memcpy(ControlFile, src, sizeof(ControlFileData));
+ memcpy(ControlFile, content, sizeof(ControlFileData));
/* set and validate WalSegSz */
WalSegSz = ControlFile->xlog_seg_size;
diff --git a/src/bin/pg_rewind/pg_rewind.h b/src/bin/pg_rewind/pg_rewind.h
index 67f90c2a38c..0dc3dbd5255 100644
--- a/src/bin/pg_rewind/pg_rewind.h
+++ b/src/bin/pg_rewind/pg_rewind.h
@@ -20,8 +20,6 @@
/* Configuration options */
extern char *datadir_target;
-extern char *datadir_source;
-extern char *connstr_source;
extern bool showprogress;
extern bool dry_run;
extern bool do_sync;
@@ -31,9 +29,6 @@ extern int WalSegSz;
extern TimeLineHistoryEntry *targetHistory;
extern int targetNentries;
-/* general state */
-extern PGconn *conn;
-
/* Progress counters */
extern uint64 fetch_size;
extern uint64 fetch_done;
diff --git a/src/bin/pg_rewind/rewind_source.h b/src/bin/pg_rewind/rewind_source.h
new file mode 100644
index 00000000000..e87f239a47a
--- /dev/null
+++ b/src/bin/pg_rewind/rewind_source.h
@@ -0,0 +1,73 @@
+/*-------------------------------------------------------------------------
+ *
+ * rewind_source.h
+ * Abstraction for fetching from source server.
+ *
+ * The source server can be either a libpq connection to a live system,
+ * or a local data directory. The 'rewind_source' struct abstracts the
+ * operations to fetch data from the source system, so that the rest of
+ * the code doesn't need to care what kind of a source it's dealing with.
+ *
+ * Copyright (c) 2013-2020, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef REWIND_SOURCE_H
+#define REWIND_SOURCE_H
+
+#include "access/xlogdefs.h"
+#include "file_ops.h"
+#include "filemap.h"
+#include "libpq-fe.h"
+
+typedef struct rewind_source
+{
+ /*
+ * Traverse all files in the source data directory, and call 'callback' on
+ * each file.
+ */
+ void (*traverse_files) (struct rewind_source *,
+ process_file_callback_t callback);
+
+ /*
+ * Fetch a single file into a malloc'd buffer. The file size is returned
+ * in *filesize. The returned buffer is always zero-terminated, which is
+ * handy for text files.
+ */
+ char *(*fetch_file) (struct rewind_source *, const char *path,
+ size_t *filesize);
+
+ /*
+ * Request to fetch (part of) a file in the source system, specified by an
+ * offset and length, and write it to the same offset in the corresponding
+ * target file. The source implementation may queue up the request and
+ * execute it later when convenient. Call finish_fetch() to flush the
+ * queue and execute all requests.
+ */
+ void (*queue_fetch_range) (struct rewind_source *, const char *path,
+ off_t offset, size_t len);
+
+ /*
+ * Execute all requests queued up with queue_fetch_range().
+ */
+ void (*finish_fetch) (struct rewind_source *);
+
+ /*
+ * Get the current WAL insert position in the source system.
+ */
+ XLogRecPtr (*get_current_wal_insert_lsn) (struct rewind_source *);
+
+ /*
+ * Free this rewind_source object.
+ */
+ void (*destroy) (struct rewind_source *);
+
+} rewind_source;
+
+/* in libpq_source.c */
+extern rewind_source *init_libpq_source(PGconn *conn);
+
+/* in local_source.c */
+extern rewind_source *init_local_source(const char *datadir);
+
+#endif							/* REWIND_SOURCE_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index da3e5f73d0f..f2ba92be533 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2956,9 +2956,11 @@ f_smgr
fd_set
fe_scram_state
fe_scram_state_enum
+fetch_range_request
file_action_t
file_entry_t
file_type_t
+filehash_hash
filemap_t
fill_string_relopt
finalize_primnode_context
@@ -3084,11 +3086,13 @@ lclContext
lclTocEntry
leafSegmentInfo
leaf_item
+libpq_source
line_t
lineno_t
list_sort_comparator
local_relopt
local_relopts
+local_source
locale_t
locate_agg_of_level_context
locate_var_of_level_context
@@ -3312,6 +3316,7 @@ rendezvousHashEntry
replace_rte_variables_callback
replace_rte_variables_context
ret_type
+rewind_source
rewrite_event
rijndael_ctx
rm_detail_t