diff options
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r-- | src/backend/replication/logical/logical.c | 27 |
1 files changed, 27 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 788769dd738..625a7f42730 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -746,6 +746,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i /* set output state */ ctx->accept_writes = false; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.startup_cb(ctx, opt, is_init); @@ -773,6 +774,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx) /* set output state */ ctx->accept_writes = false; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.shutdown_cb(ctx); @@ -808,6 +810,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->first_lsn; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.begin_cb(ctx, txn); @@ -839,6 +842,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; /* points to the end of the record */ + ctx->end_xact = true; /* do the actual work: call callback */ ctx->callbacks.commit_cb(ctx, txn, commit_lsn); @@ -879,6 +883,7 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->first_lsn; + ctx->end_xact = false; /* * If the plugin supports two-phase commits then begin prepare callback is @@ -923,6 +928,7 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; /* points to the end of the record */ + ctx->end_xact = true; /* * If the plugin supports two-phase commits then prepare callback is @@ -967,6 +973,7 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; /* points to the end of the record */ + ctx->end_xact = true; /* * If the plugin support two-phase commits then commit prepared callback @@ -1012,6 +1019,7 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; /* points to the end of the record */ + ctx->end_xact = true; /* * If the plugin support two-phase commits then rollback prepared callback @@ -1062,6 +1070,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; + ctx->end_xact = false; + ctx->callbacks.change_cb(ctx, txn, relation, change); /* Pop the error context stack */ @@ -1102,6 +1112,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; + ctx->end_xact = false; + ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change); /* Pop the error context stack */ @@ -1129,6 +1141,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid, /* set output state */ ctx->accept_writes = false; + ctx->end_xact = false; /* do the actual work: call callback */ ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid); @@ -1159,6 +1172,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) /* set output state */ ctx->accept_writes = false; + ctx->end_xact = false; /* do the actual work: call callback */ ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id); @@ -1196,6 +1210,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; ctx->write_location = message_lsn; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix, @@ -1239,6 +1254,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = first_lsn; + ctx->end_xact = false; + /* in streaming mode, stream_start_cb is required */ if (ctx->callbacks.stream_start_cb == NULL) ereport(ERROR, @@ -1286,6 +1303,8 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = last_lsn; + ctx->end_xact = false; + /* in streaming mode, stream_stop_cb is required */ if (ctx->callbacks.stream_stop_cb == NULL) ereport(ERROR, @@ -1325,6 +1344,7 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = abort_lsn; + ctx->end_xact = true; /* in streaming mode, stream_abort_cb is required */ if (ctx->callbacks.stream_abort_cb == NULL) @@ -1369,6 +1389,7 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; + ctx->end_xact = true; /* in streaming mode with two-phase commits, stream_prepare_cb is required */ if (ctx->callbacks.stream_prepare_cb == NULL) @@ -1409,6 +1430,7 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; + ctx->end_xact = true; /* in streaming mode, stream_commit_cb is required */ if (ctx->callbacks.stream_commit_cb == NULL) @@ -1457,6 +1479,8 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; + ctx->end_xact = false; + /* in streaming mode, stream_change_cb is required */ if (ctx->callbacks.stream_change_cb == NULL) ereport(ERROR, @@ -1501,6 +1525,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; ctx->write_location = message_lsn; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix, @@ -1549,6 +1574,8 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; + ctx->end_xact = false; + ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change); /* Pop the error context stack */ |