aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r-- src/backend/replication/logical/logical.c 6
-rw-r--r-- src/backend/replication/pgoutput/pgoutput.c 141
-rw-r--r-- src/backend/replication/walsender.c 72
-rw-r--r-- src/include/replication/logical.h 3
-rw-r--r-- src/include/replication/output_plugin.h 2
-rw-r--r-- src/test/subscription/t/001_rep_changes.pl 28
-rw-r--r-- src/test/subscription/t/020_messages.pl 5
-rw-r--r-- src/tools/pgindent/typedefs.list 1
8 files changed, 228 insertions, 30 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 934aa13f2d3..e1f14aeecb5 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -683,12 +683,14 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
* Update progress tracking (if supported).
*/
void
-OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
+OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx,
+ bool skipped_xact)
{
if (!ctx->update_progress)
return;
- ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+ ctx->update_progress(ctx, ctx->write_location, ctx->write_xid,
+ skipped_xact);
}
/*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 893833ea83c..20d0b1e1253 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -183,6 +183,36 @@ typedef struct RelationSyncEntry
MemoryContext entry_cxt;
} RelationSyncEntry;
+/*
+ * Maintain a per-transaction level variable to track whether the transaction
+ * has sent BEGIN. BEGIN is only sent when the first change in a transaction
+ * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
+ * messages for empty transactions which saves network bandwidth.
+ *
+ * This optimization is not used for prepared transactions because if the
+ * WALSender restarts after prepare of a transaction and before commit prepared
+ * of the same transaction then we won't be able to figure out if we have
+ * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
+ * because we would have lost the in-memory txndata information that was
+ * present prior to the restart. This will result in sending a spurious
+ * COMMIT PREPARED without a corresponding prepared transaction at the
+ * downstream which would lead to an error when it tries to process it.
+ *
+ * XXX We could achieve this optimization by changing protocol to send
+ * additional information so that downstream can detect that the corresponding
+ * prepare has not been sent. However, adding such a check for every
+ * transaction in the downstream could be costly so we might want to do it
+ * optionally.
+ *
+ * We also don't have this optimization for streamed transactions because
+ * they can contain prepared transactions.
+ */
+typedef struct PGOutputTxnData
+{
+ bool sent_begin_txn; /* flag indicating whether BEGIN has
+ * been sent */
+} PGOutputTxnData;
+
/* Map used to remember which relation schemas we sent. */
static HTAB *RelationSyncCache = NULL;
@@ -488,15 +518,41 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
}
/*
- * BEGIN callback
+ * BEGIN callback.
+ *
+ * Don't send the BEGIN message here; instead, postpone it until the first
+ * change. In logical replication, a common scenario is to replicate a set of
+ * tables (instead of all tables) and transactions whose changes were on
+ * the table(s) that are not published will produce empty transactions. These
+ * empty transactions will send BEGIN and COMMIT messages to subscribers,
+ * using bandwidth on something with little/no use for logical replication.
*/
static void
-pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+pgoutput_begin_txn(LogicalDecodingContext * ctx, ReorderBufferTXN * txn)
+{
+ PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
+ sizeof(PGOutputTxnData));
+
+ txn->output_plugin_private = txndata;
+}
+
+/*
+ * Send BEGIN.
+ *
+ * This is called while processing the first change of the transaction.
+ */
+static void
+pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ Assert(txndata);
+ Assert(!txndata->sent_begin_txn);
OutputPluginPrepareWrite(ctx, !send_replication_origin);
logicalrep_write_begin(ctx->out, txn);
+ txndata->sent_begin_txn = true;
send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
send_replication_origin);
@@ -511,7 +567,25 @@ static void
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
- OutputPluginUpdateProgress(ctx);
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+ bool sent_begin_txn;
+
+ Assert(txndata);
+
+ /*
+ * We don't need to send the commit message unless some relevant change
+ * from this transaction has been sent to the downstream.
+ */
+ sent_begin_txn = txndata->sent_begin_txn;
+ OutputPluginUpdateProgress(ctx, !sent_begin_txn);
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
+
+ if (!sent_begin_txn)
+ {
+ elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
+ return;
+ }
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit(ctx->out, txn, commit_lsn);
@@ -542,7 +616,7 @@ static void
pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn)
{
- OutputPluginUpdateProgress(ctx);
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -556,7 +630,7 @@ static void
pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
- OutputPluginUpdateProgress(ctx);
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -572,7 +646,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
XLogRecPtr prepare_end_lsn,
TimestampTz prepare_time)
{
- OutputPluginUpdateProgress(ctx);
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -1295,6 +1369,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
MemoryContext old;
RelationSyncEntry *relentry;
TransactionId xid = InvalidTransactionId;
@@ -1371,6 +1446,16 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
break;
/*
+ * Send BEGIN if we haven't yet.
+ *
+ * We send the BEGIN message after ensuring that we will actually
+ * send the change. This avoids sending a pair of BEGIN/COMMIT
+ * messages for empty transactions.
+ */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
+
+ /*
* Schema should be sent using the original relation because it
* also sends the ancestor's relation.
*/
@@ -1420,6 +1505,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
relentry, &action))
break;
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
+
maybe_send_schema(ctx, change, relation, relentry);
OutputPluginPrepareWrite(ctx, true);
@@ -1480,6 +1569,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
relentry, &action))
break;
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
+
maybe_send_schema(ctx, change, relation, relentry);
OutputPluginPrepareWrite(ctx, true);
@@ -1510,6 +1603,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
MemoryContext old;
RelationSyncEntry *relentry;
int i;
@@ -1548,6 +1642,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
continue;
relids[nrelids++] = relid;
+
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
+
maybe_send_schema(ctx, change, relation, relentry);
}
@@ -1585,6 +1684,19 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (in_streaming)
xid = txn->xid;
+ /*
+ * Output BEGIN if we haven't yet. Avoid for non-transactional
+ * messages.
+ */
+ if (transactional)
+ {
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
+ }
+
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_message(ctx->out,
xid,
@@ -1629,6 +1741,19 @@ pgoutput_sequence(LogicalDecodingContext *ctx,
if (!relentry->pubactions.pubsequence)
return;
+ /*
+ * Output BEGIN if we haven't yet. Avoid for non-transactional
+ * sequence changes.
+ */
+ if (transactional)
+ {
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
+ }
+
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_sequence(ctx->out,
relation,
@@ -1799,7 +1924,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
Assert(!in_streaming);
Assert(rbtxn_is_streamed(txn));
- OutputPluginUpdateProgress(ctx);
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1820,7 +1945,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
{
Assert(rbtxn_is_streamed(txn));
- OutputPluginUpdateProgress(ctx);
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
OutputPluginWrite(ctx, true);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index cffb3482adf..75400a53f2f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -242,14 +242,16 @@ static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
-static void WalSndKeepalive(bool requestReply);
+static void ProcessPendingWrites(void);
+static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr);
static void WalSndKeepaliveIfNecessary(void);
static void WalSndCheckTimeOut(void);
static long WalSndComputeSleeptime(TimestampTz now);
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
-static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
+static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
+ bool skipped_xact);
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
@@ -1399,6 +1401,16 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
}
/* If we have pending write here, go to slow path */
+ ProcessPendingWrites();
+}
+
+/*
+ * Wait until there is no pending write. Also process replies from the other
+ * side and check timeouts during that.
+ */
+static void
+ProcessPendingWrites(void)
+{
for (;;)
{
long sleeptime;
@@ -1447,9 +1459,12 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
* LogicalDecodingContext 'update_progress' callback.
*
* Write the current position to the lag tracker (see XLogSendPhysical).
+ *
+ * When skipping empty transactions, send a keepalive message if necessary.
*/
static void
-WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
+ bool skipped_xact)
{
static TimestampTz sendTime = 0;
TimestampTz now = GetCurrentTimestamp();
@@ -1459,12 +1474,35 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
* avoid flooding the lag tracker when we commit frequently.
*/
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
- if (!TimestampDifferenceExceeds(sendTime, now,
- WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
- return;
+ if (TimestampDifferenceExceeds(sendTime, now,
+ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
+ {
+ LagTrackerWrite(lsn, now);
+ sendTime = now;
+ }
- LagTrackerWrite(lsn, now);
- sendTime = now;
+ /*
+ * When skipping empty transactions in synchronous replication, we send a
+ * keepalive message to avoid delaying such transactions.
+ *
+ * It is okay to check sync_standbys_defined flag without lock here as
+ * in the worst case we will just send an extra keepalive message when it
+ * is really not required.
+ */
+ if (skipped_xact &&
+ SyncRepRequested() &&
+ ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
+ {
+ WalSndKeepalive(false, lsn);
+
+ /* Try to flush pending output to the client */
+ if (pq_flush_if_writable() != 0)
+ WalSndShutdown();
+
+ /* If we have pending write here, make sure it's actually flushed */
+ if (pq_is_send_pending())
+ ProcessPendingWrites();
+ }
}
/*
@@ -1550,7 +1588,7 @@ WalSndWaitForWal(XLogRecPtr loc)
if (MyWalSnd->flush < sentPtr &&
MyWalSnd->write < sentPtr &&
!waiting_for_ping_response)
- WalSndKeepalive(false);
+ WalSndKeepalive(false, InvalidXLogRecPtr);
/* check whether we're done */
if (loc <= RecentFlushPtr)
@@ -2068,7 +2106,7 @@ ProcessStandbyReplyMessage(void)
/* Send a reply if the standby requested one. */
if (replyRequested)
- WalSndKeepalive(false);
+ WalSndKeepalive(false, InvalidXLogRecPtr);
/*
* Update shared state for this WalSender process based on reply data from
@@ -3074,7 +3112,7 @@ WalSndDone(WalSndSendDataCallback send_data)
proc_exit(0);
}
if (!waiting_for_ping_response)
- WalSndKeepalive(true);
+ WalSndKeepalive(true, InvalidXLogRecPtr);
}
/*
@@ -3563,18 +3601,22 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
*
* If requestReply is set, the message requests the other party to send
* a message back to us, for heartbeat purposes. We also set a flag to
- * let nearby code that we're waiting for that response, to avoid
+ * let nearby code know that we're waiting for that response, to avoid
* repeated requests.
+ *
+ * writePtr is the location up to which the WAL is sent. It is essentially
+ * the same as sentPtr, but in some cases we need to send a keepalive before
+ * sentPtr is updated, such as when skipping empty transactions.
*/
static void
-WalSndKeepalive(bool requestReply)
+WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
{
elog(DEBUG2, "sending replication keepalive");
/* construct the message... */
resetStringInfo(&output_message);
pq_sendbyte(&output_message, 'k');
- pq_sendint64(&output_message, sentPtr);
+ pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
pq_sendint64(&output_message, GetCurrentTimestamp());
pq_sendbyte(&output_message, requestReply ? 1 : 0);
@@ -3613,7 +3655,7 @@ WalSndKeepaliveIfNecessary(void)
wal_sender_timeout / 2);
if (last_processing >= ping_time)
{
- WalSndKeepalive(true);
+ WalSndKeepalive(true, InvalidXLogRecPtr);
/* Try to flush pending output to the client */
if (pq_flush_if_writable() != 0)
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 1097cc9799a..a6ef16ad5b1 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -26,7 +26,8 @@ typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr,
XLogRecPtr Ptr,
- TransactionId xid
+ TransactionId xid,
+ bool skipped_xact
);
typedef struct LogicalDecodingContext
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index a16bebf76ca..fe85d49a030 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -270,6 +270,6 @@ typedef struct OutputPluginCallbacks
/* Functions in replication/logical/logical.c */
extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
-extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact);
#endif /* OUTPUT_PLUGIN_H */
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index eca1c633359..d35a133f154 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -473,6 +473,34 @@ $node_publisher->safe_psql('postgres', "INSERT INTO tab_full VALUES(0)");
$node_publisher->wait_for_catchup('tap_sub');
+# Check that we don't send BEGIN and COMMIT because of empty transaction
+# optimization. We have to look for the DEBUG1 log messages about that, so
+# temporarily bump up the log verbosity.
+$node_publisher->append_conf('postgresql.conf', "log_min_messages = debug1");
+$node_publisher->reload;
+
+# Remember the current size of the log file, taken right after the reload, so
+# that the check below only scans messages logged from this point onward,
+# i.e. those emitted for the INSERT into the non-replicated table below.
+$log_location = -s $node_publisher->logfile;
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_notrep VALUES (11)");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$logfile = slurp_file($node_publisher->logfile, $log_location);
+ok( $logfile =~
+ qr/skipped replication of an empty transaction with XID/,
+ 'empty transaction is skipped');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM tab_notrep");
+is($result, qq(0), 'check non-replicated table is empty on subscriber');
+
+$node_publisher->append_conf('postgresql.conf',
+ "log_min_messages = warning");
+$node_publisher->reload;
+
# note that data are different on provider and subscriber
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_ins");
diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl
index b5045ff3c43..d21d929c2d8 100644
--- a/src/test/subscription/t/020_messages.pl
+++ b/src/test/subscription/t/020_messages.pl
@@ -87,9 +87,8 @@ $result = $node_publisher->safe_psql(
'publication_names', 'tap_pub')
));
-# 66 67 == B C == BEGIN COMMIT
-is( $result, qq(66
-67),
+# no message and no BEGIN and COMMIT because of empty transaction optimization
+is($result, qq(),
'option messages defaults to false so message (M) is not available on slot'
);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 6b77cc64ef4..72fafb795bb 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1616,6 +1616,7 @@ PGMessageField
PGModuleMagicFunction
PGNoticeHooks
PGOutputData
+PGOutputTxnData
PGPROC
PGP_CFB
PGP_Context