about | summary | refs | log | tree | commit | diff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
author: Amit Kapila <akapila@postgresql.org> 2021-09-02 08:13:46 +0530
committer: Amit Kapila <akapila@postgresql.org> 2021-09-02 08:13:46 +0530
commit: 31c389d8de915b705ee06c7a6e9246e20f36b9dc (patch)
tree: d90d353d2b81389163684cacadc780bf90dbeeec /src/backend/replication/logical/worker.c
parent: 163074ea84efec6ffa4813db43cc956ac5d12565 (diff)
download: postgresql-31c389d8de915b705ee06c7a6e9246e20f36b9dc.tar.gz
download: postgresql-31c389d8de915b705ee06c7a6e9246e20f36b9dc.zip
Optimize fileset usage in apply worker.
Use one fileset for the entire worker lifetime instead of using separate filesets for each streaming transaction. Now, the changes/subxacts files for every streaming transaction will be created under the same fileset and the files will be deleted after the transaction is completed.

This patch extends the BufFileOpenFileSet and BufFileDeleteFileSet APIs to allow users to specify whether to give an error on missing files.

Author: Dilip Kumar, based on suggestion by Thomas Munro
Reviewed-by: Hou Zhijie, Masahiko Sawada, Amit Kapila
Discussion: https://postgr.es/m/E1mCC6U-0004Ik-Fs@gemulon.postgresql.org
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- src/backend/replication/logical/worker.c 257
1 files changed, 50 insertions, 207 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index bfb7d1a261c..8d96c926b4f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -236,20 +236,6 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
.ts = 0,
};
-/*
- * Stream xid hash entry. Whenever we see a new xid we create this entry in the
- * xidhash and along with it create the streaming file and store the fileset handle.
- * The subxact file is created iff there is any subxact info under this xid. This
- * entry is used on the subsequent streams for the xid to get the corresponding
- * fileset handles, so storing them in hash makes the search faster.
- */
-typedef struct StreamXidHash
-{
- TransactionId xid; /* xid is the hash key and must be first */
- FileSet *stream_fileset; /* file set for stream data */
- FileSet *subxact_fileset; /* file set for subxact info */
-} StreamXidHash;
-
static MemoryContext ApplyMessageContext = NULL;
MemoryContext ApplyContext = NULL;
@@ -269,12 +255,6 @@ static bool in_streamed_transaction = false;
static TransactionId stream_xid = InvalidTransactionId;
-/*
- * Hash table for storing the streaming xid information along with filesets
- * for streaming and subxact files.
- */
-static HTAB *xidhash = NULL;
-
/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;
@@ -1118,7 +1098,6 @@ static void
apply_handle_stream_start(StringInfo s)
{
bool first_segment;
- HASHCTL hash_ctl;
if (in_streamed_transaction)
ereport(ERROR,
@@ -1148,17 +1127,23 @@ apply_handle_stream_start(StringInfo s)
set_apply_error_context_xact(stream_xid, 0);
/*
- * Initialize the xidhash table if we haven't yet. This will be used for
- * the entire duration of the apply worker so create it in permanent
- * context.
+ * Initialize the worker's stream_fileset if we haven't yet. This will be
+ * used for the entire duration of the worker so create it in a permanent
+ * context. We create this on the very first streaming message from any
+ * transaction and then use it for this and other streaming transactions.
+ * Now, we could create a fileset at the start of the worker as well but
+ * then we won't be sure that it will ever be used.
*/
- if (xidhash == NULL)
+ if (MyLogicalRepWorker->stream_fileset == NULL)
{
- hash_ctl.keysize = sizeof(TransactionId);
- hash_ctl.entrysize = sizeof(StreamXidHash);
- hash_ctl.hcxt = ApplyContext;
- xidhash = hash_create("StreamXidHash", 1024, &hash_ctl,
- HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ MemoryContext oldctx;
+
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+
+ MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
+ FileSetInit(MyLogicalRepWorker->stream_fileset);
+
+ MemoryContextSwitchTo(oldctx);
}
/* open the spool file for this transaction */
@@ -1253,7 +1238,6 @@ apply_handle_stream_abort(StringInfo s)
BufFile *fd;
bool found = false;
char path[MAXPGPATH];
- StreamXidHash *ent;
set_apply_error_context_xact(subxid, 0);
@@ -1285,19 +1269,10 @@ apply_handle_stream_abort(StringInfo s)
return;
}
- ent = (StreamXidHash *) hash_search(xidhash,
- (void *) &xid,
- HASH_FIND,
- NULL);
- if (!ent)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("transaction %u not found in stream XID hash table",
- xid)));
-
/* open the changes file */
changes_filename(path, MyLogicalRepWorker->subid, xid);
- fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
+ fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
+ O_RDWR, false);
/* OK, truncate the file at the right offset */
BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
@@ -1327,7 +1302,6 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
int nchanges;
char path[MAXPGPATH];
char *buffer = NULL;
- StreamXidHash *ent;
MemoryContext oldcxt;
BufFile *fd;
@@ -1345,17 +1319,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
changes_filename(path, MyLogicalRepWorker->subid, xid);
elog(DEBUG1, "replaying changes from file \"%s\"", path);
- ent = (StreamXidHash *) hash_search(xidhash,
- (void *) &xid,
- HASH_FIND,
- NULL);
- if (!ent)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("transaction %u not found in stream XID hash table",
- xid)));
-
- fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDONLY);
+ fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
+ false);
buffer = palloc(BLCKSZ);
initStringInfo(&s2);
@@ -2542,30 +2507,6 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
}
/*
- * Cleanup filesets.
- */
-void
-logicalrep_worker_cleanupfileset(void)
-{
- HASH_SEQ_STATUS status;
- StreamXidHash *hentry;
-
- /* Remove all the pending stream and subxact filesets. */
- if (xidhash)
- {
- hash_seq_init(&status, xidhash);
- while ((hentry = (StreamXidHash *) hash_seq_search(&status)) != NULL)
- {
- FileSetDeleteAll(hentry->stream_fileset);
-
- /* Delete the subxact fileset iff it is created. */
- if (hentry->subxact_fileset)
- FileSetDeleteAll(hentry->subxact_fileset);
- }
- }
-}
-
-/*
* Apply main loop.
*/
static void
@@ -3026,58 +2967,30 @@ subxact_info_write(Oid subid, TransactionId xid)
{
char path[MAXPGPATH];
Size len;
- StreamXidHash *ent;
BufFile *fd;
Assert(TransactionIdIsValid(xid));
- /* Find the xid entry in the xidhash */
- ent = (StreamXidHash *) hash_search(xidhash,
- (void *) &xid,
- HASH_FIND,
- NULL);
- /* By this time we must have created the transaction entry */
- Assert(ent);
+ /* construct the subxact filename */
+ subxact_filename(path, subid, xid);
- /*
- * If there is no subtransaction then nothing to do, but if already have
- * subxact file then delete that.
- */
+ /* Delete the subxacts file, if exists. */
if (subxact_data.nsubxacts == 0)
{
- if (ent->subxact_fileset)
- {
- cleanup_subxact_info();
- FileSetDeleteAll(ent->subxact_fileset);
- pfree(ent->subxact_fileset);
- ent->subxact_fileset = NULL;
- }
+ cleanup_subxact_info();
+ BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
+
return;
}
- subxact_filename(path, subid, xid);
-
/*
* Create the subxact file if it is not already created, otherwise open the
* existing file.
*/
- if (ent->subxact_fileset == NULL)
- {
- MemoryContext oldctx;
-
- /*
- * We need to maintain fileset across multiple stream start/stop
- * calls. So, need to allocate it in a persistent context.
- */
- oldctx = MemoryContextSwitchTo(ApplyContext);
- ent->subxact_fileset = palloc(sizeof(FileSet));
- FileSetInit(ent->subxact_fileset);
- MemoryContextSwitchTo(oldctx);
-
- fd = BufFileCreateFileSet(ent->subxact_fileset, path);
- }
- else
- fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDWR);
+ fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
+ true);
+ if (fd == NULL)
+ fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
@@ -3104,34 +3017,21 @@ subxact_info_read(Oid subid, TransactionId xid)
char path[MAXPGPATH];
Size len;
BufFile *fd;
- StreamXidHash *ent;
MemoryContext oldctx;
Assert(!subxact_data.subxacts);
Assert(subxact_data.nsubxacts == 0);
Assert(subxact_data.nsubxacts_max == 0);
- /* Find the stream xid entry in the xidhash */
- ent = (StreamXidHash *) hash_search(xidhash,
- (void *) &xid,
- HASH_FIND,
- NULL);
- if (!ent)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("transaction %u not found in stream XID hash table",
- xid)));
-
/*
- * If subxact_fileset is not valid that mean we don't have any subxact
- * info
+ * If the subxact file doesn't exist that means we don't have any subxact
+ * info.
*/
- if (ent->subxact_fileset == NULL)
- return;
-
subxact_filename(path, subid, xid);
-
- fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDONLY);
+ fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
+ true);
+ if (fd == NULL)
+ return;
/* read number of subxact items */
if (BufFileRead(fd, &subxact_data.nsubxacts,
@@ -3267,42 +3167,21 @@ changes_filename(char *path, Oid subid, TransactionId xid)
* Cleanup files for a subscription / toplevel transaction.
*
* Remove files with serialized changes and subxact info for a particular
- * toplevel transaction. Each subscription has a separate set of files.
+ * toplevel transaction. Each subscription has a separate set of files
+ * for any toplevel transaction.
*/
static void
stream_cleanup_files(Oid subid, TransactionId xid)
{
char path[MAXPGPATH];
- StreamXidHash *ent;
-
- /* Find the xid entry in the xidhash */
- ent = (StreamXidHash *) hash_search(xidhash,
- (void *) &xid,
- HASH_FIND,
- NULL);
- if (!ent)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("transaction %u not found in stream XID hash table",
- xid)));
- /* Delete the change file and release the stream fileset memory */
+ /* Delete the changes file. */
changes_filename(path, subid, xid);
- FileSetDeleteAll(ent->stream_fileset);
- pfree(ent->stream_fileset);
- ent->stream_fileset = NULL;
+ BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
- /* Delete the subxact file and release the memory, if it exist */
- if (ent->subxact_fileset)
- {
- subxact_filename(path, subid, xid);
- FileSetDeleteAll(ent->subxact_fileset);
- pfree(ent->subxact_fileset);
- ent->subxact_fileset = NULL;
- }
-
- /* Remove the xid entry from the stream xid hash */
- hash_search(xidhash, (void *) &xid, HASH_REMOVE, NULL);
+ /* Delete the subxact file, if it exists. */
+ subxact_filename(path, subid, xid);
+ BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
}
/*
@@ -3312,8 +3191,8 @@ stream_cleanup_files(Oid subid, TransactionId xid)
*
* Open a file for streamed changes from a toplevel transaction identified
* by stream_xid (global variable). If it's the first chunk of streamed
- * changes for this transaction, initialize the fileset and create the buffile,
- * otherwise open the previously created file.
+ * changes for this transaction, create the buffile, otherwise open the
+ * previously created file.
*
* This can only be called at the beginning of a "streaming" block, i.e.
* between stream_start/stream_stop messages from the upstream.
@@ -3322,20 +3201,13 @@ static void
stream_open_file(Oid subid, TransactionId xid, bool first_segment)
{
char path[MAXPGPATH];
- bool found;
MemoryContext oldcxt;
- StreamXidHash *ent;
Assert(in_streamed_transaction);
Assert(OidIsValid(subid));
Assert(TransactionIdIsValid(xid));
Assert(stream_fd == NULL);
- /* create or find the xid entry in the xidhash */
- ent = (StreamXidHash *) hash_search(xidhash,
- (void *) &xid,
- HASH_ENTER,
- &found);
changes_filename(path, subid, xid);
elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
@@ -3347,49 +3219,20 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
/*
- * If this is the first streamed segment, the file must not exist, so make
- * sure we're the ones creating it. Otherwise just open the file for
- * writing, in append mode.
+ * If this is the first streamed segment, create the changes file.
+ * Otherwise, just open the file for writing, in append mode.
*/
if (first_segment)
- {
- MemoryContext savectx;
- FileSet *fileset;
-
- if (found)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
-
- /*
- * We need to maintain fileset across multiple stream start/stop
- * calls. So, need to allocate it in a persistent context.
- */
- savectx = MemoryContextSwitchTo(ApplyContext);
- fileset = palloc(sizeof(FileSet));
-
- FileSetInit(fileset);
- MemoryContextSwitchTo(savectx);
-
- stream_fd = BufFileCreateFileSet(fileset, path);
-
- /* Remember the fileset for the next stream of the same transaction */
- ent->xid = xid;
- ent->stream_fileset = fileset;
- ent->subxact_fileset = NULL;
- }
+ stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
+ path);
else
{
- if (!found)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
-
/*
* Open the file and seek to the end of the file because we always
* append the changes file.
*/
- stream_fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
+ stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
+ path, O_RDWR, false);
BufFileSeek(stream_fd, 0, 0, SEEK_END);
}