aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/proto.c42
-rw-r--r--src/backend/replication/logical/worker.c101
2 files changed, 95 insertions, 48 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index a2452525299..2d774567e08 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -145,15 +145,15 @@ logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_da
}
/*
- * Write PREPARE to the output stream.
+ * The core functionality for logicalrep_write_prepare.
*/
-void
-logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
- XLogRecPtr prepare_lsn)
+static void
+logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
+ ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
{
uint8 flags = 0;
- pq_sendbyte(out, LOGICAL_REP_MSG_PREPARE);
+ pq_sendbyte(out, type);
/*
* This should only ever happen for two-phase commit transactions, in
@@ -161,6 +161,7 @@ logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
*/
Assert(txn->gid != NULL);
Assert(rbtxn_prepared(txn));
+ Assert(TransactionIdIsValid(txn->xid));
/* send the flags field */
pq_sendbyte(out, flags);
@@ -176,24 +177,36 @@ logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
}
/*
- * Read transaction PREPARE from the stream.
+ * Write PREPARE to the output stream.
*/
void
-logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
+logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
+ txn, prepare_lsn);
+}
+
+/*
+ * The core functionality for logicalrep_read_prepare.
+ */
+static void
+logicalrep_read_prepare_common(StringInfo in, char *msgtype,
+ LogicalRepPreparedTxnData *prepare_data)
{
/* read flags */
uint8 flags = pq_getmsgbyte(in);
if (flags != 0)
- elog(ERROR, "unrecognized flags %u in prepare message", flags);
+ elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
/* read fields */
prepare_data->prepare_lsn = pq_getmsgint64(in);
if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
- elog(ERROR, "prepare_lsn is not set in prepare message");
+ elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
prepare_data->end_lsn = pq_getmsgint64(in);
if (prepare_data->end_lsn == InvalidXLogRecPtr)
- elog(ERROR, "end_lsn is not set in prepare message");
+ elog(ERROR, "end_lsn is not set in %s message", msgtype);
prepare_data->prepare_time = pq_getmsgint64(in);
prepare_data->xid = pq_getmsgint(in, 4);
@@ -202,6 +215,15 @@ logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
}
/*
+ * Read transaction PREPARE from the stream.
+ */
+void
+logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
+{
+ logicalrep_read_prepare_common(in, "prepare", prepare_data);
+}
+
+/*
* Write COMMIT PREPARED to the output stream.
*/
void
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b9a7a7ffbb3..3f499b11f72 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -333,6 +333,8 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
/* Compute GID for two_phase transactions */
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
+/* Common streaming function to apply all the spooled messages */
+static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
/*
* Should this worker apply changes for given relation.
@@ -885,13 +887,46 @@ apply_handle_begin_prepare(StringInfo s)
}
/*
+ * Common function to prepare the GID.
+ */
+static void
+apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
+{
+ char gid[GIDSIZE];
+
+ /*
+ * Compute unique GID for two_phase transactions. We don't use GID of
+ * prepared transaction sent by server as that can lead to deadlock when
+ * we have multiple subscriptions from same node point to publications on
+ * the same node. See comments atop worker.c
+ */
+ TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
+ gid, sizeof(gid));
+
+ /*
+ * BeginTransactionBlock is necessary to balance the EndTransactionBlock
+ * called within the PrepareTransactionBlock below.
+ */
+ BeginTransactionBlock();
+ CommitTransactionCommand(); /* Completes the preceding Begin command. */
+
+ /*
+ * Update origin state so we can restart streaming from correct position
+ * in case of crash.
+ */
+ replorigin_session_origin_lsn = prepare_data->end_lsn;
+ replorigin_session_origin_timestamp = prepare_data->prepare_time;
+
+ PrepareTransactionBlock(gid);
+}
+
+/*
* Handle PREPARE message.
*/
static void
apply_handle_prepare(StringInfo s)
{
LogicalRepPreparedTxnData prepare_data;
- char gid[GIDSIZE];
logicalrep_read_prepare(s, &prepare_data);
@@ -903,15 +938,6 @@ apply_handle_prepare(StringInfo s)
LSN_FORMAT_ARGS(remote_final_lsn))));
/*
- * Compute unique GID for two_phase transactions. We don't use GID of
- * prepared transaction sent by server as that can lead to deadlock when
- * we have multiple subscriptions from same node point to publications on
- * the same node. See comments atop worker.c
- */
- TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
- gid, sizeof(gid));
-
- /*
* Unlike commit, here, we always prepare the transaction even though no
* change has happened in this transaction. It is done this way because at
* commit prepared time, we won't know whether we have skipped preparing a
@@ -923,21 +949,8 @@ apply_handle_prepare(StringInfo s)
*/
begin_replication_step();
- /*
- * BeginTransactionBlock is necessary to balance the EndTransactionBlock
- * called within the PrepareTransactionBlock below.
- */
- BeginTransactionBlock();
- CommitTransactionCommand(); /* Completes the preceding Begin command. */
-
- /*
- * Update origin state so we can restart streaming from correct position
- * in case of crash.
- */
- replorigin_session_origin_lsn = prepare_data.end_lsn;
- replorigin_session_origin_timestamp = prepare_data.prepare_time;
+ apply_handle_prepare_internal(&prepare_data);
- PrepareTransactionBlock(gid);
end_replication_step();
CommitTransactionCommand();
pgstat_report_stat(false);
@@ -1256,30 +1269,19 @@ apply_handle_stream_abort(StringInfo s)
}
/*
- * Handle STREAM COMMIT message.
+ * Common spoolfile processing.
*/
static void
-apply_handle_stream_commit(StringInfo s)
+apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
{
- TransactionId xid;
StringInfoData s2;
int nchanges;
char path[MAXPGPATH];
char *buffer = NULL;
- LogicalRepCommitData commit_data;
StreamXidHash *ent;
MemoryContext oldcxt;
BufFile *fd;
- if (in_streamed_transaction)
- ereport(ERROR,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("STREAM COMMIT message without STREAM STOP")));
-
- xid = logicalrep_read_stream_commit(s, &commit_data);
-
- elog(DEBUG1, "received commit for streamed transaction %u", xid);
-
/* Make sure we have an open transaction */
begin_replication_step();
@@ -1311,7 +1313,7 @@ apply_handle_stream_commit(StringInfo s)
MemoryContextSwitchTo(oldcxt);
- remote_final_lsn = commit_data.commit_lsn;
+ remote_final_lsn = lsn;
/*
* Make sure the handle apply_dispatch methods are aware we're in a remote
@@ -1390,6 +1392,29 @@ apply_handle_stream_commit(StringInfo s)
elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
nchanges, path);
+ return;
+}
+
+/*
+ * Handle STREAM COMMIT message.
+ */
+static void
+apply_handle_stream_commit(StringInfo s)
+{
+ TransactionId xid;
+ LogicalRepCommitData commit_data;
+
+ if (in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("STREAM COMMIT message without STREAM STOP")));
+
+ xid = logicalrep_read_stream_commit(s, &commit_data);
+
+ elog(DEBUG1, "received commit for streamed transaction %u", xid);
+
+ apply_spooled_messages(xid, commit_data.commit_lsn);
+
apply_handle_commit_internal(s, &commit_data);
/* unlink the files with serialized changes and subxact info */