aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/launcher.c6
-rw-r--r--src/backend/replication/logical/worker.c257
-rw-r--r--src/backend/storage/file/buffile.c22
-rw-r--r--src/backend/utils/sort/logtape.c2
-rw-r--r--src/backend/utils/sort/sharedtuplestore.c3
-rw-r--r--src/include/replication/worker_internal.h10
-rw-r--r--src/include/storage/buffile.h5
7 files changed, 86 insertions, 219 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 8b1772db69e..3fb4caa8033 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -379,6 +379,7 @@ retry:
worker->relid = relid;
worker->relstate = SUBREL_STATE_UNKNOWN;
worker->relstate_lsn = InvalidXLogRecPtr;
+ worker->stream_fileset = NULL;
worker->last_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->last_send_time);
TIMESTAMP_NOBEGIN(worker->last_recv_time);
@@ -648,8 +649,9 @@ logicalrep_worker_onexit(int code, Datum arg)
logicalrep_worker_detach();
- /* Cleanup filesets used for streaming transactions. */
- logicalrep_worker_cleanupfileset();
+ /* Cleanup fileset used for streaming transactions. */
+ if (MyLogicalRepWorker->stream_fileset != NULL)
+ FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
ApplyLauncherWakeup();
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index bfb7d1a261c..8d96c926b4f 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -236,20 +236,6 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
.ts = 0,
};
-/*
- * Stream xid hash entry. Whenever we see a new xid we create this entry in the
- * xidhash and along with it create the streaming file and store the fileset handle.
- * The subxact file is created iff there is any subxact info under this xid. This
- * entry is used on the subsequent streams for the xid to get the corresponding
- * fileset handles, so storing them in hash makes the search faster.
- */
-typedef struct StreamXidHash
-{
- TransactionId xid; /* xid is the hash key and must be first */
- FileSet *stream_fileset; /* file set for stream data */
- FileSet *subxact_fileset; /* file set for subxact info */
-} StreamXidHash;
-
static MemoryContext ApplyMessageContext = NULL;
MemoryContext ApplyContext = NULL;
@@ -269,12 +255,6 @@ static bool in_streamed_transaction = false;
static TransactionId stream_xid = InvalidTransactionId;
-/*
- * Hash table for storing the streaming xid information along with filesets
- * for streaming and subxact files.
- */
-static HTAB *xidhash = NULL;
-
/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;
@@ -1118,7 +1098,6 @@ static void
apply_handle_stream_start(StringInfo s)
{
bool first_segment;
- HASHCTL hash_ctl;
if (in_streamed_transaction)
ereport(ERROR,
@@ -1148,17 +1127,23 @@ apply_handle_stream_start(StringInfo s)
set_apply_error_context_xact(stream_xid, 0);
/*
- * Initialize the xidhash table if we haven't yet. This will be used for
- * the entire duration of the apply worker so create it in permanent
- * context.
+ * Initialize the worker's stream_fileset if we haven't yet. This will be
+ * used for the entire duration of the worker so create it in a permanent
+ * context. We create this on the very first streaming message from any
+ * transaction and then use it for this and other streaming transactions.
+ * Now, we could create a fileset at the start of the worker as well but
+ * then we won't be sure that it will ever be used.
*/
- if (xidhash == NULL)
+ if (MyLogicalRepWorker->stream_fileset == NULL)
{
- hash_ctl.keysize = sizeof(TransactionId);
- hash_ctl.entrysize = sizeof(StreamXidHash);
- hash_ctl.hcxt = ApplyContext;
- xidhash = hash_create("StreamXidHash", 1024, &hash_ctl,
- HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ MemoryContext oldctx;
+
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+
+ MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
+ FileSetInit(MyLogicalRepWorker->stream_fileset);
+
+ MemoryContextSwitchTo(oldctx);
}
/* open the spool file for this transaction */
@@ -1253,7 +1238,6 @@ apply_handle_stream_abort(StringInfo s)
BufFile *fd;
bool found = false;
char path[MAXPGPATH];
- StreamXidHash *ent;
set_apply_error_context_xact(subxid, 0);
@@ -1285,19 +1269,10 @@ apply_handle_stream_abort(StringInfo s)
return;
}
- ent = (StreamXidHash *) hash_search(xidhash,
- (void *) &xid,
- HASH_FIND,
- NULL);
- if (!ent)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("transaction %u not found in stream XID hash table",
- xid)));
-
/* open the changes file */
changes_filename(path, MyLogicalRepWorker->subid, xid);
- fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
+ fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
+ O_RDWR, false);
/* OK, truncate the file at the right offset */
BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
@@ -1327,7 +1302,6 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
int nchanges;
char path[MAXPGPATH];
char *buffer = NULL;
- StreamXidHash *ent;
MemoryContext oldcxt;
BufFile *fd;
@@ -1345,17 +1319,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
changes_filename(path, MyLogicalRepWorker->subid, xid);
elog(DEBUG1, "replaying changes from file \"%s\"", path);
- ent = (StreamXidHash *) hash_search(xidhash,
- (void *) &xid,
- HASH_FIND,
- NULL);
- if (!ent)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("transaction %u not found in stream XID hash table",
- xid)));
-
- fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDONLY);
+ fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
+ false);
buffer = palloc(BLCKSZ);
initStringInfo(&s2);
@@ -2542,30 +2507,6 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
}
/*
- * Cleanup filesets.
- */
-void
-logicalrep_worker_cleanupfileset(void)
-{
- HASH_SEQ_STATUS status;
- StreamXidHash *hentry;
-
- /* Remove all the pending stream and subxact filesets. */
- if (xidhash)
- {
- hash_seq_init(&status, xidhash);
- while ((hentry = (StreamXidHash *) hash_seq_search(&status)) != NULL)
- {
- FileSetDeleteAll(hentry->stream_fileset);
-
- /* Delete the subxact fileset iff it is created. */
- if (hentry->subxact_fileset)
- FileSetDeleteAll(hentry->subxact_fileset);
- }
- }
-}
-
-/*
* Apply main loop.
*/
static void
@@ -3026,58 +2967,30 @@ subxact_info_write(Oid subid, TransactionId xid)
{
char path[MAXPGPATH];
Size len;
- StreamXidHash *ent;
BufFile *fd;
Assert(TransactionIdIsValid(xid));
- /* Find the xid entry in the xidhash */
- ent = (StreamXidHash *) hash_search(xidhash,
- (void *) &xid,
- HASH_FIND,
- NULL);
- /* By this time we must have created the transaction entry */
- Assert(ent);
+ /* construct the subxact filename */
+ subxact_filename(path, subid, xid);
- /*
- * If there is no subtransaction then nothing to do, but if already have
- * subxact file then delete that.
- */
+ /* Delete the subxacts file, if exists. */
if (subxact_data.nsubxacts == 0)
{
- if (ent->subxact_fileset)
- {
- cleanup_subxact_info();
- FileSetDeleteAll(ent->subxact_fileset);
- pfree(ent->subxact_fileset);
- ent->subxact_fileset = NULL;
- }
+ cleanup_subxact_info();
+ BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
+
return;
}
- subxact_filename(path, subid, xid);
-
/*
* Create the subxact file if it not already created, otherwise open the
* existing file.
*/
- if (ent->subxact_fileset == NULL)
- {
- MemoryContext oldctx;
-
- /*
- * We need to maintain fileset across multiple stream start/stop
- * calls. So, need to allocate it in a persistent context.
- */
- oldctx = MemoryContextSwitchTo(ApplyContext);
- ent->subxact_fileset = palloc(sizeof(FileSet));
- FileSetInit(ent->subxact_fileset);
- MemoryContextSwitchTo(oldctx);
-
- fd = BufFileCreateFileSet(ent->subxact_fileset, path);
- }
- else
- fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDWR);
+ fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
+ true);
+ if (fd == NULL)
+ fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
@@ -3104,34 +3017,21 @@ subxact_info_read(Oid subid, TransactionId xid)
char path[MAXPGPATH];
Size len;
BufFile *fd;
- StreamXidHash *ent;
MemoryContext oldctx;
Assert(!subxact_data.subxacts);
Assert(subxact_data.nsubxacts == 0);
Assert(subxact_data.nsubxacts_max == 0);
- /* Find the stream xid entry in the xidhash */
- ent = (StreamXidHash *) hash_search(xidhash,
- (void *) &xid,
- HASH_FIND,
- NULL);
- if (!ent)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("transaction %u not found in stream XID hash table",
- xid)));
-
/*
- * If subxact_fileset is not valid that mean we don't have any subxact
- * info
+ * If the subxact file doesn't exist that means we don't have any subxact
+ * info.
*/
- if (ent->subxact_fileset == NULL)
- return;
-
subxact_filename(path, subid, xid);
-
- fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDONLY);
+ fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
+ true);
+ if (fd == NULL)
+ return;
/* read number of subxact items */
if (BufFileRead(fd, &subxact_data.nsubxacts,
@@ -3267,42 +3167,21 @@ changes_filename(char *path, Oid subid, TransactionId xid)
* Cleanup files for a subscription / toplevel transaction.
*
* Remove files with serialized changes and subxact info for a particular
- * toplevel transaction. Each subscription has a separate set of files.
+ * toplevel transaction. Each subscription has a separate set of files
+ * for any toplevel transaction.
*/
static void
stream_cleanup_files(Oid subid, TransactionId xid)
{
char path[MAXPGPATH];
- StreamXidHash *ent;
-
- /* Find the xid entry in the xidhash */
- ent = (StreamXidHash *) hash_search(xidhash,
- (void *) &xid,
- HASH_FIND,
- NULL);
- if (!ent)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("transaction %u not found in stream XID hash table",
- xid)));
- /* Delete the change file and release the stream fileset memory */
+ /* Delete the changes file. */
changes_filename(path, subid, xid);
- FileSetDeleteAll(ent->stream_fileset);
- pfree(ent->stream_fileset);
- ent->stream_fileset = NULL;
+ BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
- /* Delete the subxact file and release the memory, if it exist */
- if (ent->subxact_fileset)
- {
- subxact_filename(path, subid, xid);
- FileSetDeleteAll(ent->subxact_fileset);
- pfree(ent->subxact_fileset);
- ent->subxact_fileset = NULL;
- }
-
- /* Remove the xid entry from the stream xid hash */
- hash_search(xidhash, (void *) &xid, HASH_REMOVE, NULL);
+ /* Delete the subxact file, if it exists. */
+ subxact_filename(path, subid, xid);
+ BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
}
/*
@@ -3312,8 +3191,8 @@ stream_cleanup_files(Oid subid, TransactionId xid)
*
* Open a file for streamed changes from a toplevel transaction identified
* by stream_xid (global variable). If it's the first chunk of streamed
- * changes for this transaction, initialize the fileset and create the buffile,
- * otherwise open the previously created file.
+ * changes for this transaction, create the buffile, otherwise open the
+ * previously created file.
*
* This can only be called at the beginning of a "streaming" block, i.e.
* between stream_start/stream_stop messages from the upstream.
@@ -3322,20 +3201,13 @@ static void
stream_open_file(Oid subid, TransactionId xid, bool first_segment)
{
char path[MAXPGPATH];
- bool found;
MemoryContext oldcxt;
- StreamXidHash *ent;
Assert(in_streamed_transaction);
Assert(OidIsValid(subid));
Assert(TransactionIdIsValid(xid));
Assert(stream_fd == NULL);
- /* create or find the xid entry in the xidhash */
- ent = (StreamXidHash *) hash_search(xidhash,
- (void *) &xid,
- HASH_ENTER,
- &found);
changes_filename(path, subid, xid);
elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
@@ -3347,49 +3219,20 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
/*
- * If this is the first streamed segment, the file must not exist, so make
- * sure we're the ones creating it. Otherwise just open the file for
- * writing, in append mode.
+ * If this is the first streamed segment, create the changes file.
+ * Otherwise, just open the file for writing, in append mode.
*/
if (first_segment)
- {
- MemoryContext savectx;
- FileSet *fileset;
-
- if (found)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
-
- /*
- * We need to maintain fileset across multiple stream start/stop
- * calls. So, need to allocate it in a persistent context.
- */
- savectx = MemoryContextSwitchTo(ApplyContext);
- fileset = palloc(sizeof(FileSet));
-
- FileSetInit(fileset);
- MemoryContextSwitchTo(savectx);
-
- stream_fd = BufFileCreateFileSet(fileset, path);
-
- /* Remember the fileset for the next stream of the same transaction */
- ent->xid = xid;
- ent->stream_fileset = fileset;
- ent->subxact_fileset = NULL;
- }
+ stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
+ path);
else
{
- if (!found)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
-
/*
* Open the file and seek to the end of the file because we always
* append the changes file.
*/
- stream_fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
+ stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
+ path, O_RDWR, false);
BufFileSeek(stream_fd, 0, 0, SEEK_END);
}
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index 5e5409d84d9..ff3aa67cde0 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -278,10 +278,13 @@ BufFileCreateFileSet(FileSet *fileset, const char *name)
* with BufFileCreateFileSet in the same FileSet using the same name.
* The backend that created the file must have called BufFileClose() or
* BufFileExportFileSet() to make sure that it is ready to be opened by other
- * backends and render it read-only.
+ * backends and render it read-only. If missing_ok is true, which indicates
+ * that missing files can be safely ignored, then return NULL if the BufFile
+ * with the given name is not found, otherwise, throw an error.
*/
BufFile *
-BufFileOpenFileSet(FileSet *fileset, const char *name, int mode)
+BufFileOpenFileSet(FileSet *fileset, const char *name, int mode,
+ bool missing_ok)
{
BufFile *file;
char segment_name[MAXPGPATH];
@@ -318,10 +321,18 @@ BufFileOpenFileSet(FileSet *fileset, const char *name, int mode)
* name.
*/
if (nfiles == 0)
+ {
+ /* free the memory */
+ pfree(files);
+
+ if (missing_ok)
+ return NULL;
+
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open temporary file \"%s\" from BufFile \"%s\": %m",
segment_name, name)));
+ }
file = makeBufFileCommon(nfiles);
file->files = files;
@@ -341,10 +352,11 @@ BufFileOpenFileSet(FileSet *fileset, const char *name, int mode)
* the FileSet to be cleaned up.
*
* Only one backend should attempt to delete a given name, and should know
- * that it exists and has been exported or closed.
+ * that it exists and has been exported or closed otherwise missing_ok should
+ * be passed true.
*/
void
-BufFileDeleteFileSet(FileSet *fileset, const char *name)
+BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
{
char segment_name[MAXPGPATH];
int segment = 0;
@@ -366,7 +378,7 @@ BufFileDeleteFileSet(FileSet *fileset, const char *name)
CHECK_FOR_INTERRUPTS();
}
- if (!found)
+ if (!found && !missing_ok)
elog(ERROR, "could not delete unknown BufFile \"%s\"", name);
}
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index f7994d771d6..debf12e1b0b 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -564,7 +564,7 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
lt = &lts->tapes[i];
pg_itoa(i, filename);
- file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY);
+ file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY, false);
filesize = BufFileSize(file);
/*
diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c
index 504ef1c2869..033088f9bc1 100644
--- a/src/backend/utils/sort/sharedtuplestore.c
+++ b/src/backend/utils/sort/sharedtuplestore.c
@@ -560,7 +560,8 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
sts_filename(name, accessor, accessor->read_participant);
accessor->read_file =
- BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY);
+ BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
+ false);
}
/* Seek and load the chunk header. */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index a6c9d4e2a10..c00be2a2b6f 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -50,6 +50,15 @@ typedef struct LogicalRepWorker
XLogRecPtr relstate_lsn;
slock_t relmutex;
+ /*
+ * Used to create the changes and subxact files for the streaming
+ * transactions. Upon the arrival of the first streaming transaction, the
+ * fileset will be initialized, and it will be deleted when the worker
+ * exits. Under this, separate buffiles would be created for each
+ * transaction which will be deleted after the transaction is finished.
+ */
+ FileSet *stream_fileset;
+
/* Stats. */
XLogRecPtr last_lsn;
TimestampTz last_send_time;
@@ -79,7 +88,6 @@ extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
extern void logicalrep_worker_stop(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
-extern void logicalrep_worker_cleanupfileset(void);
extern int logicalrep_sync_worker_count(Oid subid);
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index 143eada85fe..7ae5ea2dde0 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -49,8 +49,9 @@ extern long BufFileAppend(BufFile *target, BufFile *source);
extern BufFile *BufFileCreateFileSet(FileSet *fileset, const char *name);
extern void BufFileExportFileSet(BufFile *file);
extern BufFile *BufFileOpenFileSet(FileSet *fileset, const char *name,
- int mode);
-extern void BufFileDeleteFileSet(FileSet *fileset, const char *name);
+ int mode, bool missing_ok);
+extern void BufFileDeleteFileSet(FileSet *fileset, const char *name,
+ bool missing_ok);
extern void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset);
#endif /* BUFFILE_H */