diff options
author | Amit Kapila <akapila@postgresql.org> | 2020-09-03 07:54:07 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2020-09-03 07:54:07 +0530 |
commit | 464824323e57dc4b397e8b05854d779908b55304 (patch) | |
tree | 30a02506ae6b53475302980bc558e2a41ea429f0 /src/backend/replication/logical/worker.c | |
parent | 66f163068030b5c5fe792a0daee27822dac43791 (diff) | |
download | postgresql-464824323e57dc4b397e8b05854d779908b55304.tar.gz postgresql-464824323e57dc4b397e8b05854d779908b55304.zip |
Add support for streaming to built-in logical replication.
To add support for streaming of in-progress transactions into the
built-in logical replication, we need to do three things:
* Extend the logical replication protocol, so identify in-progress
transactions, and allow adding additional bits of information (e.g.
XID of subtransactions).
* Modify the output plugin (pgoutput) to implement the new stream
API callbacks, by leveraging the extended replication protocol.
* Modify the replication apply worker, to properly handle streamed
in-progress transaction by spilling the data to disk and then
replaying them on commit.
We however must explicitly disable streaming replication during
replication slot creation, even if the plugin supports it. We
don't need to replicate the changes accumulated during this phase,
and moreover we don't have a replication connection open so we
don't have where to send the data anyway.
Author: Tomas Vondra, Dilip Kumar and Amit Kapila
Reviewed-by: Amit Kapila, Kuntal Ghosh and Ajin Cherian
Tested-by: Neha Sharma, Mahendra Singh Thalor and Ajin Cherian
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 952 |
1 files changed, 947 insertions, 5 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b576e342cb7..812aca80112 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -18,11 +18,45 @@ * This module includes server facing code and shares libpqwalreceiver * module with walreceiver for providing the libpq specific functionality. * + * + * STREAMED TRANSACTIONS + * --------------------- + * Streamed transactions (large transactions exceeding a memory limit on the + * upstream) are not applied immediately, but instead, the data is written + * to temporary files and then applied at once when the final commit arrives. + * + * Unlike the regular (non-streamed) case, handling streamed transactions has + * to handle aborts of both the toplevel transaction and subtransactions. This + * is achieved by tracking offsets for subtransactions, which is then used + * to truncate the file with serialized changes. + * + * The files are placed in tmp file directory by default, and the filenames + * include both the XID of the toplevel transaction and OID of the + * subscription. This is necessary so that different workers processing a + * remote transaction with the same XID doesn't interfere. + * + * We use BufFiles instead of using normal temporary files because (a) the + * BufFile infrastructure supports temporary files that exceed the OS file size + * limit, (b) provides a way for automatic clean up on the error and (c) provides + * a way to survive these files across local transactions and allow to open and + * close at stream start and close. We decided to use SharedFileSet + * infrastructure as without that it deletes the files on the closure of the + * file and if we decide to keep stream files open across the start/stop stream + * then it will consume a lot of memory (more than 8K for each BufFile and + * there could be multiple such BufFiles as the subscriber could receive + * multiple start/stop streams for different transactions before getting the + * commit). Moreover, if we don't use SharedFileSet then we also need to invent + * a new way to pass filenames to BufFile APIs so that we are allowed to open + * the file we desired across multiple stream-open calls for the same + * transaction. *------------------------------------------------------------------------- */ #include "postgres.h" +#include <sys/stat.h> +#include <unistd.h> + #include "access/table.h" #include "access/tableam.h" #include "access/xact.h" @@ -33,7 +67,9 @@ #include "catalog/pg_inherits.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" +#include "catalog/pg_tablespace.h" #include "commands/tablecmds.h" +#include "commands/tablespace.h" #include "commands/trigger.h" #include "executor/executor.h" #include "executor/execPartition.h" @@ -63,7 +99,9 @@ #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "rewrite/rewriteHandler.h" +#include "storage/buffile.h" #include "storage/bufmgr.h" +#include "storage/fd.h" #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/proc.h" @@ -71,6 +109,7 @@ #include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/catcache.h" +#include "utils/dynahash.h" #include "utils/datum.h" #include "utils/fmgroids.h" #include "utils/guc.h" @@ -99,9 +138,26 @@ typedef struct SlotErrCallbackArg int remote_attnum; } SlotErrCallbackArg; +/* + * Stream xid hash entry. Whenever we see a new xid we create this entry in the + * xidhash and along with it create the streaming file and store the fileset handle. + * The subxact file is created iff there is any subxact info under this xid. This + * entry is used on the subsequent streams for the xid to get the corresponding + * fileset handles, so storing them in hash makes the search faster. + */ +typedef struct StreamXidHash +{ + TransactionId xid; /* xid is the hash key and must be first */ + SharedFileSet *stream_fileset; /* shared file set for stream data */ + SharedFileSet *subxact_fileset; /* shared file set for subxact info */ +} StreamXidHash; + static MemoryContext ApplyMessageContext = NULL; MemoryContext ApplyContext = NULL; +/* per stream context for streaming transactions */ +static MemoryContext LogicalStreamingContext = NULL; + WalReceiverConn *wrconn = NULL; Subscription *MySubscription = NULL; @@ -110,12 +166,66 @@ bool MySubscriptionValid = false; bool in_remote_transaction = false; static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +/* fields valid only when processing streamed transaction */ +bool in_streamed_transaction = false; + +static TransactionId stream_xid = InvalidTransactionId; + +/* + * Hash table for storing the streaming xid information along with shared file + * set for streaming and subxact files. + */ +static HTAB *xidhash = NULL; + +/* BufFile handle of the current streaming file */ +static BufFile *stream_fd = NULL; + +typedef struct SubXactInfo +{ + TransactionId xid; /* XID of the subxact */ + int fileno; /* file number in the buffile */ + off_t offset; /* offset in the file */ +} SubXactInfo; + +/* Sub-transaction data for the current streaming transaction */ +typedef struct ApplySubXactData +{ + uint32 nsubxacts; /* number of sub-transactions */ + uint32 nsubxacts_max; /* current capacity of subxacts */ + TransactionId subxact_last; /* xid of the last sub-transaction */ + SubXactInfo *subxacts; /* sub-xact offset in changes file */ +} ApplySubXactData; + +static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}; + +static void subxact_filename(char *path, Oid subid, TransactionId xid); +static void changes_filename(char *path, Oid subid, TransactionId xid); + +/* + * Information about subtransactions of a given toplevel transaction. + */ +static void subxact_info_write(Oid subid, TransactionId xid); +static void subxact_info_read(Oid subid, TransactionId xid); +static void subxact_info_add(TransactionId xid); +static inline void cleanup_subxact_info(void); + +/* + * Serialize and deserialize changes for a toplevel transaction. + */ +static void stream_cleanup_files(Oid subid, TransactionId xid); +static void stream_open_file(Oid subid, TransactionId xid, bool first); +static void stream_write_change(char action, StringInfo s); +static void stream_close_file(void); + static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); static void store_flush_position(XLogRecPtr remote_lsn); static void maybe_reread_subscription(void); +/* prototype needed because of stream_commit */ +static void apply_dispatch(StringInfo s); + static void apply_handle_insert_internal(ResultRelInfo *relinfo, EState *estate, TupleTableSlot *remoteslot); static void apply_handle_update_internal(ResultRelInfo *relinfo, @@ -187,6 +297,42 @@ ensure_transaction(void) return true; } +/* + * Handle streamed transactions. + * + * If in streaming mode (receiving a block of streamed transaction), we + * simply redirect it to a file for the proper toplevel transaction. + * + * Returns true for streamed transactions, false otherwise (regular mode). + */ +static bool +handle_streamed_transaction(const char action, StringInfo s) +{ + TransactionId xid; + + /* not in streaming mode */ + if (!in_streamed_transaction) + return false; + + Assert(stream_fd != NULL); + Assert(TransactionIdIsValid(stream_xid)); + + /* + * We should have received XID of the subxact as the first part of the + * message, so extract it. + */ + xid = pq_getmsgint(s, 4); + + Assert(TransactionIdIsValid(xid)); + + /* Add the new subxact to the array (unless already there). */ + subxact_info_add(xid); + + /* write the change to the current file */ + stream_write_change(action, s); + + return true; +} /* * Executor state preparation for evaluation of constraint expressions, @@ -612,17 +758,336 @@ static void apply_handle_origin(StringInfo s) { /* - * ORIGIN message can only come inside remote transaction and before any - * actual writes. + * ORIGIN message can only come inside streaming transaction or inside + * remote transaction and before any actual writes. */ - if (!in_remote_transaction || - (IsTransactionState() && !am_tablesync_worker())) + if (!in_streamed_transaction && + (!in_remote_transaction || + (IsTransactionState() && !am_tablesync_worker()))) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("ORIGIN message sent out of order"))); } /* + * Handle STREAM START message. + */ +static void +apply_handle_stream_start(StringInfo s) +{ + bool first_segment; + HASHCTL hash_ctl; + + Assert(!in_streamed_transaction); + + /* + * Start a transaction on stream start, this transaction will be committed + * on the stream stop. We need the transaction for handling the buffile, + * used for serializing the streaming data and subxact info. + */ + ensure_transaction(); + + /* notify handle methods we're processing a remote transaction */ + in_streamed_transaction = true; + + /* extract XID of the top-level transaction */ + stream_xid = logicalrep_read_stream_start(s, &first_segment); + + /* + * Initialize the xidhash table if we haven't yet. This will be used for + * the entire duration of the apply worker so create it in permanent + * context. + */ + if (xidhash == NULL) + { + hash_ctl.keysize = sizeof(TransactionId); + hash_ctl.entrysize = sizeof(StreamXidHash); + hash_ctl.hcxt = ApplyContext; + xidhash = hash_create("StreamXidHash", 1024, &hash_ctl, + HASH_ELEM | HASH_CONTEXT); + } + + /* open the spool file for this transaction */ + stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment); + + /* if this is not the first segment, open existing subxact file */ + if (!first_segment) + subxact_info_read(MyLogicalRepWorker->subid, stream_xid); + + pgstat_report_activity(STATE_RUNNING, NULL); +} + +/* + * Handle STREAM STOP message. + */ +static void +apply_handle_stream_stop(StringInfo s) +{ + Assert(in_streamed_transaction); + + /* + * Close the file with serialized changes, and serialize information about + * subxacts for the toplevel transaction. + */ + subxact_info_write(MyLogicalRepWorker->subid, stream_xid); + stream_close_file(); + + /* We must be in a valid transaction state */ + Assert(IsTransactionState()); + + /* Commit the per-stream transaction */ + CommitTransactionCommand(); + + in_streamed_transaction = false; + + /* Reset per-stream context */ + MemoryContextReset(LogicalStreamingContext); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* + * Handle STREAM abort message. + */ +static void +apply_handle_stream_abort(StringInfo s) +{ + TransactionId xid; + TransactionId subxid; + + Assert(!in_streamed_transaction); + + logicalrep_read_stream_abort(s, &xid, &subxid); + + /* + * If the two XIDs are the same, it's in fact abort of toplevel xact, so + * just delete the files with serialized info. + */ + if (xid == subxid) + stream_cleanup_files(MyLogicalRepWorker->subid, xid); + else + { + /* + * OK, so it's a subxact. We need to read the subxact file for the + * toplevel transaction, determine the offset tracked for the subxact, + * and truncate the file with changes. We also remove the subxacts + * with higher offsets (or rather higher XIDs). + * + * We intentionally scan the array from the tail, because we're likely + * aborting a change for the most recent subtransactions. + * + * We can't use the binary search here as subxact XIDs won't + * necessarily arrive in sorted order, consider the case where we have + * released the savepoint for multiple subtransactions and then + * performed rollback to savepoint for one of the earlier + * sub-transaction. + */ + + int64 i; + int64 subidx; + BufFile *fd; + bool found = false; + char path[MAXPGPATH]; + StreamXidHash *ent; + + subidx = -1; + ensure_transaction(); + subxact_info_read(MyLogicalRepWorker->subid, xid); + + for (i = subxact_data.nsubxacts; i > 0; i--) + { + if (subxact_data.subxacts[i - 1].xid == subxid) + { + subidx = (i - 1); + found = true; + break; + } + } + + /* + * If it's an empty sub-transaction then we will not find the subxid + * here so just cleanup the subxact info and return. + */ + if (!found) + { + /* Cleanup the subxact info */ + cleanup_subxact_info(); + CommitTransactionCommand(); + return; + } + + Assert((subidx >= 0) && (subidx < subxact_data.nsubxacts)); + + ent = (StreamXidHash *) hash_search(xidhash, + (void *) &xid, + HASH_FIND, + &found); + Assert(found); + + /* open the changes file */ + changes_filename(path, MyLogicalRepWorker->subid, xid); + fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR); + + /* OK, truncate the file at the right offset */ + BufFileTruncateShared(fd, subxact_data.subxacts[subidx].fileno, + subxact_data.subxacts[subidx].offset); + BufFileClose(fd); + + /* discard the subxacts added later */ + subxact_data.nsubxacts = subidx; + + /* write the updated subxact list */ + subxact_info_write(MyLogicalRepWorker->subid, xid); + CommitTransactionCommand(); + } +} + +/* + * Handle STREAM COMMIT message. + */ +static void +apply_handle_stream_commit(StringInfo s) +{ + TransactionId xid; + StringInfoData s2; + int nchanges; + char path[MAXPGPATH]; + char *buffer = NULL; + bool found; + LogicalRepCommitData commit_data; + StreamXidHash *ent; + MemoryContext oldcxt; + BufFile *fd; + + Assert(!in_streamed_transaction); + + xid = logicalrep_read_stream_commit(s, &commit_data); + + elog(DEBUG1, "received commit for streamed transaction %u", xid); + + ensure_transaction(); + + /* + * Allocate file handle and memory required to process all the messages in + * TopTransactionContext to avoid them getting reset after each message is + * processed. + */ + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + + /* open the spool file for the committed transaction */ + changes_filename(path, MyLogicalRepWorker->subid, xid); + elog(DEBUG1, "replaying changes from file \"%s\"", path); + ent = (StreamXidHash *) hash_search(xidhash, + (void *) &xid, + HASH_FIND, + &found); + Assert(found); + fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY); + + buffer = palloc(BLCKSZ); + initStringInfo(&s2); + + MemoryContextSwitchTo(oldcxt); + + remote_final_lsn = commit_data.commit_lsn; + + /* + * Make sure the handle apply_dispatch methods are aware we're in a remote + * transaction. + */ + in_remote_transaction = true; + pgstat_report_activity(STATE_RUNNING, NULL); + + /* + * Read the entries one by one and pass them through the same logic as in + * apply_dispatch. + */ + nchanges = 0; + while (true) + { + int nbytes; + int len; + + CHECK_FOR_INTERRUPTS(); + + /* read length of the on-disk record */ + nbytes = BufFileRead(fd, &len, sizeof(len)); + + /* have we reached end of the file? */ + if (nbytes == 0) + break; + + /* do we have a correct length? */ + if (nbytes != sizeof(len)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from streaming transaction's changes file \"%s\": %m", + path))); + + Assert(len > 0); + + /* make sure we have sufficiently large buffer */ + buffer = repalloc(buffer, len); + + /* and finally read the data into the buffer */ + if (BufFileRead(fd, buffer, len) != len) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from streaming transaction's changes file \"%s\": %m", + path))); + + /* copy the buffer to the stringinfo and call apply_dispatch */ + resetStringInfo(&s2); + appendBinaryStringInfo(&s2, buffer, len); + + /* Ensure we are reading the data into our memory context. */ + oldcxt = MemoryContextSwitchTo(ApplyMessageContext); + + apply_dispatch(&s2); + + MemoryContextReset(ApplyMessageContext); + + MemoryContextSwitchTo(oldcxt); + + nchanges++; + + if (nchanges % 1000 == 0) + elog(DEBUG1, "replayed %d changes from file '%s'", + nchanges, path); + } + + BufFileClose(fd); + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = commit_data.end_lsn; + replorigin_session_origin_timestamp = commit_data.committime; + + pfree(buffer); + pfree(s2.data); + + CommitTransactionCommand(); + pgstat_report_stat(false); + + store_flush_position(commit_data.end_lsn); + + elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", + nchanges, path); + + in_remote_transaction = false; + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(commit_data.end_lsn); + + /* unlink the files with serialized changes and subxact info */ + stream_cleanup_files(MyLogicalRepWorker->subid, xid); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* * Handle RELATION message. * * Note we don't do validation against local schema here. The validation @@ -635,6 +1100,9 @@ apply_handle_relation(StringInfo s) { LogicalRepRelation *rel; + if (handle_streamed_transaction('R', s)) + return; + rel = logicalrep_read_rel(s); logicalrep_relmap_update(rel); } @@ -650,6 +1118,9 @@ apply_handle_type(StringInfo s) { LogicalRepTyp typ; + if (handle_streamed_transaction('Y', s)) + return; + logicalrep_read_typ(s, &typ); logicalrep_typmap_update(&typ); } @@ -686,6 +1157,9 @@ apply_handle_insert(StringInfo s) TupleTableSlot *remoteslot; MemoryContext oldctx; + if (handle_streamed_transaction('I', s)) + return; + ensure_transaction(); relid = logicalrep_read_insert(s, &newtup); @@ -801,6 +1275,9 @@ apply_handle_update(StringInfo s) RangeTblEntry *target_rte; MemoryContext oldctx; + if (handle_streamed_transaction('U', s)) + return; + ensure_transaction(); relid = logicalrep_read_update(s, &has_oldtup, &oldtup, @@ -950,6 +1427,9 @@ apply_handle_delete(StringInfo s) TupleTableSlot *remoteslot; MemoryContext oldctx; + if (handle_streamed_transaction('D', s)) + return; + ensure_transaction(); relid = logicalrep_read_delete(s, &oldtup); @@ -1320,6 +1800,9 @@ apply_handle_truncate(StringInfo s) List *relids_logged = NIL; ListCell *lc; + if (handle_streamed_transaction('T', s)) + return; + ensure_transaction(); remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs); @@ -1458,6 +1941,22 @@ apply_dispatch(StringInfo s) case 'O': apply_handle_origin(s); break; + /* STREAM START */ + case 'S': + apply_handle_stream_start(s); + break; + /* STREAM END */ + case 'E': + apply_handle_stream_stop(s); + break; + /* STREAM ABORT */ + case 'A': + apply_handle_stream_abort(s); + break; + /* STREAM COMMIT */ + case 'c': + apply_handle_stream_commit(s); + break; default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1570,6 +2069,14 @@ LogicalRepApplyLoop(XLogRecPtr last_received) "ApplyMessageContext", ALLOCSET_DEFAULT_SIZES); + /* + * This memory context is used for per-stream data when the streaming mode + * is enabled. This context is reset on each stream stop. + */ + LogicalStreamingContext = AllocSetContextCreate(ApplyContext, + "LogicalStreamingContext", + ALLOCSET_DEFAULT_SIZES); + /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); @@ -1674,7 +2181,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* confirm all writes so far */ send_feedback(last_received, false, false); - if (!in_remote_transaction) + if (!in_remote_transaction && !in_streamed_transaction) { /* * If we didn't get any transactions for a while there might be @@ -1938,6 +2445,7 @@ maybe_reread_subscription(void) strcmp(newsub->name, MySubscription->name) != 0 || strcmp(newsub->slotname, MySubscription->slotname) != 0 || newsub->binary != MySubscription->binary || + newsub->stream != MySubscription->stream || !equal(newsub->publications, MySubscription->publications)) { ereport(LOG, @@ -1979,6 +2487,439 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue) MySubscriptionValid = false; } +/* + * subxact_info_write + * Store information about subxacts for a toplevel transaction. + * + * For each subxact we store offset of it's first change in the main file. + * The file is always over-written as a whole. + * + * XXX We should only store subxacts that were not aborted yet. + */ +static void +subxact_info_write(Oid subid, TransactionId xid) +{ + char path[MAXPGPATH]; + bool found; + Size len; + StreamXidHash *ent; + BufFile *fd; + + Assert(TransactionIdIsValid(xid)); + + /* find the xid entry in the xidhash */ + ent = (StreamXidHash *) hash_search(xidhash, + (void *) &xid, + HASH_FIND, + &found); + /* we must found the entry for its top transaction by this time */ + Assert(found); + + /* + * If there is no subtransaction then nothing to do, but if already have + * subxact file then delete that. + */ + if (subxact_data.nsubxacts == 0) + { + if (ent->subxact_fileset) + { + cleanup_subxact_info(); + SharedFileSetDeleteAll(ent->subxact_fileset); + pfree(ent->subxact_fileset); + ent->subxact_fileset = NULL; + } + return; + } + + subxact_filename(path, subid, xid); + + /* + * Create the subxact file if it not already created, otherwise open the + * existing file. + */ + if (ent->subxact_fileset == NULL) + { + MemoryContext oldctx; + + /* + * We need to maintain shared fileset across multiple stream + * start/stop calls. So, need to allocate it in a persistent context. + */ + oldctx = MemoryContextSwitchTo(ApplyContext); + ent->subxact_fileset = palloc(sizeof(SharedFileSet)); + SharedFileSetInit(ent->subxact_fileset, NULL); + MemoryContextSwitchTo(oldctx); + + fd = BufFileCreateShared(ent->subxact_fileset, path); + } + else + fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR); + + len = sizeof(SubXactInfo) * subxact_data.nsubxacts; + + /* Write the subxact count and subxact info */ + BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts)); + BufFileWrite(fd, subxact_data.subxacts, len); + + BufFileClose(fd); + + /* free the memory allocated for subxact info */ + cleanup_subxact_info(); +} + +/* + * subxact_info_read + * Restore information about subxacts of a streamed transaction. + * + * Read information about subxacts into the structure subxact_data that can be + * used later. + */ +static void +subxact_info_read(Oid subid, TransactionId xid) +{ + char path[MAXPGPATH]; + bool found; + Size len; + BufFile *fd; + StreamXidHash *ent; + MemoryContext oldctx; + + Assert(TransactionIdIsValid(xid)); + Assert(!subxact_data.subxacts); + Assert(subxact_data.nsubxacts == 0); + Assert(subxact_data.nsubxacts_max == 0); + + /* Find the stream xid entry in the xidhash */ + ent = (StreamXidHash *) hash_search(xidhash, + (void *) &xid, + HASH_FIND, + &found); + + /* + * If subxact_fileset is not valid that mean we don't have any subxact + * info + */ + if (ent->subxact_fileset == NULL) + return; + + subxact_filename(path, subid, xid); + + fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY); + + /* read number of subxact items */ + if (BufFileRead(fd, &subxact_data.nsubxacts, + sizeof(subxact_data.nsubxacts)) != + sizeof(subxact_data.nsubxacts)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from streaming transaction's subxact file \"%s\": %m", + path))); + + len = sizeof(SubXactInfo) * subxact_data.nsubxacts; + + /* we keep the maximum as a power of 2 */ + subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts); + + /* + * Allocate subxact information in the logical streaming context. We need + * this information during the complete stream so that we can add the sub + * transaction info to this. On stream stop we will flush this information + * to the subxact file and reset the logical streaming context. + */ + oldctx = MemoryContextSwitchTo(LogicalStreamingContext); + subxact_data.subxacts = palloc(subxact_data.nsubxacts_max * + sizeof(SubXactInfo)); + MemoryContextSwitchTo(oldctx); + + if ((len > 0) && ((BufFileRead(fd, subxact_data.subxacts, len)) != len)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from streaming transaction's subxact file \"%s\": %m", + path))); + + BufFileClose(fd); +} + +/* + * subxact_info_add + * Add information about a subxact (offset in the main file). + */ +static void +subxact_info_add(TransactionId xid) +{ + SubXactInfo *subxacts = subxact_data.subxacts; + int64 i; + + /* We must have a valid top level stream xid and a stream fd. */ + Assert(TransactionIdIsValid(stream_xid)); + Assert(stream_fd != NULL); + + /* + * If the XID matches the toplevel transaction, we don't want to add it. + */ + if (stream_xid == xid) + return; + + /* + * In most cases we're checking the same subxact as we've already seen in + * the last call, so make sure to ignore it (this change comes later). + */ + if (subxact_data.subxact_last == xid) + return; + + /* OK, remember we're processing this XID. */ + subxact_data.subxact_last = xid; + + /* + * Check if the transaction is already present in the array of subxact. We + * intentionally scan the array from the tail, because we're likely adding + * a change for the most recent subtransactions. + * + * XXX Can we rely on the subxact XIDs arriving in sorted order? That + * would allow us to use binary search here. + */ + for (i = subxact_data.nsubxacts; i > 0; i--) + { + /* found, so we're done */ + if (subxacts[i - 1].xid == xid) + return; + } + + /* This is a new subxact, so we need to add it to the array. */ + if (subxact_data.nsubxacts == 0) + { + MemoryContext oldctx; + + subxact_data.nsubxacts_max = 128; + + /* + * Allocate this memory for subxacts in per-stream context, see + * subxact_info_read. + */ + oldctx = MemoryContextSwitchTo(LogicalStreamingContext); + subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo)); + MemoryContextSwitchTo(oldctx); + } + else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max) + { + subxact_data.nsubxacts_max *= 2; + subxacts = repalloc(subxacts, + subxact_data.nsubxacts_max * sizeof(SubXactInfo)); + } + + subxacts[subxact_data.nsubxacts].xid = xid; + + /* + * Get the current offset of the stream file and store it as offset of + * this subxact. + */ + BufFileTell(stream_fd, + &subxacts[subxact_data.nsubxacts].fileno, + &subxacts[subxact_data.nsubxacts].offset); + + subxact_data.nsubxacts++; + subxact_data.subxacts = subxacts; +} + +/* format filename for file containing the info about subxacts */ +static void +subxact_filename(char *path, Oid subid, TransactionId xid) +{ + snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid); +} + +/* format filename for file containing serialized changes */ +static inline void +changes_filename(char *path, Oid subid, TransactionId xid) +{ + snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid); +} + +/* + * stream_cleanup_files + * Cleanup files for a subscription / toplevel transaction. + * + * Remove files with serialized changes and subxact info for a particular + * toplevel transaction. Each subscription has a separate set of files. + */ +static void +stream_cleanup_files(Oid subid, TransactionId xid) +{ + char path[MAXPGPATH]; + StreamXidHash *ent; + + /* Remove the xid entry from the stream xid hash */ + ent = (StreamXidHash *) hash_search(xidhash, + (void *) &xid, + HASH_REMOVE, + NULL); + /* By this time we must have created the transaction entry */ + Assert(ent != NULL); + + /* Delete the change file and release the stream fileset memory */ + changes_filename(path, subid, xid); + SharedFileSetDeleteAll(ent->stream_fileset); + pfree(ent->stream_fileset); + ent->stream_fileset = NULL; + + /* Delete the subxact file and release the memory, if it exist */ + if (ent->subxact_fileset) + { + subxact_filename(path, subid, xid); + SharedFileSetDeleteAll(ent->subxact_fileset); + pfree(ent->subxact_fileset); + ent->subxact_fileset = NULL; + } +} + +/* + * stream_open_file + * Open a file that we'll use to serialize changes for a toplevel + * transaction. + * + * Open a file for streamed changes from a toplevel transaction identified + * by stream_xid (global variable). If it's the first chunk of streamed + * changes for this transaction, initialize the shared fileset and create the + * buffile, otherwise open the previously created file. + * + * This can only be called at the beginning of a "streaming" block, i.e. + * between stream_start/stream_stop messages from the upstream. + */ +static void +stream_open_file(Oid subid, TransactionId xid, bool first_segment) +{ + char path[MAXPGPATH]; + bool found; + MemoryContext oldcxt; + StreamXidHash *ent; + + Assert(in_streamed_transaction); + Assert(OidIsValid(subid)); + Assert(TransactionIdIsValid(xid)); + Assert(stream_fd == NULL); + + /* create or find the xid entry in the xidhash */ + ent = (StreamXidHash *) hash_search(xidhash, + (void *) &xid, + HASH_ENTER | HASH_FIND, + &found); + Assert(first_segment || found); + changes_filename(path, subid, xid); + elog(DEBUG1, "opening file \"%s\" for streamed changes", path); + + /* + * Create/open the buffiles under the logical streaming context so that we + * have those files until stream stop. + */ + oldcxt = MemoryContextSwitchTo(LogicalStreamingContext); + + /* + * If this is the first streamed segment, the file must not exist, so make + * sure we're the ones creating it. Otherwise just open the file for + * writing, in append mode. + */ + if (first_segment) + { + MemoryContext savectx; + SharedFileSet *fileset; + + /* + * We need to maintain shared fileset across multiple stream + * start/stop calls. So, need to allocate it in a persistent context. + */ + savectx = MemoryContextSwitchTo(ApplyContext); + fileset = palloc(sizeof(SharedFileSet)); + + SharedFileSetInit(fileset, NULL); + MemoryContextSwitchTo(savectx); + + stream_fd = BufFileCreateShared(fileset, path); + + /* Remember the fileset for the next stream of the same transaction */ + ent->xid = xid; + ent->stream_fileset = fileset; + ent->subxact_fileset = NULL; + } + else + { + /* + * Open the file and seek to the end of the file because we always + * append the changes file. + */ + stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR); + BufFileSeek(stream_fd, 0, 0, SEEK_END); + } + + MemoryContextSwitchTo(oldcxt); +} + +/* + * stream_close_file + * Close the currently open file with streamed changes. + * + * This can only be called at the end of a streaming block, i.e. at stream_stop + * message from the upstream. + */ +static void +stream_close_file(void) +{ + Assert(in_streamed_transaction); + Assert(TransactionIdIsValid(stream_xid)); + Assert(stream_fd != NULL); + + BufFileClose(stream_fd); + + stream_xid = InvalidTransactionId; + stream_fd = NULL; +} + +/* + * stream_write_change + * Serialize a change to a file for the current toplevel transaction. + * + * The change is serialized in a simple format, with length (not including + * the length), action code (identifying the message type) and message + * contents (without the subxact TransactionId value). + */ +static void +stream_write_change(char action, StringInfo s) +{ + int len; + + Assert(in_streamed_transaction); + Assert(TransactionIdIsValid(stream_xid)); + Assert(stream_fd != NULL); + + /* total on-disk size, including the action type character */ + len = (s->len - s->cursor) + sizeof(char); + + /* first write the size */ + BufFileWrite(stream_fd, &len, sizeof(len)); + + /* then the action */ + BufFileWrite(stream_fd, &action, sizeof(action)); + + /* and finally the remaining part of the buffer (after the XID) */ + len = (s->len - s->cursor); + + BufFileWrite(stream_fd, &s->data[s->cursor], len); +} + +/* + * Cleanup the memory for subxacts and reset the related variables. + */ +static inline void +cleanup_subxact_info() +{ + if (subxact_data.subxacts) + pfree(subxact_data.subxacts); + + subxact_data.subxacts = NULL; + subxact_data.subxact_last = InvalidTransactionId; + subxact_data.nsubxacts = 0; + subxact_data.nsubxacts_max = 0; +} + /* Logical Replication Apply worker entry point */ void ApplyWorkerMain(Datum main_arg) @@ -2151,6 +3092,7 @@ ApplyWorkerMain(Datum main_arg) options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM; options.proto.logical.publication_names = MySubscription->publications; options.proto.logical.binary = MySubscription->binary; + options.proto.logical.streaming = MySubscription->stream; /* Start normal logical streaming replication. */ walrcv_startstreaming(wrconn, &options); |