aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c101
1 files changed, 63 insertions, 38 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b9a7a7ffbb3..3f499b11f72 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -333,6 +333,8 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
/* Compute GID for two_phase transactions */
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
+/* Common streaming function to apply all the spooled messages */
+static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
/*
* Should this worker apply changes for given relation.
@@ -885,13 +887,46 @@ apply_handle_begin_prepare(StringInfo s)
}
/*
+ * Common function to prepare the GID.
+ */
+static void
+apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
+{
+ char gid[GIDSIZE];
+
+ /*
+ * Compute unique GID for two_phase transactions. We don't use GID of
+ * prepared transaction sent by server as that can lead to deadlock when
+ * we have multiple subscriptions from same node point to publications on
+ * the same node. See comments atop worker.c
+ */
+ TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
+ gid, sizeof(gid));
+
+ /*
+ * BeginTransactionBlock is necessary to balance the EndTransactionBlock
+ * called within the PrepareTransactionBlock below.
+ */
+ BeginTransactionBlock();
+ CommitTransactionCommand(); /* Completes the preceding Begin command. */
+
+ /*
+ * Update origin state so we can restart streaming from correct position
+ * in case of crash.
+ */
+ replorigin_session_origin_lsn = prepare_data->end_lsn;
+ replorigin_session_origin_timestamp = prepare_data->prepare_time;
+
+ PrepareTransactionBlock(gid);
+}
+
+/*
* Handle PREPARE message.
*/
static void
apply_handle_prepare(StringInfo s)
{
LogicalRepPreparedTxnData prepare_data;
- char gid[GIDSIZE];
logicalrep_read_prepare(s, &prepare_data);
@@ -903,15 +938,6 @@ apply_handle_prepare(StringInfo s)
LSN_FORMAT_ARGS(remote_final_lsn))));
/*
- * Compute unique GID for two_phase transactions. We don't use GID of
- * prepared transaction sent by server as that can lead to deadlock when
- * we have multiple subscriptions from same node point to publications on
- * the same node. See comments atop worker.c
- */
- TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
- gid, sizeof(gid));
-
- /*
* Unlike commit, here, we always prepare the transaction even though no
* change has happened in this transaction. It is done this way because at
* commit prepared time, we won't know whether we have skipped preparing a
@@ -923,21 +949,8 @@ apply_handle_prepare(StringInfo s)
*/
begin_replication_step();
- /*
- * BeginTransactionBlock is necessary to balance the EndTransactionBlock
- * called within the PrepareTransactionBlock below.
- */
- BeginTransactionBlock();
- CommitTransactionCommand(); /* Completes the preceding Begin command. */
-
- /*
- * Update origin state so we can restart streaming from correct position
- * in case of crash.
- */
- replorigin_session_origin_lsn = prepare_data.end_lsn;
- replorigin_session_origin_timestamp = prepare_data.prepare_time;
+ apply_handle_prepare_internal(&prepare_data);
- PrepareTransactionBlock(gid);
end_replication_step();
CommitTransactionCommand();
pgstat_report_stat(false);
@@ -1256,30 +1269,19 @@ apply_handle_stream_abort(StringInfo s)
}
/*
- * Handle STREAM COMMIT message.
+ * Common spoolfile processing.
*/
static void
-apply_handle_stream_commit(StringInfo s)
+apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
{
- TransactionId xid;
StringInfoData s2;
int nchanges;
char path[MAXPGPATH];
char *buffer = NULL;
- LogicalRepCommitData commit_data;
StreamXidHash *ent;
MemoryContext oldcxt;
BufFile *fd;
- if (in_streamed_transaction)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("STREAM COMMIT message without STREAM STOP")));
-
- xid = logicalrep_read_stream_commit(s, &commit_data);
-
- elog(DEBUG1, "received commit for streamed transaction %u", xid);
-
/* Make sure we have an open transaction */
begin_replication_step();
@@ -1311,7 +1313,7 @@ apply_handle_stream_commit(StringInfo s)
MemoryContextSwitchTo(oldcxt);
- remote_final_lsn = commit_data.commit_lsn;
+ remote_final_lsn = lsn;
/*
* Make sure the handle apply_dispatch methods are aware we're in a remote
@@ -1390,6 +1392,29 @@ apply_handle_stream_commit(StringInfo s)
elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
nchanges, path);
+ return;
+}
+
+/*
+ * Handle STREAM COMMIT message.
+ */
+static void
+apply_handle_stream_commit(StringInfo s)
+{
+ TransactionId xid;
+ LogicalRepCommitData commit_data;
+
+ if (in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("STREAM COMMIT message without STREAM STOP")));
+
+ xid = logicalrep_read_stream_commit(s, &commit_data);
+
+ elog(DEBUG1, "received commit for streamed transaction %u", xid);
+
+ apply_spooled_messages(xid, commit_data.commit_lsn);
+
apply_handle_commit_internal(s, &commit_data);
/* unlink the files with serialized changes and subxact info */