diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 101 |
1 files changed, 63 insertions, 38 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b9a7a7ffbb3..3f499b11f72 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -333,6 +333,8 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata, /* Compute GID for two_phase transactions */ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid); +/* Common streaming function to apply all the spooled messages */ +static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); /* * Should this worker apply changes for given relation. @@ -885,13 +887,46 @@ apply_handle_begin_prepare(StringInfo s) } /* + * Common function to prepare the GID. + */ +static void +apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data) +{ + char gid[GIDSIZE]; + + /* + * Compute unique GID for two_phase transactions. We don't use GID of + * prepared transaction sent by server as that can lead to deadlock when + * we have multiple subscriptions from same node point to publications on + * the same node. See comments atop worker.c + */ + TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid, + gid, sizeof(gid)); + + /* + * BeginTransactionBlock is necessary to balance the EndTransactionBlock + * called within the PrepareTransactionBlock below. + */ + BeginTransactionBlock(); + CommitTransactionCommand(); /* Completes the preceding Begin command. */ + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data->end_lsn; + replorigin_session_origin_timestamp = prepare_data->prepare_time; + + PrepareTransactionBlock(gid); +} + +/* * Handle PREPARE message. */ static void apply_handle_prepare(StringInfo s) { LogicalRepPreparedTxnData prepare_data; - char gid[GIDSIZE]; logicalrep_read_prepare(s, &prepare_data); @@ -903,15 +938,6 @@ apply_handle_prepare(StringInfo s) LSN_FORMAT_ARGS(remote_final_lsn)))); /* - * Compute unique GID for two_phase transactions. We don't use GID of - * prepared transaction sent by server as that can lead to deadlock when - * we have multiple subscriptions from same node point to publications on - * the same node. See comments atop worker.c - */ - TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, - gid, sizeof(gid)); - - /* * Unlike commit, here, we always prepare the transaction even though no * change has happened in this transaction. It is done this way because at * commit prepared time, we won't know whether we have skipped preparing a @@ -923,21 +949,8 @@ apply_handle_prepare(StringInfo s) */ begin_replication_step(); - /* - * BeginTransactionBlock is necessary to balance the EndTransactionBlock - * called within the PrepareTransactionBlock below. - */ - BeginTransactionBlock(); - CommitTransactionCommand(); /* Completes the preceding Begin command. */ - - /* - * Update origin state so we can restart streaming from correct position - * in case of crash. - */ - replorigin_session_origin_lsn = prepare_data.end_lsn; - replorigin_session_origin_timestamp = prepare_data.prepare_time; + apply_handle_prepare_internal(&prepare_data); - PrepareTransactionBlock(gid); end_replication_step(); CommitTransactionCommand(); pgstat_report_stat(false); @@ -1256,30 +1269,19 @@ apply_handle_stream_abort(StringInfo s) } /* - * Handle STREAM COMMIT message. + * Common spoolfile processing. */ static void -apply_handle_stream_commit(StringInfo s) +apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) { - TransactionId xid; StringInfoData s2; int nchanges; char path[MAXPGPATH]; char *buffer = NULL; - LogicalRepCommitData commit_data; StreamXidHash *ent; MemoryContext oldcxt; BufFile *fd; - if (in_streamed_transaction) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("STREAM COMMIT message without STREAM STOP"))); - - xid = logicalrep_read_stream_commit(s, &commit_data); - - elog(DEBUG1, "received commit for streamed transaction %u", xid); - /* Make sure we have an open transaction */ begin_replication_step(); @@ -1311,7 +1313,7 @@ apply_handle_stream_commit(StringInfo s) MemoryContextSwitchTo(oldcxt); - remote_final_lsn = commit_data.commit_lsn; + remote_final_lsn = lsn; /* * Make sure the handle apply_dispatch methods are aware we're in a remote @@ -1390,6 +1392,29 @@ apply_handle_stream_commit(StringInfo s) elog(DEBUG1, "replayed %d (all) changes from file \"%s\"", nchanges, path); + return; +} + +/* + * Handle STREAM COMMIT message. + */ +static void +apply_handle_stream_commit(StringInfo s) +{ + TransactionId xid; + LogicalRepCommitData commit_data; + + if (in_streamed_transaction) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("STREAM COMMIT message without STREAM STOP"))); + + xid = logicalrep_read_stream_commit(s, &commit_data); + + elog(DEBUG1, "received commit for streamed transaction %u", xid); + + apply_spooled_messages(xid, commit_data.commit_lsn); + apply_handle_commit_internal(s, &commit_data); /* unlink the files with serialized changes and subxact info */ |