diff options
author | Andrew Dunstan <andrew@dunslane.net> | 2009-02-02 20:07:37 +0000 |
---|---|---|
committer | Andrew Dunstan <andrew@dunslane.net> | 2009-02-02 20:07:37 +0000 |
commit | 775f1b379e3a282140f60ef65a11d1444dc80ccf (patch) | |
tree | 9d6a1320d8d85588d927c45096df47e9044d6248 /src/bin/pg_dump/pg_backup_custom.c | |
parent | 3a5b77371522b64feda006a7aed2a0e57bfb2b22 (diff) | |
download | postgresql-775f1b379e3a282140f60ef65a11d1444dc80ccf.tar.gz postgresql-775f1b379e3a282140f60ef65a11d1444dc80ccf.zip |
Provide for parallel restoration from a custom format archive. Each data and
post-data step is run in a separate worker child (a thread on Windows, a child
process elsewhere) up to the concurrent number specified by the new pg_restore
command-line --multi-thread | -m switch.
Andrew Dunstan, with some editing by Tom Lane.
Diffstat (limited to 'src/bin/pg_dump/pg_backup_custom.c')
-rw-r--r-- | src/bin/pg_dump/pg_backup_custom.c | 104 |
1 files changed, 93 insertions, 11 deletions
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c index 5791ec78128..ebe2d6ce181 100644 --- a/src/bin/pg_dump/pg_backup_custom.c +++ b/src/bin/pg_dump/pg_backup_custom.c @@ -19,7 +19,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_custom.c,v 1.40 2007/10/28 21:55:52 tgl Exp $ + * $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_custom.c,v 1.41 2009/02/02 20:07:37 adunstan Exp $ * *------------------------------------------------------------------------- */ @@ -40,6 +40,7 @@ static int _ReadByte(ArchiveHandle *); static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len); static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len); static void _CloseArchive(ArchiveHandle *AH); +static void _ReopenArchive(ArchiveHandle *AH); static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt); static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te); static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te); @@ -54,6 +55,8 @@ static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid); static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid); static void _EndBlobs(ArchiveHandle *AH, TocEntry *te); static void _LoadBlobs(ArchiveHandle *AH); +static void _Clone(ArchiveHandle *AH); +static void _DeClone(ArchiveHandle *AH); /*------------ * Buffers used in zlib compression and extra data stored in archive and @@ -120,6 +123,7 @@ InitArchiveFmt_Custom(ArchiveHandle *AH) AH->WriteBufPtr = _WriteBuf; AH->ReadBufPtr = _ReadBuf; AH->ClosePtr = _CloseArchive; + AH->ReopenPtr = _ReopenArchive; AH->PrintTocDataPtr = _PrintTocData; AH->ReadExtraTocPtr = _ReadExtraToc; AH->WriteExtraTocPtr = _WriteExtraToc; @@ -129,6 +133,8 @@ InitArchiveFmt_Custom(ArchiveHandle *AH) AH->StartBlobPtr = _StartBlob; AH->EndBlobPtr = _EndBlob; AH->EndBlobsPtr = _EndBlobs; + AH->ClonePtr = _Clone; + AH->DeClonePtr = _DeClone; /* * Set up some special context used in compressing data. @@ -569,7 +575,6 @@ _PrintData(ArchiveHandle *AH) zp->avail_in = blkLen; #ifdef HAVE_LIBZ - if (AH->compression != 0) { while (zp->avail_in != 0) @@ -585,15 +590,12 @@ _PrintData(ArchiveHandle *AH) } } else - { #endif + { in[zp->avail_in] = '\0'; ahwrite(in, 1, zp->avail_in, AH); zp->avail_in = 0; - -#ifdef HAVE_LIBZ } -#endif blkLen = ReadInt(AH); } @@ -822,11 +824,9 @@ _CloseArchive(ArchiveHandle *AH) * expect to be doing seeks to read the data back - it may be ok to * just use the existing self-consistent block formatting. */ - if (ctx->hasSeek) - { - fseeko(AH->FH, tpos, SEEK_SET); + if (ctx->hasSeek && + fseeko(AH->FH, tpos, SEEK_SET) == 0) WriteToc(AH); - } } if (fclose(AH->FH) != 0) @@ -835,6 +835,48 @@ _CloseArchive(ArchiveHandle *AH) AH->FH = NULL; } +/* + * Reopen the archive's file handle. + * + * We close the original file handle, except on Windows. (The difference + * is because on Windows, this is used within a multithreading context, + * and we don't want a thread closing the parent file handle.) + */ +static void +_ReopenArchive(ArchiveHandle *AH) +{ + lclContext *ctx = (lclContext *) AH->formatData; + pgoff_t tpos; + + if (AH->mode == archModeWrite) + die_horribly(AH,modulename,"can only reopen input archives\n"); + if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0) + die_horribly(AH,modulename,"cannot reopen stdin\n"); + if (!ctx->hasSeek) + die_horribly(AH,modulename,"cannot reopen non-seekable file\n"); + + errno = 0; + tpos = ftello(AH->FH); + if (errno) + die_horribly(AH, modulename, "could not determine seek position in archive file: %s\n", + strerror(errno)); + +#ifndef WIN32 + if (fclose(AH->FH) != 0) + die_horribly(AH, modulename, "could not close archive file: %s\n", + strerror(errno)); +#endif + + AH->FH = fopen(AH->fSpec, PG_BINARY_R); + if (!AH->FH) + die_horribly(AH, modulename, "could not open input file \"%s\": %s\n", + AH->fSpec, strerror(errno)); + + if (fseeko(AH->FH, tpos, SEEK_SET) != 0) + die_horribly(AH, modulename, "could not set seek position in archive file: %s\n", + strerror(errno)); +} + /*-------------------------------------------------- * END OF FORMAT CALLBACKS *-------------------------------------------------- @@ -990,7 +1032,6 @@ _DoDeflate(ArchiveHandle *AH, lclContext *ctx, int flush) /* * Terminate zlib context and flush it's buffers. If no zlib * then just return. - * */ static void _EndDataCompressor(ArchiveHandle *AH, TocEntry *te) @@ -1020,3 +1061,44 @@ _EndDataCompressor(ArchiveHandle *AH, TocEntry *te) /* Send the end marker */ WriteInt(AH, 0); } + + +/* + * Clone format-specific fields during parallel restoration. + */ +static void +_Clone(ArchiveHandle *AH) +{ + lclContext *ctx = (lclContext *) AH->formatData; + + AH->formatData = (lclContext *) malloc(sizeof(lclContext)); + if (AH->formatData == NULL) + die_horribly(AH, modulename, "out of memory\n"); + memcpy(AH->formatData, ctx, sizeof(lclContext)); + ctx = (lclContext *) AH->formatData; + + ctx->zp = (z_streamp) malloc(sizeof(z_stream)); + ctx->zlibOut = (char *) malloc(zlibOutSize + 1); + ctx->zlibIn = (char *) malloc(ctx->inSize); + + if (ctx->zp == NULL || ctx->zlibOut == NULL || ctx->zlibIn == NULL) + die_horribly(AH, modulename, "out of memory\n"); + + /* + * Note: we do not make a local lo_buf because we expect at most one + * BLOBS entry per archive, so no parallelism is possible. Likewise, + * TOC-entry-local state isn't an issue because any one TOC entry is + * touched by just one worker child. + */ +} + +static void +_DeClone(ArchiveHandle *AH) +{ + lclContext *ctx = (lclContext *) AH->formatData; + + free(ctx->zlibOut); + free(ctx->zlibIn); + free(ctx->zp); + free(ctx); +} |