aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2022-03-08 08:08:32 +0530
committerAmit Kapila <akapila@postgresql.org>2022-03-08 08:08:32 +0530
commitd3e8368c4b6e5110d8b3d12859850aeaae08dffb (patch)
tree2c5ee9eb72722f2fe5950ec8a4b671dbc454add3 /src
parent4228cabb72bb57e1df4c9d92613f1fcd4baadd5a (diff)
downloadpostgresql-d3e8368c4b6e5110d8b3d12859850aeaae08dffb.tar.gz
postgresql-d3e8368c4b6e5110d8b3d12859850aeaae08dffb.zip
Add the additional information to the logical replication worker errcontext.
This commits adds both the finish LSN (commit_lsn in case transaction got committed, prepare_lsn in case of a prepared transaction, etc.) and replication origin name to the existing error context message. This will help users in specifying the origin name and transaction finish LSN to pg_replication_origin_advance() SQL function to skip a particular transaction. Author: Masahiko Sawada Reviewed-by: Takamichi Osumi, Euler Taveira, and Amit Kapila Discussion: https://postgr.es/m/CAD21AoBarBf2oTF71ig2g_o=3Z_Dt6_sOpMQma1kFgbnA5OZ_w@mail.gmail.com
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/worker.c75
1 files changed, 56 insertions, 19 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 92aa794706d..8653e1d8402 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -226,6 +226,8 @@ typedef struct ApplyErrorCallbackArg
/* Remote node information */
int remote_attnum; /* -1 if invalid */
TransactionId remote_xid;
+ XLogRecPtr finish_lsn;
+ char *origin_name;
} ApplyErrorCallbackArg;
static ApplyErrorCallbackArg apply_error_callback_arg =
@@ -234,6 +236,8 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
+ .finish_lsn = InvalidXLogRecPtr,
+ .origin_name = NULL,
};
static MemoryContext ApplyMessageContext = NULL;
@@ -332,7 +336,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
/* Functions for apply error callback */
static void apply_error_callback(void *arg);
-static inline void set_apply_error_context_xact(TransactionId xid);
+static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
static inline void reset_apply_error_context_info(void);
/*
@@ -785,7 +789,7 @@ apply_handle_begin(StringInfo s)
LogicalRepBeginData begin_data;
logicalrep_read_begin(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid);
+ set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
remote_final_lsn = begin_data.final_lsn;
@@ -837,7 +841,7 @@ apply_handle_begin_prepare(StringInfo s)
errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
logicalrep_read_begin_prepare(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid);
+ set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
remote_final_lsn = begin_data.prepare_lsn;
@@ -936,7 +940,7 @@ apply_handle_commit_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_commit_prepared(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid);
+ set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
@@ -977,7 +981,7 @@ apply_handle_rollback_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_rollback_prepared(s, &rollback_data);
- set_apply_error_context_xact(rollback_data.xid);
+ set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
@@ -1042,7 +1046,7 @@ apply_handle_stream_prepare(StringInfo s)
errmsg_internal("tablesync worker received a STREAM PREPARE message")));
logicalrep_read_stream_prepare(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid);
+ set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_lsn);
elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
@@ -1124,7 +1128,7 @@ apply_handle_stream_start(StringInfo s)
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid transaction ID in streamed replication transaction")));
- set_apply_error_context_xact(stream_xid);
+ set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr);
/*
* Initialize the worker's stream_fileset if we haven't yet. This will be
@@ -1213,7 +1217,7 @@ apply_handle_stream_abort(StringInfo s)
*/
if (xid == subxid)
{
- set_apply_error_context_xact(xid);
+ set_apply_error_context_xact(xid, InvalidXLogRecPtr);
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
}
else
@@ -1239,7 +1243,7 @@ apply_handle_stream_abort(StringInfo s)
bool found = false;
char path[MAXPGPATH];
- set_apply_error_context_xact(subxid);
+ set_apply_error_context_xact(subxid, InvalidXLogRecPtr);
subidx = -1;
begin_replication_step();
@@ -1424,7 +1428,7 @@ apply_handle_stream_commit(StringInfo s)
errmsg_internal("STREAM COMMIT message without STREAM STOP")));
xid = logicalrep_read_stream_commit(s, &commit_data);
- set_apply_error_context_xact(xid);
+ set_apply_error_context_xact(xid, commit_data.commit_lsn);
elog(DEBUG1, "received commit for streamed transaction %u", xid);
@@ -3499,6 +3503,17 @@ ApplyWorkerMain(Datum main_arg)
myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
pfree(syncslotname);
+
+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message.
+ */
+ ReplicationOriginNameForTablesync(MySubscription->oid,
+ MyLogicalRepWorker->relid,
+ originname,
+ sizeof(originname));
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
}
else
{
@@ -3542,6 +3557,13 @@ ApplyWorkerMain(Datum main_arg)
* does some initializations on the upstream so let's still call it.
*/
(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+
+ /*
+ * Allocate the origin name in long-lived context for error context
+ * message.
+ */
+ apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
+ originname);
}
/*
@@ -3651,36 +3673,51 @@ apply_error_callback(void *arg)
if (apply_error_callback_arg.command == 0)
return;
+ Assert(errarg->origin_name);
+
if (errarg->rel == NULL)
{
if (!TransactionIdIsValid(errarg->remote_xid))
- errcontext("processing remote data during \"%s\"",
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\"",
+ errarg->origin_name,
logicalrep_message_type(errarg->command));
- else
- errcontext("processing remote data during \"%s\" in transaction %u",
+ else if (XLogRecPtrIsInvalid(errarg->finish_lsn))
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u",
+ errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->remote_xid);
+ else
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" in transaction %u finished at %X/%X",
+ errarg->origin_name,
+ logicalrep_message_type(errarg->command),
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->finish_lsn));
}
else if (errarg->remote_attnum < 0)
- errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" in transaction %u finished at %X/%X",
+ errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname,
- errarg->remote_xid);
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->finish_lsn));
else
- errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
+ errcontext("processing remote data for replication origin \"%s\" during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u finished at %X/%X",
+ errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
errarg->rel->remoterel.relname,
errarg->rel->remoterel.attnames[errarg->remote_attnum],
- errarg->remote_xid);
+ errarg->remote_xid,
+ LSN_FORMAT_ARGS(errarg->finish_lsn));
}
/* Set transaction information of apply error callback */
static inline void
-set_apply_error_context_xact(TransactionId xid)
+set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn)
{
apply_error_callback_arg.remote_xid = xid;
+ apply_error_callback_arg.finish_lsn = lsn;
}
/* Reset all information of apply error callback */
@@ -3690,5 +3727,5 @@ reset_apply_error_context_info(void)
apply_error_callback_arg.command = 0;
apply_error_callback_arg.rel = NULL;
apply_error_callback_arg.remote_attnum = -1;
- set_apply_error_context_xact(InvalidTransactionId);
+ set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
}