aboutsummaryrefslogtreecommitdiff
path: root/src/fe_utils/astreamer_zstd.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fe_utils/astreamer_zstd.c')
-rw-r--r--src/fe_utils/astreamer_zstd.c368
1 files changed, 368 insertions, 0 deletions
diff --git a/src/fe_utils/astreamer_zstd.c b/src/fe_utils/astreamer_zstd.c
new file mode 100644
index 00000000000..45f6cb67363
--- /dev/null
+++ b/src/fe_utils/astreamer_zstd.c
@@ -0,0 +1,368 @@
+/*-------------------------------------------------------------------------
+ *
+ * astreamer_zstd.c
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/astreamer_zstd.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <unistd.h>
+
+#ifdef USE_ZSTD
+#include <zstd.h>
+#endif
+
+#include "common/logging.h"
+#include "fe_utils/astreamer.h"
+
+#ifdef USE_ZSTD
+
+typedef struct astreamer_zstd_frame
+{
+ astreamer base;
+
+ ZSTD_CCtx *cctx;
+ ZSTD_DCtx *dctx;
+ ZSTD_outBuffer zstd_outBuf;
+} astreamer_zstd_frame;
+
+static void astreamer_zstd_compressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_zstd_compressor_finalize(astreamer *streamer);
+static void astreamer_zstd_compressor_free(astreamer *streamer);
+
+static const astreamer_ops astreamer_zstd_compressor_ops = {
+ .content = astreamer_zstd_compressor_content,
+ .finalize = astreamer_zstd_compressor_finalize,
+ .free = astreamer_zstd_compressor_free
+};
+
+static void astreamer_zstd_decompressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context);
+static void astreamer_zstd_decompressor_finalize(astreamer *streamer);
+static void astreamer_zstd_decompressor_free(astreamer *streamer);
+
+static const astreamer_ops astreamer_zstd_decompressor_ops = {
+ .content = astreamer_zstd_decompressor_content,
+ .finalize = astreamer_zstd_decompressor_finalize,
+ .free = astreamer_zstd_decompressor_free
+};
+#endif
+
+/*
+ * Create a new base backup streamer that performs zstd compression of tar
+ * blocks.
+ */
+astreamer *
+astreamer_zstd_compressor_new(astreamer *next, pg_compress_specification *compress)
+{
+#ifdef USE_ZSTD
+ astreamer_zstd_frame *streamer;
+ size_t ret;
+
+ Assert(next != NULL);
+
+ streamer = palloc0(sizeof(astreamer_zstd_frame));
+
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_zstd_compressor_ops;
+
+ streamer->base.bbs_next = next;
+ initStringInfo(&streamer->base.bbs_buffer);
+ enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
+
+ streamer->cctx = ZSTD_createCCtx();
+ if (!streamer->cctx)
+ pg_fatal("could not create zstd compression context");
+
+ /* Set compression level */
+ ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
+ compress->level);
+ if (ZSTD_isError(ret))
+ pg_fatal("could not set zstd compression level to %d: %s",
+ compress->level, ZSTD_getErrorName(ret));
+
+ /* Set # of workers, if specified */
+ if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
+ {
+ /*
+ * On older versions of libzstd, this option does not exist, and
+ * trying to set it will fail. Similarly for newer versions if they
+ * are compiled without threading support.
+ */
+ ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
+ compress->workers);
+ if (ZSTD_isError(ret))
+ pg_fatal("could not set compression worker count to %d: %s",
+ compress->workers, ZSTD_getErrorName(ret));
+ }
+
+ if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
+ {
+ ret = ZSTD_CCtx_setParameter(streamer->cctx,
+ ZSTD_c_enableLongDistanceMatching,
+ compress->long_distance);
+ if (ZSTD_isError(ret))
+ {
+ pg_log_error("could not enable long-distance mode: %s",
+ ZSTD_getErrorName(ret));
+ exit(1);
+ }
+ }
+
+ /* Initialize the ZSTD output buffer. */
+ streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
+ streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
+ streamer->zstd_outBuf.pos = 0;
+
+ return &streamer->base;
+#else
+ pg_fatal("this build does not support compression with %s", "ZSTD");
+ return NULL; /* keep compiler quiet */
+#endif
+}
+
+#ifdef USE_ZSTD
+/*
+ * Compress the input data to output buffer.
+ *
+ * Find out the compression bound based on input data length for each
+ * invocation to make sure that output buffer has enough capacity to
+ * accommodate the compressed data. In case if the output buffer
+ * capacity falls short of compression bound then forward the content
+ * of output buffer to next streamer and empty the buffer.
+ */
+static void
+astreamer_zstd_compressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
+{
+ astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
+ ZSTD_inBuffer inBuf = {data, len, 0};
+
+ while (inBuf.pos < inBuf.size)
+ {
+ size_t yet_to_flush;
+ size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
+
+ /*
+ * If the output buffer is not left with enough space, send the
+ * compressed bytes to the next streamer, and empty the buffer.
+ */
+ if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
+ max_needed)
+ {
+ astreamer_content(mystreamer->base.bbs_next, member,
+ mystreamer->zstd_outBuf.dst,
+ mystreamer->zstd_outBuf.pos,
+ context);
+
+ /* Reset the ZSTD output buffer. */
+ mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
+ mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
+ mystreamer->zstd_outBuf.pos = 0;
+ }
+
+ yet_to_flush =
+ ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
+ &inBuf, ZSTD_e_continue);
+
+ if (ZSTD_isError(yet_to_flush))
+ pg_log_error("could not compress data: %s",
+ ZSTD_getErrorName(yet_to_flush));
+ }
+}
+
+/*
+ * End-of-stream processing.
+ */
+static void
+astreamer_zstd_compressor_finalize(astreamer *streamer)
+{
+ astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
+ size_t yet_to_flush;
+
+ do
+ {
+ ZSTD_inBuffer in = {NULL, 0, 0};
+ size_t max_needed = ZSTD_compressBound(0);
+
+ /*
+ * If the output buffer is not left with enough space, send the
+ * compressed bytes to the next streamer, and empty the buffer.
+ */
+ if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
+ max_needed)
+ {
+ astreamer_content(mystreamer->base.bbs_next, NULL,
+ mystreamer->zstd_outBuf.dst,
+ mystreamer->zstd_outBuf.pos,
+ ASTREAMER_UNKNOWN);
+
+ /* Reset the ZSTD output buffer. */
+ mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
+ mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
+ mystreamer->zstd_outBuf.pos = 0;
+ }
+
+ yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
+ &mystreamer->zstd_outBuf,
+ &in, ZSTD_e_end);
+
+ if (ZSTD_isError(yet_to_flush))
+ pg_log_error("could not compress data: %s",
+ ZSTD_getErrorName(yet_to_flush));
+
+ } while (yet_to_flush > 0);
+
+ /* Make sure to pass any remaining bytes to the next streamer. */
+ if (mystreamer->zstd_outBuf.pos > 0)
+ astreamer_content(mystreamer->base.bbs_next, NULL,
+ mystreamer->zstd_outBuf.dst,
+ mystreamer->zstd_outBuf.pos,
+ ASTREAMER_UNKNOWN);
+
+ astreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Free memory.
+ */
+static void
+astreamer_zstd_compressor_free(astreamer *streamer)
+{
+ astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
+
+ astreamer_free(streamer->bbs_next);
+ ZSTD_freeCCtx(mystreamer->cctx);
+ pfree(streamer->bbs_buffer.data);
+ pfree(streamer);
+}
+#endif
+
+/*
+ * Create a new base backup streamer that performs decompression of zstd
+ * compressed blocks.
+ */
+astreamer *
+astreamer_zstd_decompressor_new(astreamer *next)
+{
+#ifdef USE_ZSTD
+ astreamer_zstd_frame *streamer;
+
+ Assert(next != NULL);
+
+ streamer = palloc0(sizeof(astreamer_zstd_frame));
+ *((const astreamer_ops **) &streamer->base.bbs_ops) =
+ &astreamer_zstd_decompressor_ops;
+
+ streamer->base.bbs_next = next;
+ initStringInfo(&streamer->base.bbs_buffer);
+ enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
+
+ streamer->dctx = ZSTD_createDCtx();
+ if (!streamer->dctx)
+ pg_fatal("could not create zstd decompression context");
+
+ /* Initialize the ZSTD output buffer. */
+ streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
+ streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
+ streamer->zstd_outBuf.pos = 0;
+
+ return &streamer->base;
+#else
+ pg_fatal("this build does not support compression with %s", "ZSTD");
+ return NULL; /* keep compiler quiet */
+#endif
+}
+
+#ifdef USE_ZSTD
+/*
+ * Decompress the input data to output buffer until we run out of input
+ * data. Each time the output buffer is full, pass on the decompressed data
+ * to the next streamer.
+ */
+static void
+astreamer_zstd_decompressor_content(astreamer *streamer,
+ astreamer_member *member,
+ const char *data, int len,
+ astreamer_archive_context context)
+{
+ astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
+ ZSTD_inBuffer inBuf = {data, len, 0};
+
+ while (inBuf.pos < inBuf.size)
+ {
+ size_t ret;
+
+ /*
+ * If output buffer is full then forward the content to next streamer
+ * and update the output buffer.
+ */
+ if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
+ {
+ astreamer_content(mystreamer->base.bbs_next, member,
+ mystreamer->zstd_outBuf.dst,
+ mystreamer->zstd_outBuf.pos,
+ context);
+
+ /* Reset the ZSTD output buffer. */
+ mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
+ mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
+ mystreamer->zstd_outBuf.pos = 0;
+ }
+
+ ret = ZSTD_decompressStream(mystreamer->dctx,
+ &mystreamer->zstd_outBuf, &inBuf);
+
+ if (ZSTD_isError(ret))
+ pg_log_error("could not decompress data: %s",
+ ZSTD_getErrorName(ret));
+ }
+}
+
+/*
+ * End-of-stream processing.
+ */
+static void
+astreamer_zstd_decompressor_finalize(astreamer *streamer)
+{
+ astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
+
+ /*
+ * End of the stream, if there is some pending data in output buffers then
+ * we must forward it to next streamer.
+ */
+ if (mystreamer->zstd_outBuf.pos > 0)
+ astreamer_content(mystreamer->base.bbs_next, NULL,
+ mystreamer->base.bbs_buffer.data,
+ mystreamer->base.bbs_buffer.maxlen,
+ ASTREAMER_UNKNOWN);
+
+ astreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Free memory.
+ */
+static void
+astreamer_zstd_decompressor_free(astreamer *streamer)
+{
+ astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
+
+ astreamer_free(streamer->bbs_next);
+ ZSTD_freeDCtx(mystreamer->dctx);
+ pfree(streamer->bbs_buffer.data);
+ pfree(streamer);
+}
+#endif