diff options
Diffstat (limited to 'src/bin/pg_dump/pg_backup_custom.c')
-rw-r--r-- | src/bin/pg_dump/pg_backup_custom.c | 365 |
1 files changed, 74 insertions, 291 deletions
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c index 2bc7e8f13c2..f2969e2fac4 100644 --- a/src/bin/pg_dump/pg_backup_custom.c +++ b/src/bin/pg_dump/pg_backup_custom.c @@ -25,6 +25,7 @@ */ #include "pg_backup_archiver.h" +#include "compress_io.h" /*-------- * Routines in the format interface @@ -58,20 +59,9 @@ static void _LoadBlobs(ArchiveHandle *AH, bool drop); static void _Clone(ArchiveHandle *AH); static void _DeClone(ArchiveHandle *AH); -/*------------ - * Buffers used in zlib compression and extra data stored in archive and - * in TOC entries. - *------------ - */ -#define zlibOutSize 4096 -#define zlibInSize 4096 - typedef struct { - z_streamp zp; - char *zlibOut; - char *zlibIn; - size_t inSize; + CompressorState *cs; int hasSeek; pgoff_t filePos; pgoff_t dataStart; @@ -89,10 +79,10 @@ typedef struct *------ */ static void _readBlockHeader(ArchiveHandle *AH, int *type, int *id); -static void _StartDataCompressor(ArchiveHandle *AH, TocEntry *te); -static void _EndDataCompressor(ArchiveHandle *AH, TocEntry *te); static pgoff_t _getFilePos(ArchiveHandle *AH, lclContext *ctx); -static int _DoDeflate(ArchiveHandle *AH, lclContext *ctx, int flush); + +static size_t _CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len); +static size_t _CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen); static const char *modulename = gettext_noop("custom archiver"); @@ -136,39 +126,20 @@ InitArchiveFmt_Custom(ArchiveHandle *AH) AH->ClonePtr = _Clone; AH->DeClonePtr = _DeClone; - /* - * Set up some special context used in compressing data. - */ + /* Set up a private area. */ ctx = (lclContext *) calloc(1, sizeof(lclContext)); if (ctx == NULL) die_horribly(AH, modulename, "out of memory\n"); AH->formatData = (void *) ctx; - ctx->zp = (z_streamp) malloc(sizeof(z_stream)); - if (ctx->zp == NULL) - die_horribly(AH, modulename, "out of memory\n"); - /* Initialize LO buffering */ AH->lo_buf_size = LOBBUFSIZE; AH->lo_buf = (void *) malloc(LOBBUFSIZE); if (AH->lo_buf == NULL) die_horribly(AH, modulename, "out of memory\n"); - /* - * zlibOutSize is the buffer size we tell zlib it can output to. We - * actually allocate one extra byte because some routines want to append a - * trailing zero byte to the zlib output. The input buffer is expansible - * and is always of size ctx->inSize; zlibInSize is just the initial - * default size for it. - */ - ctx->zlibOut = (char *) malloc(zlibOutSize + 1); - ctx->zlibIn = (char *) malloc(zlibInSize); - ctx->inSize = zlibInSize; ctx->filePos = 0; - if (ctx->zlibOut == NULL || ctx->zlibIn == NULL) - die_horribly(AH, modulename, "out of memory\n"); - /* * Now open the file */ @@ -324,7 +295,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te) _WriteByte(AH, BLK_DATA); /* Block type */ WriteInt(AH, te->dumpId); /* For sanity check */ - _StartDataCompressor(AH, te); + ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc); } /* @@ -340,17 +311,12 @@ static size_t _WriteData(ArchiveHandle *AH, const void *data, size_t dLen) { lclContext *ctx = (lclContext *) AH->formatData; - z_streamp zp = ctx->zp; + CompressorState *cs = ctx->cs; - zp->next_in = (void *) data; - zp->avail_in = dLen; + if (dLen == 0) + return 0; - while (zp->avail_in != 0) - { - /* printf("Deflating %lu bytes\n", (unsigned long) dLen); */ - _DoDeflate(AH, ctx, 0); - } - return dLen; + return WriteDataToArchive(AH, cs, data, dLen); } /* @@ -363,10 +329,11 @@ _WriteData(ArchiveHandle *AH, const void *data, size_t dLen) static void _EndData(ArchiveHandle *AH, TocEntry *te) { -/* lclContext *ctx = (lclContext *) AH->formatData; */ -/* lclTocEntry *tctx = (lclTocEntry *) te->formatData; */ + lclContext *ctx = (lclContext *) AH->formatData; - _EndDataCompressor(AH, te); + EndCompressor(AH, ctx->cs); + /* Send the end marker */ + WriteInt(AH, 0); } /* @@ -401,11 +368,14 @@ _StartBlobs(ArchiveHandle *AH, TocEntry *te) static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) { + lclContext *ctx = (lclContext *) AH->formatData; + if (oid == 0) die_horribly(AH, modulename, "invalid OID for large object\n"); WriteInt(AH, oid); - _StartDataCompressor(AH, te); + + ctx->cs = AllocateCompressor(AH->compression, _CustomWriteFunc); } /* @@ -416,7 +386,11 @@ _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid) { - _EndDataCompressor(AH, te); + lclContext *ctx = (lclContext *) AH->formatData; + + EndCompressor(AH, ctx->cs); + /* Send the end marker */ + WriteInt(AH, 0); } /* @@ -532,108 +506,7 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt) static void _PrintData(ArchiveHandle *AH) { - lclContext *ctx = (lclContext *) AH->formatData; - z_streamp zp = ctx->zp; - size_t blkLen; - char *in = ctx->zlibIn; - size_t cnt; - -#ifdef HAVE_LIBZ - int res; - char *out = ctx->zlibOut; -#endif - -#ifdef HAVE_LIBZ - - res = Z_OK; - - if (AH->compression != 0) - { - zp->zalloc = Z_NULL; - zp->zfree = Z_NULL; - zp->opaque = Z_NULL; - - if (inflateInit(zp) != Z_OK) - die_horribly(AH, modulename, "could not initialize compression library: %s\n", zp->msg); - } -#endif - - blkLen = ReadInt(AH); - while (blkLen != 0) - { - if (blkLen + 1 > ctx->inSize) - { - free(ctx->zlibIn); - ctx->zlibIn = NULL; - ctx->zlibIn = (char *) malloc(blkLen + 1); - if (!ctx->zlibIn) - die_horribly(AH, modulename, "out of memory\n"); - - ctx->inSize = blkLen + 1; - in = ctx->zlibIn; - } - - cnt = fread(in, 1, blkLen, AH->FH); - if (cnt != blkLen) - { - if (feof(AH->FH)) - die_horribly(AH, modulename, - "could not read from input file: end of file\n"); - else - die_horribly(AH, modulename, - "could not read from input file: %s\n", strerror(errno)); - } - - ctx->filePos += blkLen; - - zp->next_in = (void *) in; - zp->avail_in = blkLen; - -#ifdef HAVE_LIBZ - if (AH->compression != 0) - { - while (zp->avail_in != 0) - { - zp->next_out = (void *) out; - zp->avail_out = zlibOutSize; - res = inflate(zp, 0); - if (res != Z_OK && res != Z_STREAM_END) - die_horribly(AH, modulename, "could not uncompress data: %s\n", zp->msg); - - out[zlibOutSize - zp->avail_out] = '\0'; - ahwrite(out, 1, zlibOutSize - zp->avail_out, AH); - } - } - else -#endif - { - in[zp->avail_in] = '\0'; - ahwrite(in, 1, zp->avail_in, AH); - zp->avail_in = 0; - } - blkLen = ReadInt(AH); - } - -#ifdef HAVE_LIBZ - if (AH->compression != 0) - { - zp->next_in = NULL; - zp->avail_in = 0; - while (res != Z_STREAM_END) - { - zp->next_out = (void *) out; - zp->avail_out = zlibOutSize; - res = inflate(zp, 0); - if (res != Z_OK && res != Z_STREAM_END) - die_horribly(AH, modulename, "could not uncompress data: %s\n", zp->msg); - - out[zlibOutSize - zp->avail_out] = '\0'; - ahwrite(out, 1, zlibOutSize - zp->avail_out, AH); - } - if (inflateEnd(zp) != Z_OK) - die_horribly(AH, modulename, "could not close compression library: %s\n", zp->msg); - } -#endif + ReadDataFromArchive(AH, AH->compression, _CustomReadFunc); } static void @@ -684,20 +557,21 @@ _skipData(ArchiveHandle *AH) { lclContext *ctx = (lclContext *) AH->formatData; size_t blkLen; - char *in = ctx->zlibIn; + char *buf = NULL; + int buflen = 0; size_t cnt; blkLen = ReadInt(AH); while (blkLen != 0) { - if (blkLen > ctx->inSize) + if (blkLen > buflen) { - free(ctx->zlibIn); - ctx->zlibIn = (char *) malloc(blkLen); - ctx->inSize = blkLen; - in = ctx->zlibIn; + if (buf) + free(buf); + buf = (char *) malloc(blkLen); + buflen = blkLen; } - cnt = fread(in, 1, blkLen, AH->FH); + cnt = fread(buf, 1, blkLen, AH->FH); if (cnt != blkLen) { if (feof(AH->FH)) @@ -712,6 +586,9 @@ _skipData(ArchiveHandle *AH) blkLen = ReadInt(AH); } + + if (buf) + free(buf); } /* @@ -961,145 +838,58 @@ _readBlockHeader(ArchiveHandle *AH, int *type, int *id) } /* - * If zlib is available, then startit up. This is called from - * StartData & StartBlob. The buffers are setup in the Init routine. + * Callback function for WriteDataToArchive. Writes one block of (compressed) + * data to the archive. */ -static void -_StartDataCompressor(ArchiveHandle *AH, TocEntry *te) +static size_t +_CustomWriteFunc(ArchiveHandle *AH, const char *buf, size_t len) { - lclContext *ctx = (lclContext *) AH->formatData; - z_streamp zp = ctx->zp; - -#ifdef HAVE_LIBZ - - if (AH->compression < 0 || AH->compression > 9) - AH->compression = Z_DEFAULT_COMPRESSION; - - if (AH->compression != 0) - { - zp->zalloc = Z_NULL; - zp->zfree = Z_NULL; - zp->opaque = Z_NULL; - - if (deflateInit(zp, AH->compression) != Z_OK) - die_horribly(AH, modulename, "could not initialize compression library: %s\n", zp->msg); - } -#else + /* never write 0-byte blocks (this should not happen) */ + if (len == 0) + return 0; - AH->compression = 0; -#endif - - /* Just be paranoid - maybe End is called after Start, with no Write */ - zp->next_out = (void *) ctx->zlibOut; - zp->avail_out = zlibOutSize; + WriteInt(AH, len); + return _WriteBuf(AH, buf, len); } /* - * Send compressed data to the output stream (via ahwrite). - * Each data chunk is preceded by it's length. - * In the case of Z0, or no zlib, just write the raw data. - * + * Callback function for ReadDataFromArchive. To keep things simple, we + * always read one compressed block at a time. */ -static int -_DoDeflate(ArchiveHandle *AH, lclContext *ctx, int flush) +static size_t +_CustomReadFunc(ArchiveHandle *AH, char **buf, size_t *buflen) { - z_streamp zp = ctx->zp; + size_t blkLen; + size_t cnt; -#ifdef HAVE_LIBZ - char *out = ctx->zlibOut; - int res = Z_OK; + /* Read length */ + blkLen = ReadInt(AH); + if (blkLen == 0) + return 0; - if (AH->compression != 0) + /* If the caller's buffer is not large enough, allocate a bigger one */ + if (blkLen > *buflen) { - res = deflate(zp, flush); - if (res == Z_STREAM_ERROR) - die_horribly(AH, modulename, "could not compress data: %s\n", zp->msg); - - if (((flush == Z_FINISH) && (zp->avail_out < zlibOutSize)) - || (zp->avail_out == 0) - || (zp->avail_in != 0) - ) - { - /* - * Extra paranoia: avoid zero-length chunks since a zero length - * chunk is the EOF marker. This should never happen but... - */ - if (zp->avail_out < zlibOutSize) - { - /* - * printf("Wrote %lu byte deflated chunk\n", (unsigned long) - * (zlibOutSize - zp->avail_out)); - */ - WriteInt(AH, zlibOutSize - zp->avail_out); - if (fwrite(out, 1, zlibOutSize - zp->avail_out, AH->FH) != (zlibOutSize - zp->avail_out)) - die_horribly(AH, modulename, "could not write to output file: %s\n", strerror(errno)); - ctx->filePos += zlibOutSize - zp->avail_out; - } - zp->next_out = (void *) out; - zp->avail_out = zlibOutSize; - } + free(*buf); + *buf = (char *) malloc(blkLen); + if (!(*buf)) + die_horribly(AH, modulename, "out of memory\n"); + *buflen = blkLen; } - else -#endif - { - if (zp->avail_in > 0) - { - WriteInt(AH, zp->avail_in); - if (fwrite(zp->next_in, 1, zp->avail_in, AH->FH) != zp->avail_in) - die_horribly(AH, modulename, "could not write to output file: %s\n", strerror(errno)); - ctx->filePos += zp->avail_in; - zp->avail_in = 0; - } - else - { -#ifdef HAVE_LIBZ - if (flush == Z_FINISH) - res = Z_STREAM_END; -#endif - } - } - -#ifdef HAVE_LIBZ - return res; -#else - return 1; -#endif -} - -/* - * Terminate zlib context and flush it's buffers. If no zlib - * then just return. - */ -static void -_EndDataCompressor(ArchiveHandle *AH, TocEntry *te) -{ - -#ifdef HAVE_LIBZ - lclContext *ctx = (lclContext *) AH->formatData; - z_streamp zp = ctx->zp; - int res; - if (AH->compression != 0) + cnt = _ReadBuf(AH, *buf, blkLen); + if (cnt != blkLen) { - zp->next_in = NULL; - zp->avail_in = 0; - - do - { - /* printf("Ending data output\n"); */ - res = _DoDeflate(AH, ctx, Z_FINISH); - } while (res != Z_STREAM_END); - - if (deflateEnd(zp) != Z_OK) - die_horribly(AH, modulename, "could not close compression stream: %s\n", zp->msg); + if (feof(AH->FH)) + die_horribly(AH, modulename, + "could not read from input file: end of file\n"); + else + die_horribly(AH, modulename, + "could not read from input file: %s\n", strerror(errno)); } -#endif - - /* Send the end marker */ - WriteInt(AH, 0); + return cnt; } - /* * Clone format-specific fields during parallel restoration. */ @@ -1114,12 +904,9 @@ _Clone(ArchiveHandle *AH) memcpy(AH->formatData, ctx, sizeof(lclContext)); ctx = (lclContext *) AH->formatData; - ctx->zp = (z_streamp) malloc(sizeof(z_stream)); - ctx->zlibOut = (char *) malloc(zlibOutSize + 1); - ctx->zlibIn = (char *) malloc(ctx->inSize); - - if (ctx->zp == NULL || ctx->zlibOut == NULL || ctx->zlibIn == NULL) - die_horribly(AH, modulename, "out of memory\n"); + /* sanity check, shouldn't happen */ + if (ctx->cs != NULL) + die_horribly(AH, modulename, "compressor active\n"); /* * Note: we do not make a local lo_buf because we expect at most one BLOBS @@ -1133,9 +920,5 @@ static void _DeClone(ArchiveHandle *AH) { lclContext *ctx = (lclContext *) AH->formatData; - - free(ctx->zlibOut); - free(ctx->zlibIn); - free(ctx->zp); free(ctx); } |