diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 257 |
1 files changed, 50 insertions, 207 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index bfb7d1a261c..8d96c926b4f 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -236,20 +236,6 @@ static ApplyErrorCallbackArg apply_error_callback_arg = .ts = 0, }; -/* - * Stream xid hash entry. Whenever we see a new xid we create this entry in the - * xidhash and along with it create the streaming file and store the fileset handle. - * The subxact file is created iff there is any subxact info under this xid. This - * entry is used on the subsequent streams for the xid to get the corresponding - * fileset handles, so storing them in hash makes the search faster. - */ -typedef struct StreamXidHash -{ - TransactionId xid; /* xid is the hash key and must be first */ - FileSet *stream_fileset; /* file set for stream data */ - FileSet *subxact_fileset; /* file set for subxact info */ -} StreamXidHash; - static MemoryContext ApplyMessageContext = NULL; MemoryContext ApplyContext = NULL; @@ -269,12 +255,6 @@ static bool in_streamed_transaction = false; static TransactionId stream_xid = InvalidTransactionId; -/* - * Hash table for storing the streaming xid information along with filesets - * for streaming and subxact files. - */ -static HTAB *xidhash = NULL; - /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; @@ -1118,7 +1098,6 @@ static void apply_handle_stream_start(StringInfo s) { bool first_segment; - HASHCTL hash_ctl; if (in_streamed_transaction) ereport(ERROR, @@ -1148,17 +1127,23 @@ apply_handle_stream_start(StringInfo s) set_apply_error_context_xact(stream_xid, 0); /* - * Initialize the xidhash table if we haven't yet. This will be used for - * the entire duration of the apply worker so create it in permanent - * context. + * Initialize the worker's stream_fileset if we haven't yet. This will be + * used for the entire duration of the worker so create it in a permanent + * context. We create this on the very first streaming message from any + * transaction and then use it for this and other streaming transactions. + * Now, we could create a fileset at the start of the worker as well but + * then we won't be sure that it will ever be used. */ - if (xidhash == NULL) + if (MyLogicalRepWorker->stream_fileset == NULL) { - hash_ctl.keysize = sizeof(TransactionId); - hash_ctl.entrysize = sizeof(StreamXidHash); - hash_ctl.hcxt = ApplyContext; - xidhash = hash_create("StreamXidHash", 1024, &hash_ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + MemoryContext oldctx; + + oldctx = MemoryContextSwitchTo(ApplyContext); + + MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet)); + FileSetInit(MyLogicalRepWorker->stream_fileset); + + MemoryContextSwitchTo(oldctx); } /* open the spool file for this transaction */ @@ -1253,7 +1238,6 @@ apply_handle_stream_abort(StringInfo s) BufFile *fd; bool found = false; char path[MAXPGPATH]; - StreamXidHash *ent; set_apply_error_context_xact(subxid, 0); @@ -1285,19 +1269,10 @@ apply_handle_stream_abort(StringInfo s) return; } - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - if (!ent) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("transaction %u not found in stream XID hash table", - xid))); - /* open the changes file */ changes_filename(path, MyLogicalRepWorker->subid, xid); - fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR); + fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, + O_RDWR, false); /* OK, truncate the file at the right offset */ BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno, @@ -1327,7 +1302,6 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) int nchanges; char path[MAXPGPATH]; char *buffer = NULL; - StreamXidHash *ent; MemoryContext oldcxt; BufFile *fd; @@ -1345,17 +1319,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) changes_filename(path, MyLogicalRepWorker->subid, xid); elog(DEBUG1, "replaying changes from file \"%s\"", path); - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - if (!ent) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("transaction %u not found in stream XID hash table", - xid))); - - fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDONLY); + fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY, + false); buffer = palloc(BLCKSZ); initStringInfo(&s2); @@ -2542,30 +2507,6 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) } /* - * Cleanup filesets. - */ -void -logicalrep_worker_cleanupfileset(void) -{ - HASH_SEQ_STATUS status; - StreamXidHash *hentry; - - /* Remove all the pending stream and subxact filesets. */ - if (xidhash) - { - hash_seq_init(&status, xidhash); - while ((hentry = (StreamXidHash *) hash_seq_search(&status)) != NULL) - { - FileSetDeleteAll(hentry->stream_fileset); - - /* Delete the subxact fileset iff it is created. */ - if (hentry->subxact_fileset) - FileSetDeleteAll(hentry->subxact_fileset); - } - } -} - -/* * Apply main loop. */ static void @@ -3026,58 +2967,30 @@ subxact_info_write(Oid subid, TransactionId xid) { char path[MAXPGPATH]; Size len; - StreamXidHash *ent; BufFile *fd; Assert(TransactionIdIsValid(xid)); - /* Find the xid entry in the xidhash */ - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - /* By this time we must have created the transaction entry */ - Assert(ent); + /* construct the subxact filename */ + subxact_filename(path, subid, xid); - /* - * If there is no subtransaction then nothing to do, but if already have - * subxact file then delete that. - */ + /* Delete the subxacts file, if exists. */ if (subxact_data.nsubxacts == 0) { - if (ent->subxact_fileset) - { - cleanup_subxact_info(); - FileSetDeleteAll(ent->subxact_fileset); - pfree(ent->subxact_fileset); - ent->subxact_fileset = NULL; - } + cleanup_subxact_info(); + BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true); + return; } - subxact_filename(path, subid, xid); - /* * Create the subxact file if it not already created, otherwise open the * existing file. */ - if (ent->subxact_fileset == NULL) - { - MemoryContext oldctx; - - /* - * We need to maintain fileset across multiple stream start/stop - * calls. So, need to allocate it in a persistent context. - */ - oldctx = MemoryContextSwitchTo(ApplyContext); - ent->subxact_fileset = palloc(sizeof(FileSet)); - FileSetInit(ent->subxact_fileset); - MemoryContextSwitchTo(oldctx); - - fd = BufFileCreateFileSet(ent->subxact_fileset, path); - } - else - fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDWR); + fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR, + true); + if (fd == NULL) + fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path); len = sizeof(SubXactInfo) * subxact_data.nsubxacts; @@ -3104,34 +3017,21 @@ subxact_info_read(Oid subid, TransactionId xid) char path[MAXPGPATH]; Size len; BufFile *fd; - StreamXidHash *ent; MemoryContext oldctx; Assert(!subxact_data.subxacts); Assert(subxact_data.nsubxacts == 0); Assert(subxact_data.nsubxacts_max == 0); - /* Find the stream xid entry in the xidhash */ - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - if (!ent) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("transaction %u not found in stream XID hash table", - xid))); - /* - * If subxact_fileset is not valid that mean we don't have any subxact - * info + * If the subxact file doesn't exist that means we don't have any subxact + * info. */ - if (ent->subxact_fileset == NULL) - return; - subxact_filename(path, subid, xid); - - fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDONLY); + fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY, + true); + if (fd == NULL) + return; /* read number of subxact items */ if (BufFileRead(fd, &subxact_data.nsubxacts, @@ -3267,42 +3167,21 @@ changes_filename(char *path, Oid subid, TransactionId xid) * Cleanup files for a subscription / toplevel transaction. * * Remove files with serialized changes and subxact info for a particular - * toplevel transaction. Each subscription has a separate set of files. + * toplevel transaction. Each subscription has a separate set of files + * for any toplevel transaction. */ static void stream_cleanup_files(Oid subid, TransactionId xid) { char path[MAXPGPATH]; - StreamXidHash *ent; - - /* Find the xid entry in the xidhash */ - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_FIND, - NULL); - if (!ent) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("transaction %u not found in stream XID hash table", - xid))); - /* Delete the change file and release the stream fileset memory */ + /* Delete the changes file. */ changes_filename(path, subid, xid); - FileSetDeleteAll(ent->stream_fileset); - pfree(ent->stream_fileset); - ent->stream_fileset = NULL; + BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false); - /* Delete the subxact file and release the memory, if it exist */ - if (ent->subxact_fileset) - { - subxact_filename(path, subid, xid); - FileSetDeleteAll(ent->subxact_fileset); - pfree(ent->subxact_fileset); - ent->subxact_fileset = NULL; - } - - /* Remove the xid entry from the stream xid hash */ - hash_search(xidhash, (void *) &xid, HASH_REMOVE, NULL); + /* Delete the subxact file, if it exists. */ + subxact_filename(path, subid, xid); + BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true); } /* @@ -3312,8 +3191,8 @@ stream_cleanup_files(Oid subid, TransactionId xid) * * Open a file for streamed changes from a toplevel transaction identified * by stream_xid (global variable). If it's the first chunk of streamed - * changes for this transaction, initialize the fileset and create the buffile, - * otherwise open the previously created file. + * changes for this transaction, create the buffile, otherwise open the + * previously created file. * * This can only be called at the beginning of a "streaming" block, i.e. * between stream_start/stream_stop messages from the upstream. @@ -3322,20 +3201,13 @@ static void stream_open_file(Oid subid, TransactionId xid, bool first_segment) { char path[MAXPGPATH]; - bool found; MemoryContext oldcxt; - StreamXidHash *ent; Assert(in_streamed_transaction); Assert(OidIsValid(subid)); Assert(TransactionIdIsValid(xid)); Assert(stream_fd == NULL); - /* create or find the xid entry in the xidhash */ - ent = (StreamXidHash *) hash_search(xidhash, - (void *) &xid, - HASH_ENTER, - &found); changes_filename(path, subid, xid); elog(DEBUG1, "opening file \"%s\" for streamed changes", path); @@ -3347,49 +3219,20 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment) oldcxt = MemoryContextSwitchTo(LogicalStreamingContext); /* - * If this is the first streamed segment, the file must not exist, so make - * sure we're the ones creating it. Otherwise just open the file for - * writing, in append mode. + * If this is the first streamed segment, create the changes file. + * Otherwise, just open the file for writing, in append mode. */ if (first_segment) - { - MemoryContext savectx; - FileSet *fileset; - - if (found) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("incorrect first-segment flag for streamed replication transaction"))); - - /* - * We need to maintain fileset across multiple stream start/stop - * calls. So, need to allocate it in a persistent context. - */ - savectx = MemoryContextSwitchTo(ApplyContext); - fileset = palloc(sizeof(FileSet)); - - FileSetInit(fileset); - MemoryContextSwitchTo(savectx); - - stream_fd = BufFileCreateFileSet(fileset, path); - - /* Remember the fileset for the next stream of the same transaction */ - ent->xid = xid; - ent->stream_fileset = fileset; - ent->subxact_fileset = NULL; - } + stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, + path); else { - if (!found) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("incorrect first-segment flag for streamed replication transaction"))); - /* * Open the file and seek to the end of the file because we always * append the changes file. */ - stream_fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR); + stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, + path, O_RDWR, false); BufFileSeek(stream_fd, 0, 0, SEEK_END); } |