Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 367
1 file changed, 351 insertions(+), 16 deletions(-)
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 81ef7dc4c1a..c29c0888133 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -47,17 +47,40 @@ static void pgoutput_truncate(LogicalDecodingContext *ctx,
 							  ReorderBufferChange *change);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
+static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
+								  ReorderBufferTXN *txn);
+static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
+								 ReorderBufferTXN *txn);
+static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
+								  ReorderBufferTXN *txn,
+								  XLogRecPtr abort_lsn);
+static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
+								   ReorderBufferTXN *txn,
+								   XLogRecPtr commit_lsn);
 
 static bool publications_valid;
+static bool in_streaming;
 
 static List *LoadPublications(List *pubnames);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
-static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx);
+static void send_relation_and_attrs(Relation relation, TransactionId xid,
+									LogicalDecodingContext *ctx);
 
 /*
  * Entry in the map used to remember which relation schemas we sent.
  *
+ * The schema_sent flag determines if the current schema record was already
+ * sent to the subscriber (in which case we don't need to send it again).
+ *
+ * The schema cache on downstream is however updated only at commit time,
+ * and with streamed transactions the commit order may be different from
+ * the order the transactions are sent in. Also, the (sub) transactions
+ * might get aborted so we need to send the schema for each (sub) transaction
+ * so that we don't loose the schema information on abort. For handling this,
+ * we maintain the list of xids (streamed_txns) for those we have already sent
+ * the schema.
+ *
  * For partitions, 'pubactions' considers not only the table's own
  * publications, but also those of all of its ancestors.
  */
@@ -70,6 +93,8 @@ typedef struct RelationSyncEntry
 	 * have been sent for this to be true.
 	 */
 	bool		schema_sent;
+	List	   *streamed_txns;	/* streamed toplevel transactions with this
+								 * schema */
 
 	bool		replicate_valid;
 	PublicationActions pubactions;
@@ -95,10 +120,15 @@ typedef struct RelationSyncEntry
 static HTAB *RelationSyncCache = NULL;
 
 static void init_rel_sync_cache(MemoryContext decoding_context);
+static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
 static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
 										  uint32 hashvalue);
+static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
+											TransactionId xid);
+static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
+											TransactionId xid);
 
 /*
  * Specify output plugin callbacks
@@ -115,16 +145,26 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->commit_cb = pgoutput_commit_txn;
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
 	cb->shutdown_cb = pgoutput_shutdown;
+
+	/* transaction streaming */
+	cb->stream_start_cb = pgoutput_stream_start;
+	cb->stream_stop_cb = pgoutput_stream_stop;
+	cb->stream_abort_cb = pgoutput_stream_abort;
+	cb->stream_commit_cb = pgoutput_stream_commit;
+	cb->stream_change_cb = pgoutput_change;
+	cb->stream_truncate_cb = pgoutput_truncate;
 }
 
 static void
 parse_output_parameters(List *options, uint32 *protocol_version,
-						List **publication_names, bool *binary)
+						List **publication_names, bool *binary,
+						bool *enable_streaming)
 {
 	ListCell   *lc;
 	bool		protocol_version_given = false;
 	bool		publication_names_given = false;
 	bool		binary_option_given = false;
+	bool		streaming_given = false;
 
 	*binary = false;
 
@@ -182,6 +222,16 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 
 			*binary = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "streaming") == 0)
+		{
+			if (streaming_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			streaming_given = true;
+
+			*enable_streaming = defGetBoolean(defel);
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -194,6 +244,7 @@ static void
 pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 				 bool is_init)
 {
+	bool		enable_streaming = false;
 	PGOutputData *data = palloc0(sizeof(PGOutputData));
 
 	/* Create our memory context for private allocations. */
@@ -217,7 +268,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		parse_output_parameters(ctx->output_plugin_options,
 								&data->protocol_version,
 								&data->publication_names,
-								&data->binary);
+								&data->binary,
+								&enable_streaming);
 
 		/* Check if we support requested protocol */
 		if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
@@ -237,6 +289,27 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 					 errmsg("publication_names parameter missing")));
 
+		/*
+		 * Decide whether to enable streaming. It is disabled by default, in
+		 * which case we just update the flag in decoding context. Otherwise
+		 * we only allow it with sufficient version of the protocol, and when
+		 * the output plugin supports it.
+		 */
+		if (!enable_streaming)
+			ctx->streaming = false;
+		else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("requested proto_version=%d does not support streaming, need %d or higher",
+							data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
+		else if (!ctx->streaming)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("streaming requested, but not supported by output plugin")));
+
+		/* Also remember we're currently not streaming any transaction. */
+		in_streaming = false;
+
 		/* Init publication state. */
 		data->publications = NIL;
 		publications_valid = false;
@@ -247,6 +320,11 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		/* Initialize relation schema cache. */
 		init_rel_sync_cache(CacheMemoryContext);
 	}
+	else
+	{
+		/* Disable the streaming during the slot initialization mode. */
+		ctx->streaming = false;
+	}
 }
 
 /*
@@ -305,9 +383,47 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
  */
 static void
 maybe_send_schema(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn, ReorderBufferChange *change,
 				  Relation relation, RelationSyncEntry *relentry)
 {
-	if (relentry->schema_sent)
+	bool		schema_sent;
+	TransactionId xid = InvalidTransactionId;
+	TransactionId topxid = InvalidTransactionId;
+
+	/*
+	 * Remember XID of the (sub)transaction for the change. We don't care if
+	 * it's top-level transaction or not (we have already sent that XID in
+	 * start of the current streaming block).
+	 *
+	 * If we're not in a streaming block, just use InvalidTransactionId and
+	 * the write methods will not include it.
+	 */
+	if (in_streaming)
+		xid = change->txn->xid;
+
+	if (change->txn->toptxn)
+		topxid = change->txn->toptxn->xid;
+	else
+		topxid = xid;
+
+	/*
+	 * Do we need to send the schema? We do track streamed transactions
+	 * separately, because those may be applied later (and the regular
+	 * transactions won't see their effects until then) and in an order that
+	 * we don't know at this point.
+	 *
+	 * XXX There is a scope of optimization here. Currently, we always send
+	 * the schema first time in a streaming transaction but we can probably
+	 * avoid that by checking 'relentry->schema_sent' flag. However, before
+	 * doing that we need to study its impact on the case where we have a mix
+	 * of streaming and non-streaming transactions.
+	 */
+	if (in_streaming)
+		schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
+	else
+		schema_sent = relentry->schema_sent;
+
+	if (schema_sent)
 		return;
 
 	/* If needed, send the ancestor's schema first. */
@@ -323,19 +439,24 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 		relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc),
 											   CreateTupleDescCopy(outdesc));
 		MemoryContextSwitchTo(oldctx);
-		send_relation_and_attrs(ancestor, ctx);
+		send_relation_and_attrs(ancestor, xid, ctx);
 		RelationClose(ancestor);
 	}
 
-	send_relation_and_attrs(relation, ctx);
-	relentry->schema_sent = true;
+	send_relation_and_attrs(relation, xid, ctx);
+
+	if (in_streaming)
+		set_schema_sent_in_streamed_txn(relentry, topxid);
+	else
+		relentry->schema_sent = true;
 }
 
 /*
  * Sends a relation
  */
 static void
-send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx)
+send_relation_and_attrs(Relation relation, TransactionId xid,
+						LogicalDecodingContext *ctx)
 {
 	TupleDesc	desc = RelationGetDescr(relation);
 	int			i;
@@ -359,17 +480,19 @@ send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx)
 			continue;
 
 		OutputPluginPrepareWrite(ctx, false);
-		logicalrep_write_typ(ctx->out, att->atttypid);
+		logicalrep_write_typ(ctx->out, xid, att->atttypid);
 		OutputPluginWrite(ctx, false);
 	}
 
 	OutputPluginPrepareWrite(ctx, false);
-	logicalrep_write_rel(ctx->out, relation);
+	logicalrep_write_rel(ctx->out, xid, relation);
 	OutputPluginWrite(ctx, false);
 }
 
 /*
  * Sends the decoded DML over wire.
+ *
+ * This is called both in streaming and non-streaming modes.
  */
 static void
 pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
@@ -378,10 +501,20 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
 	MemoryContext old;
 	RelationSyncEntry *relentry;
+	TransactionId xid = InvalidTransactionId;
 
 	if (!is_publishable_relation(relation))
 		return;
 
+	/*
+	 * Remember the xid for the change in streaming mode. We need to send xid
+	 * with each change in the streaming mode so that subscriber can make
+	 * their association and on aborts, it can discard the corresponding
+	 * changes.
+	 */
+	if (in_streaming)
+		xid = change->txn->xid;
+
 	relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
 
 	/* First check the table filter */
@@ -406,7 +539,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
-	maybe_send_schema(ctx, relation, relentry);
+	maybe_send_schema(ctx, txn, change, relation, relentry);
 
 	/* Send the data */
 	switch (change->action)
@@ -426,7 +559,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				}
 
 				OutputPluginPrepareWrite(ctx, true);
-				logicalrep_write_insert(ctx->out, relation, tuple,
+				logicalrep_write_insert(ctx->out, xid, relation, tuple,
 										data->binary);
 				OutputPluginWrite(ctx, true);
 				break;
@@ -451,8 +584,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				}
 
 				OutputPluginPrepareWrite(ctx, true);
-				logicalrep_write_update(ctx->out, relation, oldtuple, newtuple,
-										data->binary);
+				logicalrep_write_update(ctx->out, xid, relation, oldtuple,
+										newtuple, data->binary);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
@@ -472,7 +605,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				}
 
 				OutputPluginPrepareWrite(ctx, true);
-				logicalrep_write_delete(ctx->out, relation, oldtuple,
+				logicalrep_write_delete(ctx->out, xid, relation, oldtuple,
 										data->binary);
 				OutputPluginWrite(ctx, true);
 			}
@@ -498,6 +631,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	int			i;
 	int			nrelids;
 	Oid		   *relids;
+	TransactionId xid = InvalidTransactionId;
+
+	/* Remember the xid for the change in streaming mode. See pgoutput_change. */
+	if (in_streaming)
+		xid = change->txn->xid;
 
 	old = MemoryContextSwitchTo(data->context);
 
@@ -526,13 +664,14 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			continue;
 
 		relids[nrelids++] = relid;
-		maybe_send_schema(ctx, relation, relentry);
+		maybe_send_schema(ctx, txn, change, relation, relentry);
 	}
 
 	if (nrelids > 0)
 	{
 		OutputPluginPrepareWrite(ctx, true);
 		logicalrep_write_truncate(ctx->out,
+								  xid,
 								  nrelids,
 								  relids,
 								  change->data.truncate.cascade,
@@ -606,6 +745,118 @@ publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
 }
 
 /*
+ * START STREAM callback
+ */
+static void
+pgoutput_stream_start(struct LogicalDecodingContext *ctx,
+					  ReorderBufferTXN *txn)
+{
+	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
+
+	/* we can't nest streaming of transactions */
+	Assert(!in_streaming);
+
+	/*
+	 * If we already sent the first stream for this transaction then don't
+	 * send the origin id in the subsequent streams.
+	 */
+	if (rbtxn_is_streamed(txn))
+		send_replication_origin = false;
+
+	OutputPluginPrepareWrite(ctx, !send_replication_origin);
+	logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
+
+	if (send_replication_origin)
+	{
+		char	   *origin;
+
+		/* Message boundary */
+		OutputPluginWrite(ctx, false);
+		OutputPluginPrepareWrite(ctx, true);
+
+		if (replorigin_by_oid(txn->origin_id, true, &origin))
+			logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr);
+	}
+
+	OutputPluginWrite(ctx, true);
+
+	/* we're streaming a chunk of transaction now */
+	in_streaming = true;
+}
+
+/*
+ * STOP STREAM callback
+ */
+static void
+pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
+					 ReorderBufferTXN *txn)
+{
+	/* we should be streaming a trasanction */
+	Assert(in_streaming);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_stream_stop(ctx->out);
+	OutputPluginWrite(ctx, true);
+
+	/* we've stopped streaming a transaction */
+	in_streaming = false;
+}
+
+/*
+ * Notify downstream to discard the streamed transaction (along with all
+ * it's subtransactions, if it's a toplevel transaction).
+ */
+static void
+pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
+					  ReorderBufferTXN *txn,
+					  XLogRecPtr abort_lsn)
+{
+	ReorderBufferTXN *toptxn;
+
+	/*
+	 * The abort should happen outside streaming block, even for streamed
+	 * transactions. The transaction has to be marked as streamed, though.
+	 */
+	Assert(!in_streaming);
+
+	/* determine the toplevel transaction */
+	toptxn = (txn->toptxn) ? txn->toptxn : txn;
+
+	Assert(rbtxn_is_streamed(toptxn));
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
+	OutputPluginWrite(ctx, true);
+
+	cleanup_rel_sync_cache(toptxn->xid, false);
+}
+
+/*
+ * Notify downstream to apply the streamed transaction (along with all
+ * it's subtransactions).
+ */
+static void
+pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
+					   ReorderBufferTXN *txn,
+					   XLogRecPtr commit_lsn)
+{
+	/*
+	 * The commit should happen outside streaming block, even for streamed
+	 * transactions. The transaction has to be marked as streamed, though.
+	 */
+	Assert(!in_streaming);
+	Assert(rbtxn_is_streamed(txn));
+
+	OutputPluginUpdateProgress(ctx);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
+	OutputPluginWrite(ctx, true);
+
+	cleanup_rel_sync_cache(txn->xid, true);
+}
+
+/*
  * Initialize the relation schema sync cache for a decoding session.
  *
  * The hash table is destroyed at the end of a decoding session. While
@@ -642,6 +893,39 @@ init_rel_sync_cache(MemoryContext cachectx)
 }
 
 /*
+ * We expect relatively small number of streamed transactions.
+ */
+static bool
+get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
+{
+	ListCell   *lc;
+
+	foreach(lc, entry->streamed_txns)
+	{
+		if (xid == (uint32) lfirst_int(lc))
+			return true;
+	}
+
+	return false;
+}
+
+/*
+ * Add the xid in the rel sync entry for which we have already sent the schema
+ * of the relation.
+ */
+static void
+set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
+{
+	MemoryContext oldctx;
+
+	oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+
+	entry->streamed_txns = lappend_int(entry->streamed_txns, xid);
+
+	MemoryContextSwitchTo(oldctx);
+}
+
+/*
  * Find or create entry in the relation schema cache.
  *
  * This looks up publications that the given relation is directly or
@@ -771,12 +1055,59 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 	}
 
 	if (!found)
+	{
 		entry->schema_sent = false;
+		entry->streamed_txns = NULL;
+	}
 
 	return entry;
 }
 
 /*
+ * Cleanup list of streamed transactions and update the schema_sent flag.
+ *
+ * When a streamed transaction commits or aborts, we need to remove the
+ * toplevel XID from the schema cache. If the transaction aborted, the
+ * subscriber will simply throw away the schema records we streamed, so
+ * we don't need to do anything else.
+ *
+ * If the transaction is committed, the subscriber will update the relation
+ * cache - so tweak the schema_sent flag accordingly.
+ */
+static void
+cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
+{
+	HASH_SEQ_STATUS hash_seq;
+	RelationSyncEntry *entry;
+	ListCell   *lc;
+
+	Assert(RelationSyncCache != NULL);
+
+	hash_seq_init(&hash_seq, RelationSyncCache);
+	while ((entry = hash_seq_search(&hash_seq)) != NULL)
+	{
+		/*
+		 * We can set the schema_sent flag for an entry that has committed xid
+		 * in the list as that ensures that the subscriber would have the
+		 * corresponding schema and we don't need to send it unless there is
+		 * any invalidation for that relation.
+		 */
+		foreach(lc, entry->streamed_txns)
+		{
+			if (xid == (uint32) lfirst_int(lc))
+			{
+				if (is_commit)
+					entry->schema_sent = true;
+
+				entry->streamed_txns =
+					foreach_delete_current(entry->streamed_txns, lc);
+				break;
+			}
+		}
+	}
+}
+
+/*
  * Relcache invalidation callback
  */
 static void
@@ -811,7 +1142,11 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 	 * Reset schema sent status as the relation definition may have changed.
 	 */
 	if (entry != NULL)
+	{
 		entry->schema_sent = false;
+		list_free(entry->streamed_txns);
+		entry->streamed_txns = NULL;
+	}
 }
 
 /*
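
The core of this change is the bookkeeping described in the comments above: each RelationSyncEntry remembers, in streamed_txns, the toplevel xids for which the relation's schema has already been sent, and cleanup_rel_sync_cache() trims that list (and promotes schema_sent on commit) when a streamed transaction finishes. The standalone C sketch below illustrates that pattern outside the PostgreSQL backend; it is not code from this commit. It uses a hand-rolled singly linked list in place of PostgreSQL's List, lappend_int() and foreach_delete_current(), operates on a single entry rather than scanning the whole RelationSyncCache hash table, and all names in it (sync_entry, xid_node, cleanup_streamed_txn, and so on) are invented for the illustration.

/*
 * Standalone sketch (not PostgreSQL code) of per-relation tracking of
 * "schema already sent in this streamed transaction".  Hypothetical names
 * throughout; error handling omitted for brevity.
 */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

typedef uint32_t TransactionId;

typedef struct xid_node
{
	TransactionId xid;
	struct xid_node *next;
} xid_node;

typedef struct sync_entry
{
	bool		schema_sent;	/* subscriber has the schema cached */
	xid_node   *streamed_txns;	/* toplevel xids we already sent it for */
} sync_entry;

/* Was the schema already sent within the given streamed transaction? */
static bool
schema_sent_in_streamed_txn(const sync_entry *entry, TransactionId xid)
{
	for (const xid_node *n = entry->streamed_txns; n != NULL; n = n->next)
		if (n->xid == xid)
			return true;
	return false;
}

/* Record that the schema went out as part of the given streamed xid. */
static void
record_schema_sent_in_streamed_txn(sync_entry *entry, TransactionId xid)
{
	xid_node   *n = malloc(sizeof(xid_node));

	n->xid = xid;
	n->next = entry->streamed_txns;
	entry->streamed_txns = n;
}

/*
 * On stream commit or abort, forget the xid.  Only a commit may set the
 * plain schema_sent flag, because only then does the subscriber actually
 * apply (and cache) the schema messages it received for that transaction.
 */
static void
cleanup_streamed_txn(sync_entry *entry, TransactionId xid, bool is_commit)
{
	for (xid_node **p = &entry->streamed_txns; *p != NULL; p = &(*p)->next)
	{
		if ((*p)->xid == xid)
		{
			xid_node   *victim = *p;

			if (is_commit)
				entry->schema_sent = true;
			*p = victim->next;
			free(victim);
			break;
		}
	}
}

int
main(void)
{
	sync_entry	entry = {false, NULL};

	/* first chunk of streamed xid 724 carries the schema */
	record_schema_sent_in_streamed_txn(&entry, 724);
	printf("sent for 724? %d\n", schema_sent_in_streamed_txn(&entry, 724));	/* 1 */
	printf("sent for 900? %d\n", schema_sent_in_streamed_txn(&entry, 900));	/* 0 */

	/* stream commit of xid 724: drop it and rely on schema_sent from now on */
	cleanup_streamed_txn(&entry, 724, true);
	printf("schema_sent is now %d\n", entry.schema_sent);	/* 1 */
	return 0;
}

A plain linear list is enough for this kind of lookup for the same reason the patch comment gives: only a relatively small number of streamed toplevel transactions is expected to be in flight at any time.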