aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2019-12-05 15:14:09 -0500
committerRobert Haas <rhaas@postgresql.org>2019-12-05 15:14:09 -0500
commit431ba7bebf139b6edf5544ce18f39a1a4dcb8110 (patch)
treec9b50cad50359c2350980f2e95280a92c455bd0c /src
parent42f362967d9f82043608610c689c24046e07497c (diff)
downloadpostgresql-431ba7bebf139b6edf5544ce18f39a1a4dcb8110.tar.gz
postgresql-431ba7bebf139b6edf5544ce18f39a1a4dcb8110.zip
pg_basebackup: Refactor code for reading COPY and tar data.
Add a new function ReceiveCopyData that does just that, taking a callback as an argument to specify what should be done with each chunk as it is received. This allows a single copy of the logic to be shared between ReceiveTarFile and ReceiveAndUnpackTarFile, and eliminates a few #ifdef conditions based on HAVE_LIBZ. While this is slightly more code, it's arguably clearer, and there is a pending patch that introduces additional calls to ReceiveCopyData. This commit is not intended to result in any functional change. Discussion: http://postgr.es/m/CA+TgmoYZDTHbSpwZtW=JDgAhwVAYvmdSrRUjOd+AYdfNNXVBDg@mail.gmail.com
Diffstat (limited to 'src')
-rw-r--r--src/bin/pg_basebackup/pg_basebackup.c1005
1 files changed, 507 insertions, 498 deletions
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index a9d162a7da2..16886fbe71e 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -57,6 +57,40 @@ typedef struct TablespaceList
TablespaceListCell *tail;
} TablespaceList;
+typedef struct WriteTarState
+{
+ int tablespacenum;
+ char filename[MAXPGPATH];
+ FILE *tarfile;
+ char tarhdr[512];
+ bool basetablespace;
+ bool in_tarhdr;
+ bool skip_file;
+ bool is_recovery_guc_supported;
+ bool is_postgresql_auto_conf;
+ bool found_postgresql_auto_conf;
+ int file_padding_len;
+ size_t tarhdrsz;
+ pgoff_t filesz;
+#ifdef HAVE_LIBZ
+ gzFile ztarfile;
+#endif
+} WriteTarState;
+
+typedef struct UnpackTarState
+{
+ int tablespacenum;
+ char current_path[MAXPGPATH];
+ char filename[MAXPGPATH];
+ const char *mapped_tblspc_path;
+ pgoff_t current_len_left;
+ int current_padding;
+ FILE *file;
+} UnpackTarState;
+
+typedef void (*WriteDataCallback) (size_t nbytes, char *buf,
+ void *callback_data);
+
/*
* pg_xlog has been renamed to pg_wal in version 10. This version number
* should be compared with PQserverVersion().
@@ -142,7 +176,10 @@ static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *fo
static void progress_report(int tablespacenum, const char *filename, bool force);
static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
+static void ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data);
static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
+static void ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf,
+ void *callback_data);
static void BaseBackup(void);
static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline,
@@ -874,42 +911,78 @@ parse_max_rate(char *src)
}
/*
+ * Read a stream of COPY data and invoke the provided callback for each
+ * chunk.
+ */
+static void
+ReceiveCopyData(PGconn *conn, WriteDataCallback callback,
+ void *callback_data)
+{
+ PGresult *res;
+
+ /* Get the COPY data stream. */
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_COPY_OUT)
+ {
+ pg_log_error("could not get COPY data stream: %s",
+ PQerrorMessage(conn));
+ exit(1);
+ }
+ PQclear(res);
+
+ /* Loop over chunks until done. */
+ while (1)
+ {
+ int r;
+ char *copybuf;
+
+ r = PQgetCopyData(conn, &copybuf, 0);
+ if (r == -1)
+ {
+ /* End of chunk. */
+ break;
+ }
+ else if (r == -2)
+ {
+ pg_log_error("could not read COPY data: %s",
+ PQerrorMessage(conn));
+ exit(1);
+ }
+
+ (*callback) (r, copybuf, callback_data);
+
+ PQfreemem(copybuf);
+ }
+}
+
+/*
* Write a piece of tar data
*/
static void
-writeTarData(
-#ifdef HAVE_LIBZ
- gzFile ztarfile,
-#endif
- FILE *tarfile, char *buf, int r, char *current_file)
+writeTarData(WriteTarState *state, char *buf, int r)
{
#ifdef HAVE_LIBZ
- if (ztarfile != NULL)
+ if (state->ztarfile != NULL)
{
- if (gzwrite(ztarfile, buf, r) != r)
+ if (gzwrite(state->ztarfile, buf, r) != r)
{
pg_log_error("could not write to compressed file \"%s\": %s",
- current_file, get_gz_error(ztarfile));
+ state->filename, get_gz_error(state->ztarfile));
exit(1);
}
}
else
#endif
{
- if (fwrite(buf, r, 1, tarfile) != 1)
+ if (fwrite(buf, r, 1, state->tarfile) != 1)
{
- pg_log_error("could not write to file \"%s\": %m", current_file);
+ pg_log_error("could not write to file \"%s\": %m",
+ state->filename);
exit(1);
}
}
}
-#ifdef HAVE_LIBZ
-#define WRITE_TAR_DATA(buf, sz) writeTarData(ztarfile, tarfile, buf, sz, filename)
-#else
-#define WRITE_TAR_DATA(buf, sz) writeTarData(tarfile, buf, sz, filename)
-#endif
-
/*
* Receive a tar format file from the connection to the server, and write
* the data from this file directly into a tar file. If compression is
@@ -923,29 +996,19 @@ writeTarData(
static void
ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
{
- char filename[MAXPGPATH];
- char *copybuf = NULL;
- FILE *tarfile = NULL;
- char tarhdr[512];
- bool basetablespace = PQgetisnull(res, rownum, 0);
- bool in_tarhdr = true;
- bool skip_file = false;
- bool is_recovery_guc_supported = true;
- bool is_postgresql_auto_conf = false;
- bool found_postgresql_auto_conf = false;
- int file_padding_len = 0;
- size_t tarhdrsz = 0;
- pgoff_t filesz = 0;
+ char zerobuf[1024];
+ WriteTarState state;
-#ifdef HAVE_LIBZ
- gzFile ztarfile = NULL;
-#endif
+ memset(&state, 0, sizeof(state));
+ state.tablespacenum = rownum;
+ state.basetablespace = PQgetisnull(res, rownum, 0);
+ state.in_tarhdr = true;
/* recovery.conf is integrated into postgresql.conf in 12 and newer */
- if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_RECOVERY_GUC)
- is_recovery_guc_supported = false;
+ if (PQserverVersion(conn) >= MINIMUM_VERSION_FOR_RECOVERY_GUC)
+ state.is_recovery_guc_supported = true;
- if (basetablespace)
+ if (state.basetablespace)
{
/*
* Base tablespaces
@@ -959,40 +1022,42 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
#ifdef HAVE_LIBZ
if (compresslevel != 0)
{
- ztarfile = gzdopen(dup(fileno(stdout)), "wb");
- if (gzsetparams(ztarfile, compresslevel,
+ state.ztarfile = gzdopen(dup(fileno(stdout)), "wb");
+ if (gzsetparams(state.ztarfile, compresslevel,
Z_DEFAULT_STRATEGY) != Z_OK)
{
pg_log_error("could not set compression level %d: %s",
- compresslevel, get_gz_error(ztarfile));
+ compresslevel, get_gz_error(state.ztarfile));
exit(1);
}
}
else
#endif
- tarfile = stdout;
- strcpy(filename, "-");
+ state.tarfile = stdout;
+ strcpy(state.filename, "-");
}
else
{
#ifdef HAVE_LIBZ
if (compresslevel != 0)
{
- snprintf(filename, sizeof(filename), "%s/base.tar.gz", basedir);
- ztarfile = gzopen(filename, "wb");
- if (gzsetparams(ztarfile, compresslevel,
+ snprintf(state.filename, sizeof(state.filename),
+ "%s/base.tar.gz", basedir);
+ state.ztarfile = gzopen(state.filename, "wb");
+ if (gzsetparams(state.ztarfile, compresslevel,
Z_DEFAULT_STRATEGY) != Z_OK)
{
pg_log_error("could not set compression level %d: %s",
- compresslevel, get_gz_error(ztarfile));
+ compresslevel, get_gz_error(state.ztarfile));
exit(1);
}
}
else
#endif
{
- snprintf(filename, sizeof(filename), "%s/base.tar", basedir);
- tarfile = fopen(filename, "wb");
+ snprintf(state.filename, sizeof(state.filename),
+ "%s/base.tar", basedir);
+ state.tarfile = fopen(state.filename, "wb");
}
}
}
@@ -1004,34 +1069,35 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
#ifdef HAVE_LIBZ
if (compresslevel != 0)
{
- snprintf(filename, sizeof(filename), "%s/%s.tar.gz", basedir,
- PQgetvalue(res, rownum, 0));
- ztarfile = gzopen(filename, "wb");
- if (gzsetparams(ztarfile, compresslevel,
+ snprintf(state.filename, sizeof(state.filename),
+ "%s/%s.tar.gz",
+ basedir, PQgetvalue(res, rownum, 0));
+ state.ztarfile = gzopen(state.filename, "wb");
+ if (gzsetparams(state.ztarfile, compresslevel,
Z_DEFAULT_STRATEGY) != Z_OK)
{
pg_log_error("could not set compression level %d: %s",
- compresslevel, get_gz_error(ztarfile));
+ compresslevel, get_gz_error(state.ztarfile));
exit(1);
}
}
else
#endif
{
- snprintf(filename, sizeof(filename), "%s/%s.tar", basedir,
- PQgetvalue(res, rownum, 0));
- tarfile = fopen(filename, "wb");
+ snprintf(state.filename, sizeof(state.filename), "%s/%s.tar",
+ basedir, PQgetvalue(res, rownum, 0));
+ state.tarfile = fopen(state.filename, "wb");
}
}
#ifdef HAVE_LIBZ
if (compresslevel != 0)
{
- if (!ztarfile)
+ if (!state.ztarfile)
{
/* Compression is in use */
pg_log_error("could not create compressed file \"%s\": %s",
- filename, get_gz_error(ztarfile));
+ state.filename, get_gz_error(state.ztarfile));
exit(1);
}
}
@@ -1039,314 +1105,292 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
#endif
{
/* Either no zlib support, or zlib support but compresslevel = 0 */
- if (!tarfile)
+ if (!state.tarfile)
{
- pg_log_error("could not create file \"%s\": %m", filename);
+ pg_log_error("could not create file \"%s\": %m", state.filename);
exit(1);
}
}
+ ReceiveCopyData(conn, ReceiveTarCopyChunk, &state);
+
/*
- * Get the COPY data stream
+ * End of copy data. If requested, and this is the base tablespace, write
+ * configuration file into the tarfile. When done, close the file (but not
+ * stdout).
+ *
+ * Also, write two completely empty blocks at the end of the tar file, as
+ * required by some tar programs.
*/
- res = PQgetResult(conn);
- if (PQresultStatus(res) != PGRES_COPY_OUT)
- {
- pg_log_error("could not get COPY data stream: %s",
- PQerrorMessage(conn));
- exit(1);
- }
- while (1)
+ MemSet(zerobuf, 0, sizeof(zerobuf));
+
+ if (state.basetablespace && writerecoveryconf)
{
- int r;
+ char header[512];
- if (copybuf != NULL)
+ /*
+ * If postgresql.auto.conf has not been found in the streamed data,
+ * add recovery configuration to postgresql.auto.conf if recovery
+ * parameters are GUCs. If the instance connected to is older than
+ * 12, create recovery.conf with this data otherwise.
+ */
+ if (!state.found_postgresql_auto_conf || !state.is_recovery_guc_supported)
{
- PQfreemem(copybuf);
- copybuf = NULL;
+ int padding;
+
+ tarCreateHeader(header,
+ state.is_recovery_guc_supported ? "postgresql.auto.conf" : "recovery.conf",
+ NULL,
+ recoveryconfcontents->len,
+ pg_file_create_mode, 04000, 02000,
+ time(NULL));
+
+ padding = ((recoveryconfcontents->len + 511) & ~511) - recoveryconfcontents->len;
+
+ writeTarData(&state, header, sizeof(header));
+ writeTarData(&state, recoveryconfcontents->data,
+ recoveryconfcontents->len);
+ if (padding)
+ writeTarData(&state, zerobuf, padding);
}
- r = PQgetCopyData(conn, &copybuf, 0);
- if (r == -1)
+ /*
+ * standby.signal is supported only if recovery parameters are GUCs.
+ */
+ if (state.is_recovery_guc_supported)
{
- /*
- * End of chunk. If requested, and this is the base tablespace,
- * write configuration file into the tarfile. When done, close the
- * file (but not stdout).
- *
- * Also, write two completely empty blocks at the end of the tar
- * file, as required by some tar programs.
- */
- char zerobuf[1024];
+ tarCreateHeader(header, "standby.signal", NULL,
+ 0, /* zero-length file */
+ pg_file_create_mode, 04000, 02000,
+ time(NULL));
+
+ writeTarData(&state, header, sizeof(header));
+ writeTarData(&state, zerobuf, 511);
+ }
+ }
- MemSet(zerobuf, 0, sizeof(zerobuf));
+ /* 2 * 512 bytes empty data at end of file */
+ writeTarData(&state, zerobuf, sizeof(zerobuf));
- if (basetablespace && writerecoveryconf)
+#ifdef HAVE_LIBZ
+ if (state.ztarfile != NULL)
+ {
+ if (gzclose(state.ztarfile) != 0)
+ {
+ pg_log_error("could not close compressed file \"%s\": %s",
+ state.filename, get_gz_error(state.ztarfile));
+ exit(1);
+ }
+ }
+ else
+#endif
+ {
+ if (strcmp(basedir, "-") != 0)
+ {
+ if (fclose(state.tarfile) != 0)
{
- char header[512];
+ pg_log_error("could not close file \"%s\": %m",
+ state.filename);
+ exit(1);
+ }
+ }
+ }
- /*
- * If postgresql.auto.conf has not been found in the streamed
- * data, add recovery configuration to postgresql.auto.conf if
- * recovery parameters are GUCs. If the instance connected to
- * is older than 12, create recovery.conf with this data
- * otherwise.
- */
- if (!found_postgresql_auto_conf || !is_recovery_guc_supported)
- {
- int padding;
+ progress_report(rownum, state.filename, true);
- tarCreateHeader(header,
- is_recovery_guc_supported ? "postgresql.auto.conf" : "recovery.conf",
- NULL,
- recoveryconfcontents->len,
- pg_file_create_mode, 04000, 02000,
- time(NULL));
+ /*
+ * Do not sync the resulting tar file yet, all files are synced once at
+ * the end.
+ */
+}
- padding = ((recoveryconfcontents->len + 511) & ~511) - recoveryconfcontents->len;
+/*
+ * Receive one chunk of tar-format data from the server.
+ */
+static void
+ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data)
+{
+ WriteTarState *state = callback_data;
- WRITE_TAR_DATA(header, sizeof(header));
- WRITE_TAR_DATA(recoveryconfcontents->data,
- recoveryconfcontents->len);
- if (padding)
- WRITE_TAR_DATA(zerobuf, padding);
- }
+ if (!writerecoveryconf || !state->basetablespace)
+ {
+ /*
+ * When not writing config file, or when not working on the base
+ * tablespace, we never have to look for an existing configuration
+ * file in the stream.
+ */
+ writeTarData(state, copybuf, r);
+ }
+ else
+ {
+ /*
+ * Look for a config file in the existing tar stream. If it's there,
+ * we must skip it so we can later overwrite it with our own version
+ * of the file.
+ *
+ * To do this, we have to process the individual files inside the TAR
+ * stream. The stream consists of a header and zero or more chunks,
+ * all 512 bytes long. The stream from the server is broken up into
+ * smaller pieces, so we have to track the size of the files to find
+ * the next header structure.
+ */
+ int rr = r;
+ int pos = 0;
+ while (rr > 0)
+ {
+ if (state->in_tarhdr)
+ {
/*
- * standby.signal is supported only if recovery parameters are
- * GUCs.
+ * We're currently reading a header structure inside the TAR
+ * stream, i.e. the file metadata.
*/
- if (is_recovery_guc_supported)
+ if (state->tarhdrsz < 512)
{
- tarCreateHeader(header, "standby.signal", NULL,
- 0, /* zero-length file */
- pg_file_create_mode, 04000, 02000,
- time(NULL));
+ /*
+ * Copy the header structure into tarhdr in case the
+ * header is not aligned to 512 bytes or it's not returned
+ * in whole by the last PQgetCopyData call.
+ */
+ int hdrleft;
+ int bytes2copy;
- WRITE_TAR_DATA(header, sizeof(header));
- WRITE_TAR_DATA(zerobuf, 511);
- }
- }
+ hdrleft = 512 - state->tarhdrsz;
+ bytes2copy = (rr > hdrleft ? hdrleft : rr);
- /* 2 * 512 bytes empty data at end of file */
- WRITE_TAR_DATA(zerobuf, sizeof(zerobuf));
+ memcpy(&state->tarhdr[state->tarhdrsz], copybuf + pos,
+ bytes2copy);
-#ifdef HAVE_LIBZ
- if (ztarfile != NULL)
- {
- if (gzclose(ztarfile) != 0)
- {
- pg_log_error("could not close compressed file \"%s\": %s",
- filename, get_gz_error(ztarfile));
- exit(1);
+ rr -= bytes2copy;
+ pos += bytes2copy;
+ state->tarhdrsz += bytes2copy;
}
- }
- else
-#endif
- {
- if (strcmp(basedir, "-") != 0)
- {
- if (fclose(tarfile) != 0)
- {
- pg_log_error("could not close file \"%s\": %m",
- filename);
- exit(1);
- }
- }
- }
-
- break;
- }
- else if (r == -2)
- {
- pg_log_error("could not read COPY data: %s",
- PQerrorMessage(conn));
- exit(1);
- }
-
- if (!writerecoveryconf || !basetablespace)
- {
- /*
- * When not writing config file, or when not working on the base
- * tablespace, we never have to look for an existing configuration
- * file in the stream.
- */
- WRITE_TAR_DATA(copybuf, r);
- }
- else
- {
- /*
- * Look for a config file in the existing tar stream. If it's
- * there, we must skip it so we can later overwrite it with our
- * own version of the file.
- *
- * To do this, we have to process the individual files inside the
- * TAR stream. The stream consists of a header and zero or more
- * chunks, all 512 bytes long. The stream from the server is
- * broken up into smaller pieces, so we have to track the size of
- * the files to find the next header structure.
- */
- int rr = r;
- int pos = 0;
-
- while (rr > 0)
- {
- if (in_tarhdr)
+ else
{
/*
- * We're currently reading a header structure inside the
- * TAR stream, i.e. the file metadata.
+ * We have the complete header structure in tarhdr, look
+ * at the file metadata: we may want append recovery info
+ * into postgresql.auto.conf and skip standby.signal file
+ * if recovery parameters are integrated as GUCs, and
+ * recovery.conf otherwise. In both cases we must
+ * calculate tar padding.
*/
- if (tarhdrsz < 512)
+ if (state->is_recovery_guc_supported)
{
- /*
- * Copy the header structure into tarhdr in case the
- * header is not aligned to 512 bytes or it's not
- * returned in whole by the last PQgetCopyData call.
- */
- int hdrleft;
- int bytes2copy;
-
- hdrleft = 512 - tarhdrsz;
- bytes2copy = (rr > hdrleft ? hdrleft : rr);
-
- memcpy(&tarhdr[tarhdrsz], copybuf + pos, bytes2copy);
-
- rr -= bytes2copy;
- pos += bytes2copy;
- tarhdrsz += bytes2copy;
+ state->skip_file =
+ (strcmp(&state->tarhdr[0], "standby.signal") == 0);
+ state->is_postgresql_auto_conf =
+ (strcmp(&state->tarhdr[0], "postgresql.auto.conf") == 0);
}
else
- {
- /*
- * We have the complete header structure in tarhdr,
- * look at the file metadata: we may want append
- * recovery info into postgresql.auto.conf and skip
- * standby.signal file if recovery parameters are
- * integrated as GUCs, and recovery.conf otherwise. In
- * both cases we must calculate tar padding.
- */
- if (is_recovery_guc_supported)
- {
- skip_file = (strcmp(&tarhdr[0], "standby.signal") == 0);
- is_postgresql_auto_conf = (strcmp(&tarhdr[0], "postgresql.auto.conf") == 0);
- }
- else
- skip_file = (strcmp(&tarhdr[0], "recovery.conf") == 0);
+ state->skip_file =
+ (strcmp(&state->tarhdr[0], "recovery.conf") == 0);
- filesz = read_tar_number(&tarhdr[124], 12);
- file_padding_len = ((filesz + 511) & ~511) - filesz;
+ state->filesz = read_tar_number(&state->tarhdr[124], 12);
+ state->file_padding_len =
+ ((state->filesz + 511) & ~511) - state->filesz;
- if (is_recovery_guc_supported &&
- is_postgresql_auto_conf &&
- writerecoveryconf)
- {
- /* replace tar header */
- char header[512];
+ if (state->is_recovery_guc_supported &&
+ state->is_postgresql_auto_conf &&
+ writerecoveryconf)
+ {
+ /* replace tar header */
+ char header[512];
- tarCreateHeader(header, "postgresql.auto.conf", NULL,
- filesz + recoveryconfcontents->len,
- pg_file_create_mode, 04000, 02000,
- time(NULL));
+ tarCreateHeader(header, "postgresql.auto.conf", NULL,
+ state->filesz + recoveryconfcontents->len,
+ pg_file_create_mode, 04000, 02000,
+ time(NULL));
- WRITE_TAR_DATA(header, sizeof(header));
- }
- else
+ writeTarData(state, header, sizeof(header));
+ }
+ else
+ {
+ /* copy stream with padding */
+ state->filesz += state->file_padding_len;
+
+ if (!state->skip_file)
{
- /* copy stream with padding */
- filesz += file_padding_len;
-
- if (!skip_file)
- {
- /*
- * If we're not skipping the file, write the
- * tar header unmodified.
- */
- WRITE_TAR_DATA(tarhdr, 512);
- }
+ /*
+ * If we're not skipping the file, write the tar
+ * header unmodified.
+ */
+ writeTarData(state, state->tarhdr, 512);
}
-
- /* Next part is the file, not the header */
- in_tarhdr = false;
}
+
+ /* Next part is the file, not the header */
+ state->in_tarhdr = false;
}
- else
+ }
+ else
+ {
+ /*
+ * We're processing a file's contents.
+ */
+ if (state->filesz > 0)
{
/*
- * We're processing a file's contents.
+ * We still have data to read (and possibly write).
*/
- if (filesz > 0)
- {
- /*
- * We still have data to read (and possibly write).
- */
- int bytes2write;
+ int bytes2write;
- bytes2write = (filesz > rr ? rr : filesz);
+ bytes2write = (state->filesz > rr ? rr : state->filesz);
- if (!skip_file)
- WRITE_TAR_DATA(copybuf + pos, bytes2write);
+ if (!state->skip_file)
+ writeTarData(state, copybuf + pos, bytes2write);
- rr -= bytes2write;
- pos += bytes2write;
- filesz -= bytes2write;
- }
- else if (is_recovery_guc_supported &&
- is_postgresql_auto_conf &&
- writerecoveryconf)
- {
- /* append recovery config to postgresql.auto.conf */
- int padding;
- int tailsize;
+ rr -= bytes2write;
+ pos += bytes2write;
+ state->filesz -= bytes2write;
+ }
+ else if (state->is_recovery_guc_supported &&
+ state->is_postgresql_auto_conf &&
+ writerecoveryconf)
+ {
+ /* append recovery config to postgresql.auto.conf */
+ int padding;
+ int tailsize;
- tailsize = (512 - file_padding_len) + recoveryconfcontents->len;
- padding = ((tailsize + 511) & ~511) - tailsize;
+ tailsize = (512 - state->file_padding_len) + recoveryconfcontents->len;
+ padding = ((tailsize + 511) & ~511) - tailsize;
- WRITE_TAR_DATA(recoveryconfcontents->data, recoveryconfcontents->len);
+ writeTarData(state, recoveryconfcontents->data,
+ recoveryconfcontents->len);
- if (padding)
- {
- char zerobuf[512];
+ if (padding)
+ {
+ char zerobuf[512];
- MemSet(zerobuf, 0, sizeof(zerobuf));
- WRITE_TAR_DATA(zerobuf, padding);
- }
+ MemSet(zerobuf, 0, sizeof(zerobuf));
+ writeTarData(state, zerobuf, padding);
+ }
- /* skip original file padding */
- is_postgresql_auto_conf = false;
- skip_file = true;
- filesz += file_padding_len;
+ /* skip original file padding */
+ state->is_postgresql_auto_conf = false;
+ state->skip_file = true;
+ state->filesz += state->file_padding_len;
- found_postgresql_auto_conf = true;
- }
- else
- {
- /*
- * No more data in the current file, the next piece of
- * data (if any) will be a new file header structure.
- */
- in_tarhdr = true;
- skip_file = false;
- is_postgresql_auto_conf = false;
- tarhdrsz = 0;
- filesz = 0;
- }
+ state->found_postgresql_auto_conf = true;
+ }
+ else
+ {
+ /*
+ * No more data in the current file, the next piece of
+ * data (if any) will be a new file header structure.
+ */
+ state->in_tarhdr = true;
+ state->skip_file = false;
+ state->is_postgresql_auto_conf = false;
+ state->tarhdrsz = 0;
+ state->filesz = 0;
}
}
}
- totaldone += r;
- progress_report(rownum, filename, false);
- } /* while (1) */
- progress_report(rownum, filename, true);
-
- if (copybuf != NULL)
- PQfreemem(copybuf);
-
- /*
- * Do not sync the resulting tar file yet, all files are synced once at
- * the end.
- */
+ }
+ totaldone += r;
+ progress_report(state->tablespacenum, state->filename, false);
}
@@ -1384,254 +1428,219 @@ get_tablespace_mapping(const char *dir)
static void
ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
{
- char current_path[MAXPGPATH];
- char filename[MAXPGPATH];
- const char *mapped_tblspc_path;
- pgoff_t current_len_left = 0;
- int current_padding = 0;
+ UnpackTarState state;
bool basetablespace;
- char *copybuf = NULL;
- FILE *file = NULL;
+
+ memset(&state, 0, sizeof(state));
+ state.tablespacenum = rownum;
basetablespace = PQgetisnull(res, rownum, 0);
if (basetablespace)
- strlcpy(current_path, basedir, sizeof(current_path));
+ strlcpy(state.current_path, basedir, sizeof(state.current_path));
else
- strlcpy(current_path,
+ strlcpy(state.current_path,
get_tablespace_mapping(PQgetvalue(res, rownum, 1)),
- sizeof(current_path));
+ sizeof(state.current_path));
- /*
- * Get the COPY data
- */
- res = PQgetResult(conn);
- if (PQresultStatus(res) != PGRES_COPY_OUT)
+ ReceiveCopyData(conn, ReceiveTarAndUnpackCopyChunk, &state);
+
+
+ if (state.file)
+ fclose(state.file);
+
+ progress_report(rownum, state.filename, true);
+
+ if (state.file != NULL)
{
- pg_log_error("could not get COPY data stream: %s",
- PQerrorMessage(conn));
+ pg_log_error("COPY stream ended before last file was finished");
exit(1);
}
- while (1)
- {
- int r;
+ if (basetablespace && writerecoveryconf)
+ WriteRecoveryConfig(conn, basedir, recoveryconfcontents);
- if (copybuf != NULL)
- {
- PQfreemem(copybuf);
- copybuf = NULL;
- }
+ /*
+ * No data is synced here, everything is done for all tablespaces at the
+ * end.
+ */
+}
- r = PQgetCopyData(conn, &copybuf, 0);
+static void
+ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf, void *callback_data)
+{
+ UnpackTarState *state = callback_data;
- if (r == -1)
- {
- /*
- * End of chunk
- */
- if (file)
- fclose(file);
+ if (state->file == NULL)
+ {
+#ifndef WIN32
+ int filemode;
+#endif
- break;
- }
- else if (r == -2)
+ /*
+ * No current file, so this must be the header for a new file
+ */
+ if (r != 512)
{
- pg_log_error("could not read COPY data: %s",
- PQerrorMessage(conn));
+ pg_log_error("invalid tar block header size: %zu", r);
exit(1);
}
+ totaldone += 512;
- if (file == NULL)
- {
-#ifndef WIN32
- int filemode;
-#endif
-
- /*
- * No current file, so this must be the header for a new file
- */
- if (r != 512)
- {
- pg_log_error("invalid tar block header size: %d", r);
- exit(1);
- }
- totaldone += 512;
-
- current_len_left = read_tar_number(&copybuf[124], 12);
+ state->current_len_left = read_tar_number(&copybuf[124], 12);
#ifndef WIN32
- /* Set permissions on the file */
- filemode = read_tar_number(&copybuf[100], 8);
+ /* Set permissions on the file */
+ filemode = read_tar_number(&copybuf[100], 8);
#endif
- /*
- * All files are padded up to 512 bytes
- */
- current_padding =
- ((current_len_left + 511) & ~511) - current_len_left;
+ /*
+ * All files are padded up to 512 bytes
+ */
+ state->current_padding =
+ ((state->current_len_left + 511) & ~511) - state->current_len_left;
+ /*
+ * First part of header is zero terminated filename
+ */
+ snprintf(state->filename, sizeof(state->filename),
+ "%s/%s", state->current_path, copybuf);
+ if (state->filename[strlen(state->filename) - 1] == '/')
+ {
/*
- * First part of header is zero terminated filename
+ * Ends in a slash means directory or symlink to directory
*/
- snprintf(filename, sizeof(filename), "%s/%s", current_path,
- copybuf);
- if (filename[strlen(filename) - 1] == '/')
+ if (copybuf[156] == '5')
{
/*
- * Ends in a slash means directory or symlink to directory
+ * Directory. Remove trailing slash first.
*/
- if (copybuf[156] == '5')
+ state->filename[strlen(state->filename) - 1] = '\0';
+ if (mkdir(state->filename, pg_dir_create_mode) != 0)
{
/*
- * Directory
+ * When streaming WAL, pg_wal (or pg_xlog for pre-9.6
+ * clusters) will have been created by the wal receiver
+ * process. Also, when the WAL directory location was
+ * specified, pg_wal (or pg_xlog) has already been created
+ * as a symbolic link before starting the actual backup.
+ * So just ignore creation failures on related
+ * directories.
*/
- filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
- if (mkdir(filename, pg_dir_create_mode) != 0)
+ if (!((pg_str_endswith(state->filename, "/pg_wal") ||
+ pg_str_endswith(state->filename, "/pg_xlog") ||
+ pg_str_endswith(state->filename, "/archive_status")) &&
+ errno == EEXIST))
{
- /*
- * When streaming WAL, pg_wal (or pg_xlog for pre-9.6
- * clusters) will have been created by the wal
- * receiver process. Also, when the WAL directory
- * location was specified, pg_wal (or pg_xlog) has
- * already been created as a symbolic link before
- * starting the actual backup. So just ignore creation
- * failures on related directories.
- */
- if (!((pg_str_endswith(filename, "/pg_wal") ||
- pg_str_endswith(filename, "/pg_xlog") ||
- pg_str_endswith(filename, "/archive_status")) &&
- errno == EEXIST))
- {
- pg_log_error("could not create directory \"%s\": %m",
- filename);
- exit(1);
- }
+ pg_log_error("could not create directory \"%s\": %m",
+ state->filename);
+ exit(1);
}
+ }
#ifndef WIN32
- if (chmod(filename, (mode_t) filemode))
- pg_log_error("could not set permissions on directory \"%s\": %m",
- filename);
+ if (chmod(state->filename, (mode_t) filemode))
+ pg_log_error("could not set permissions on directory \"%s\": %m",
+ state->filename);
#endif
- }
- else if (copybuf[156] == '2')
- {
- /*
- * Symbolic link
- *
- * It's most likely a link in pg_tblspc directory, to the
- * location of a tablespace. Apply any tablespace mapping
- * given on the command line (--tablespace-mapping). (We
- * blindly apply the mapping without checking that the
- * link really is inside pg_tblspc. We don't expect there
- * to be other symlinks in a data directory, but if there
- * are, you can call it an undocumented feature that you
- * can map them too.)
- */
- filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */
+ }
+ else if (copybuf[156] == '2')
+ {
+ /*
+ * Symbolic link
+ *
+ * It's most likely a link in pg_tblspc directory, to the
+ * location of a tablespace. Apply any tablespace mapping
+ * given on the command line (--tablespace-mapping). (We
+ * blindly apply the mapping without checking that the link
+ * really is inside pg_tblspc. We don't expect there to be
+ * other symlinks in a data directory, but if there are, you
+ * can call it an undocumented feature that you can map them
+ * too.)
+ */
+ state->filename[strlen(state->filename) - 1] = '\0'; /* Remove trailing slash */
- mapped_tblspc_path = get_tablespace_mapping(&copybuf[157]);
- if (symlink(mapped_tblspc_path, filename) != 0)
- {
- pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
- filename, mapped_tblspc_path);
- exit(1);
- }
- }
- else
+ state->mapped_tblspc_path =
+ get_tablespace_mapping(&copybuf[157]);
+ if (symlink(state->mapped_tblspc_path, state->filename) != 0)
{
- pg_log_error("unrecognized link indicator \"%c\"",
- copybuf[156]);
+ pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
+ state->filename, state->mapped_tblspc_path);
exit(1);
}
- continue; /* directory or link handled */
}
-
- /*
- * regular file
- */
- file = fopen(filename, "wb");
- if (!file)
+ else
{
- pg_log_error("could not create file \"%s\": %m", filename);
+ pg_log_error("unrecognized link indicator \"%c\"",
+ copybuf[156]);
exit(1);
}
+ return; /* directory or link handled */
+ }
+
+ /*
+ * regular file
+ */
+ state->file = fopen(state->filename, "wb");
+ if (!state->file)
+ {
+ pg_log_error("could not create file \"%s\": %m", state->filename);
+ exit(1);
+ }
#ifndef WIN32
- if (chmod(filename, (mode_t) filemode))
- pg_log_error("could not set permissions on file \"%s\": %m",
- filename);
+ if (chmod(state->filename, (mode_t) filemode))
+ pg_log_error("could not set permissions on file \"%s\": %m",
+ state->filename);
#endif
- if (current_len_left == 0)
- {
- /*
- * Done with this file, next one will be a new tar header
- */
- fclose(file);
- file = NULL;
- continue;
- }
- } /* new file */
- else
+ if (state->current_len_left == 0)
{
/*
- * Continuing blocks in existing file
+ * Done with this file, next one will be a new tar header
*/
- if (current_len_left == 0 && r == current_padding)
- {
- /*
- * Received the padding block for this file, ignore it and
- * close the file, then move on to the next tar header.
- */
- fclose(file);
- file = NULL;
- totaldone += r;
- continue;
- }
-
- if (fwrite(copybuf, r, 1, file) != 1)
- {
- pg_log_error("could not write to file \"%s\": %m", filename);
- exit(1);
- }
- totaldone += r;
- progress_report(rownum, filename, false);
-
- current_len_left -= r;
- if (current_len_left == 0 && current_padding == 0)
- {
- /*
- * Received the last block, and there is no padding to be
- * expected. Close the file and move on to the next tar
- * header.
- */
- fclose(file);
- file = NULL;
- continue;
- }
- } /* continuing data in existing file */
- } /* loop over all data blocks */
- progress_report(rownum, filename, true);
-
- if (file != NULL)
+ fclose(state->file);
+ state->file = NULL;
+ return;
+ }
+ } /* new file */
+ else
{
- pg_log_error("COPY stream ended before last file was finished");
- exit(1);
- }
-
- if (copybuf != NULL)
- PQfreemem(copybuf);
+ /*
+ * Continuing blocks in existing file
+ */
+ if (state->current_len_left == 0 && r == state->current_padding)
+ {
+ /*
+ * Received the padding block for this file, ignore it and close
+ * the file, then move on to the next tar header.
+ */
+ fclose(state->file);
+ state->file = NULL;
+ totaldone += r;
+ return;
+ }
- if (basetablespace && writerecoveryconf)
- WriteRecoveryConfig(conn, basedir, recoveryconfcontents);
+ if (fwrite(copybuf, r, 1, state->file) != 1)
+ {
+ pg_log_error("could not write to file \"%s\": %m", state->filename);
+ exit(1);
+ }
+ totaldone += r;
+ progress_report(state->tablespacenum, state->filename, false);
- /*
- * No data is synced here, everything is done for all tablespaces at the
- * end.
- */
+ state->current_len_left -= r;
+ if (state->current_len_left == 0 && state->current_padding == 0)
+ {
+ /*
+ * Received the last block, and there is no padding to be
+ * expected. Close the file and move on to the next tar header.
+ */
+ fclose(state->file);
+ state->file = NULL;
+ return;
+ }
+ } /* continuing data in existing file */
}
-
static void
BaseBackup(void)
{