diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/decode.c | 14 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 6 | ||||
-rw-r--r-- | src/include/replication/reorderbuffer.h | 4 |
3 files changed, 16 insertions, 8 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 7924581cdcd..888e064ec0f 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -746,9 +746,10 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, } /* - * Update the decoding stats at transaction prepare/commit/abort. It is - * not clear that sending more or less frequently than this would be - * better. + * Update the decoding stats at transaction prepare/commit/abort. + * Additionally we send the stats when we spill or stream the changes to + * avoid losing them in case the decoding is interrupted. It is not clear + * that sending more or less frequently than this would be better. */ UpdateDecodingStats(ctx); } @@ -828,9 +829,10 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid); /* - * Update the decoding stats at transaction prepare/commit/abort. It is - * not clear that sending more or less frequently than this would be - * better. + * Update the decoding stats at transaction prepare/commit/abort. + * Additionally we send the stats when we spill or stream the changes to + * avoid losing them in case the decoding is interrupted. It is not clear + * that sending more or less frequently than this would be better. */ UpdateDecodingStats(ctx); } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index c79425fbb73..e80a195472e 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -3559,6 +3559,9 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* don't consider already serialized transactions */ rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1; + + /* update the decoding stats */ + UpdateDecodingStats((LogicalDecodingContext *) rb->private_data); } Assert(spilled == txn->nentries_mem); @@ -3928,6 +3931,9 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* Don't consider already streamed transaction. */ rb->streamTxns += (txn_is_streamed) ? 0 : 1; + /* update the decoding stats */ + UpdateDecodingStats((LogicalDecodingContext *) rb->private_data); + Assert(dlist_is_empty(&txn->changes)); Assert(txn->nentries == 0); Assert(txn->nentries_mem == 0); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index bfab8303ee7..53cdfa5d88f 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -617,14 +617,14 @@ struct ReorderBuffer /* Statistics about transactions streamed to the decoding output plugin */ int64 streamTxns; /* number of transactions streamed */ int64 streamCount; /* streaming invocation counter */ - int64 streamBytes; /* amount of data streamed */ + int64 streamBytes; /* amount of data decoded */ /* * Statistics about all the transactions sent to the decoding output * plugin */ int64 totalTxns; /* total number of transactions sent */ - int64 totalBytes; /* total amount of data sent */ + int64 totalBytes; /* total amount of data decoded */ }; |