aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/decode.c14
-rw-r--r--src/backend/replication/logical/reorderbuffer.c6
-rw-r--r--src/include/replication/reorderbuffer.h4
3 files changed, 16 insertions, 8 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 7924581cdcd..888e064ec0f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -746,9 +746,10 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
}
/*
- * Update the decoding stats at transaction prepare/commit/abort. It is
- * not clear that sending more or less frequently than this would be
- * better.
+ * Update the decoding stats at transaction prepare/commit/abort.
+ * Additionally we send the stats when we spill or stream the changes to
+ * avoid losing them in case the decoding is interrupted. It is not clear
+ * that sending more or less frequently than this would be better.
*/
UpdateDecodingStats(ctx);
}
@@ -828,9 +829,10 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
/*
- * Update the decoding stats at transaction prepare/commit/abort. It is
- * not clear that sending more or less frequently than this would be
- * better.
+ * Update the decoding stats at transaction prepare/commit/abort.
+ * Additionally we send the stats when we spill or stream the changes to
+ * avoid losing them in case the decoding is interrupted. It is not clear
+ * that sending more or less frequently than this would be better.
*/
UpdateDecodingStats(ctx);
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c79425fbb73..e80a195472e 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -3559,6 +3559,9 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* don't consider already serialized transactions */
rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
+
+ /* update the decoding stats */
+ UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
}
Assert(spilled == txn->nentries_mem);
@@ -3928,6 +3931,9 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* Don't consider already streamed transaction. */
rb->streamTxns += (txn_is_streamed) ? 0 : 1;
+ /* update the decoding stats */
+ UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
+
Assert(dlist_is_empty(&txn->changes));
Assert(txn->nentries == 0);
Assert(txn->nentries_mem == 0);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index bfab8303ee7..53cdfa5d88f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -617,14 +617,14 @@ struct ReorderBuffer
/* Statistics about transactions streamed to the decoding output plugin */
int64 streamTxns; /* number of transactions streamed */
int64 streamCount; /* streaming invocation counter */
- int64 streamBytes; /* amount of data streamed */
+ int64 streamBytes; /* amount of data decoded */
/*
* Statistics about all the transactions sent to the decoding output
* plugin
*/
int64 totalTxns; /* total number of transactions sent */
- int64 totalBytes; /* total amount of data sent */
+ int64 totalBytes; /* total amount of data decoded */
};