diff options
author | Tomas Vondra <tomas.vondra@postgresql.org> | 2022-04-07 18:13:13 +0200 |
---|---|---|
committer | Tomas Vondra <tomas.vondra@postgresql.org> | 2022-04-07 20:06:36 +0200 |
commit | 2c7ea57e56ca5f668c32d4266e0a3e45b455bef5 (patch) | |
tree | c4b80357147f2212e571dd1a4522c2b73068a783 /src/backend/replication/logical/logical.c | |
parent | d7ab2a9a3c0a2800ab36bb48d1cc97370067777e (diff) | |
download | postgresql-2c7ea57e56ca5f668c32d4266e0a3e45b455bef5.tar.gz postgresql-2c7ea57e56ca5f668c32d4266e0a3e45b455bef5.zip |
Revert "Logical decoding of sequences"
This reverts a sequence of commits, implementing features related to
logical decoding and replication of sequences:
- 0da92dc530c9251735fc70b20cd004d9630a1266
- 80901b32913ffa59bf157a4d88284b2b3a7511d9
- b779d7d8fdae088d70da5ed9fcd8205035676df3
- d5ed9da41d96988d905b49bebb273a9b2d6e2915
- a180c2b34de0989269fdb819bff241a249bf5380
- 75b1521dae1ff1fde17fda2e30e591f2e5d64b6a
- 2d2232933b02d9396113662e44dca5f120d6830e
- 002c9dd97a0c874fd1693a570383e2dd38cd40d5
- 05843b1aa49df2ecc9b97c693b755bd1b6f856a9
The implementation has issues, mostly due to combining transactional and
non-transactional behavior of sequences. It's not clear how this could
be fixed, but it'll require reworking significant part of the patch.
Discussion: https://postgr.es/m/95345a19-d508-63d1-860a-f5c2f41e8d40@enterprisedb.com
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r-- | src/backend/replication/logical/logical.c | 88 |
1 files changed, 0 insertions, 88 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 30e33dace33..788769dd738 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -73,10 +73,6 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); -static void sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr sequence_lsn, Relation rel, - bool transactional, - int64 last_value, int64 log_cnt, bool is_called); /* streaming callbacks */ static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -94,10 +90,6 @@ static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); -static void stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr sequence_lsn, Relation rel, - bool transactional, - int64 last_value, int64 log_cnt, bool is_called); static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); @@ -226,7 +218,6 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->apply_truncate = truncate_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; ctx->reorder->message = message_cb_wrapper; - ctx->reorder->sequence = sequence_cb_wrapper; /* * To support streaming, we require start/stop/abort/commit/change @@ -243,7 +234,6 @@ StartupDecodingContext(List *output_plugin_options, (ctx->callbacks.stream_commit_cb != NULL) || (ctx->callbacks.stream_change_cb != NULL) || (ctx->callbacks.stream_message_cb != NULL) || - (ctx->callbacks.stream_sequence_cb != NULL) || (ctx->callbacks.stream_truncate_cb != NULL); /* @@ -261,7 +251,6 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->stream_commit = stream_commit_cb_wrapper; ctx->reorder->stream_change = stream_change_cb_wrapper; ctx->reorder->stream_message = stream_message_cb_wrapper; - ctx->reorder->stream_sequence = stream_sequence_cb_wrapper; ctx->reorder->stream_truncate = stream_truncate_cb_wrapper; @@ -1217,42 +1206,6 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, } static void -sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr sequence_lsn, Relation rel, bool transactional, - int64 last_value, int64 log_cnt, bool is_called) -{ - LogicalDecodingContext *ctx = cache->private_data; - LogicalErrorCallbackState state; - ErrorContextCallback errcallback; - - Assert(!ctx->fast_forward); - - if (ctx->callbacks.sequence_cb == NULL) - return; - - /* Push callback + info on the error context stack */ - state.ctx = ctx; - state.callback_name = "sequence"; - state.report_location = sequence_lsn; - errcallback.callback = output_plugin_error_callback; - errcallback.arg = (void *) &state; - errcallback.previous = error_context_stack; - error_context_stack = &errcallback; - - /* set output state */ - ctx->accept_writes = true; - ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; - ctx->write_location = sequence_lsn; - - /* do the actual work: call callback */ - ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional, - last_value, log_cnt, is_called); - - /* Pop the error context stack */ - error_context_stack = errcallback.previous; -} - -static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr first_lsn) { @@ -1558,47 +1511,6 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, } static void -stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr sequence_lsn, Relation rel, - bool transactional, - int64 last_value, int64 log_cnt, bool is_called) -{ - LogicalDecodingContext *ctx = cache->private_data; - LogicalErrorCallbackState state; - ErrorContextCallback errcallback; - - Assert(!ctx->fast_forward); - - /* We're only supposed to call this when streaming is supported. */ - Assert(ctx->streaming); - - /* this callback is optional */ - if (ctx->callbacks.stream_sequence_cb == NULL) - return; - - /* Push callback + info on the error context stack */ - state.ctx = ctx; - state.callback_name = "stream_sequence"; - state.report_location = sequence_lsn; - errcallback.callback = output_plugin_error_callback; - errcallback.arg = (void *) &state; - errcallback.previous = error_context_stack; - error_context_stack = &errcallback; - - /* set output state */ - ctx->accept_writes = true; - ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; - ctx->write_location = sequence_lsn; - - /* do the actual work: call callback */ - ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional, - last_value, log_cnt, is_called); - - /* Pop the error context stack */ - error_context_stack = errcallback.previous; -} - -static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change) |