aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/pgoutput/pgoutput.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c367
1 files changed, 351 insertions, 16 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 81ef7dc4c1a..c29c0888133 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -47,17 +47,40 @@ static void pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferChange *change);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
+static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
static bool publications_valid;
+static bool in_streaming;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
-static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx);
+static void send_relation_and_attrs(Relation relation, TransactionId xid,
+ LogicalDecodingContext *ctx);
/*
* Entry in the map used to remember which relation schemas we sent.
*
+ * The schema_sent flag determines if the current schema record was already
+ * sent to the subscriber (in which case we don't need to send it again).
+ *
+ * The schema cache on downstream is however updated only at commit time,
+ * and with streamed transactions the commit order may be different from
+ * the order the transactions are sent in. Also, the (sub) transactions
+ * might get aborted so we need to send the schema for each (sub) transaction
+ * so that we don't loose the schema information on abort. For handling this,
+ * we maintain the list of xids (streamed_txns) for those we have already sent
+ * the schema.
+ *
* For partitions, 'pubactions' considers not only the table's own
* publications, but also those of all of its ancestors.
*/
@@ -70,6 +93,8 @@ typedef struct RelationSyncEntry
* have been sent for this to be true.
*/
bool schema_sent;
+ List *streamed_txns; /* streamed toplevel transactions with this
+ * schema */
bool replicate_valid;
PublicationActions pubactions;
@@ -95,10 +120,15 @@ typedef struct RelationSyncEntry
static HTAB *RelationSyncCache = NULL;
static void init_rel_sync_cache(MemoryContext decoding_context);
+static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
uint32 hashvalue);
+static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
+ TransactionId xid);
+static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
+ TransactionId xid);
/*
* Specify output plugin callbacks
@@ -115,16 +145,26 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->commit_cb = pgoutput_commit_txn;
cb->filter_by_origin_cb = pgoutput_origin_filter;
cb->shutdown_cb = pgoutput_shutdown;
+
+ /* transaction streaming */
+ cb->stream_start_cb = pgoutput_stream_start;
+ cb->stream_stop_cb = pgoutput_stream_stop;
+ cb->stream_abort_cb = pgoutput_stream_abort;
+ cb->stream_commit_cb = pgoutput_stream_commit;
+ cb->stream_change_cb = pgoutput_change;
+ cb->stream_truncate_cb = pgoutput_truncate;
}
static void
parse_output_parameters(List *options, uint32 *protocol_version,
- List **publication_names, bool *binary)
+ List **publication_names, bool *binary,
+ bool *enable_streaming)
{
ListCell *lc;
bool protocol_version_given = false;
bool publication_names_given = false;
bool binary_option_given = false;
+ bool streaming_given = false;
*binary = false;
@@ -182,6 +222,16 @@ parse_output_parameters(List *options, uint32 *protocol_version,
*binary = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "streaming") == 0)
+ {
+ if (streaming_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ streaming_given = true;
+
+ *enable_streaming = defGetBoolean(defel);
+ }
else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
}
@@ -194,6 +244,7 @@ static void
pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
bool is_init)
{
+ bool enable_streaming = false;
PGOutputData *data = palloc0(sizeof(PGOutputData));
/* Create our memory context for private allocations. */
@@ -217,7 +268,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
parse_output_parameters(ctx->output_plugin_options,
&data->protocol_version,
&data->publication_names,
- &data->binary);
+ &data->binary,
+ &enable_streaming);
/* Check if we support requested protocol */
if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
@@ -237,6 +289,27 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("publication_names parameter missing")));
+ /*
+ * Decide whether to enable streaming. It is disabled by default, in
+ * which case we just update the flag in decoding context. Otherwise
+ * we only allow it with sufficient version of the protocol, and when
+ * the output plugin supports it.
+ */
+ if (!enable_streaming)
+ ctx->streaming = false;
+ else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("requested proto_version=%d does not support streaming, need %d or higher",
+ data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
+ else if (!ctx->streaming)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("streaming requested, but not supported by output plugin")));
+
+ /* Also remember we're currently not streaming any transaction. */
+ in_streaming = false;
+
/* Init publication state. */
data->publications = NIL;
publications_valid = false;
@@ -247,6 +320,11 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
/* Initialize relation schema cache. */
init_rel_sync_cache(CacheMemoryContext);
}
+ else
+ {
+ /* Disable the streaming during the slot initialization mode. */
+ ctx->streaming = false;
+ }
}
/*
@@ -305,9 +383,47 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
*/
static void
maybe_send_schema(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, ReorderBufferChange *change,
Relation relation, RelationSyncEntry *relentry)
{
- if (relentry->schema_sent)
+ bool schema_sent;
+ TransactionId xid = InvalidTransactionId;
+ TransactionId topxid = InvalidTransactionId;
+
+ /*
+ * Remember XID of the (sub)transaction for the change. We don't care if
+ * it's top-level transaction or not (we have already sent that XID in
+ * start of the current streaming block).
+ *
+ * If we're not in a streaming block, just use InvalidTransactionId and
+ * the write methods will not include it.
+ */
+ if (in_streaming)
+ xid = change->txn->xid;
+
+ if (change->txn->toptxn)
+ topxid = change->txn->toptxn->xid;
+ else
+ topxid = xid;
+
+ /*
+ * Do we need to send the schema? We do track streamed transactions
+ * separately, because those may be applied later (and the regular
+ * transactions won't see their effects until then) and in an order that
+ * we don't know at this point.
+ *
+ * XXX There is a scope of optimization here. Currently, we always send
+ * the schema first time in a streaming transaction but we can probably
+ * avoid that by checking 'relentry->schema_sent' flag. However, before
+ * doing that we need to study its impact on the case where we have a mix
+ * of streaming and non-streaming transactions.
+ */
+ if (in_streaming)
+ schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
+ else
+ schema_sent = relentry->schema_sent;
+
+ if (schema_sent)
return;
/* If needed, send the ancestor's schema first. */
@@ -323,19 +439,24 @@ maybe_send_schema(LogicalDecodingContext *ctx,
relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc),
CreateTupleDescCopy(outdesc));
MemoryContextSwitchTo(oldctx);
- send_relation_and_attrs(ancestor, ctx);
+ send_relation_and_attrs(ancestor, xid, ctx);
RelationClose(ancestor);
}
- send_relation_and_attrs(relation, ctx);
- relentry->schema_sent = true;
+ send_relation_and_attrs(relation, xid, ctx);
+
+ if (in_streaming)
+ set_schema_sent_in_streamed_txn(relentry, topxid);
+ else
+ relentry->schema_sent = true;
}
/*
* Sends a relation
*/
static void
-send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx)
+send_relation_and_attrs(Relation relation, TransactionId xid,
+ LogicalDecodingContext *ctx)
{
TupleDesc desc = RelationGetDescr(relation);
int i;
@@ -359,17 +480,19 @@ send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx)
continue;
OutputPluginPrepareWrite(ctx, false);
- logicalrep_write_typ(ctx->out, att->atttypid);
+ logicalrep_write_typ(ctx->out, xid, att->atttypid);
OutputPluginWrite(ctx, false);
}
OutputPluginPrepareWrite(ctx, false);
- logicalrep_write_rel(ctx->out, relation);
+ logicalrep_write_rel(ctx->out, xid, relation);
OutputPluginWrite(ctx, false);
}
/*
* Sends the decoded DML over wire.
+ *
+ * This is called both in streaming and non-streaming modes.
*/
static void
pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
@@ -378,10 +501,20 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
MemoryContext old;
RelationSyncEntry *relentry;
+ TransactionId xid = InvalidTransactionId;
if (!is_publishable_relation(relation))
return;
+ /*
+ * Remember the xid for the change in streaming mode. We need to send xid
+ * with each change in the streaming mode so that subscriber can make
+ * their association and on aborts, it can discard the corresponding
+ * changes.
+ */
+ if (in_streaming)
+ xid = change->txn->xid;
+
relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
/* First check the table filter */
@@ -406,7 +539,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
- maybe_send_schema(ctx, relation, relentry);
+ maybe_send_schema(ctx, txn, change, relation, relentry);
/* Send the data */
switch (change->action)
@@ -426,7 +559,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_insert(ctx->out, relation, tuple,
+ logicalrep_write_insert(ctx->out, xid, relation, tuple,
data->binary);
OutputPluginWrite(ctx, true);
break;
@@ -451,8 +584,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_update(ctx->out, relation, oldtuple, newtuple,
- data->binary);
+ logicalrep_write_update(ctx->out, xid, relation, oldtuple,
+ newtuple, data->binary);
OutputPluginWrite(ctx, true);
break;
}
@@ -472,7 +605,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_delete(ctx->out, relation, oldtuple,
+ logicalrep_write_delete(ctx->out, xid, relation, oldtuple,
data->binary);
OutputPluginWrite(ctx, true);
}
@@ -498,6 +631,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
int i;
int nrelids;
Oid *relids;
+ TransactionId xid = InvalidTransactionId;
+
+ /* Remember the xid for the change in streaming mode. See pgoutput_change. */
+ if (in_streaming)
+ xid = change->txn->xid;
old = MemoryContextSwitchTo(data->context);
@@ -526,13 +664,14 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
continue;
relids[nrelids++] = relid;
- maybe_send_schema(ctx, relation, relentry);
+ maybe_send_schema(ctx, txn, change, relation, relentry);
}
if (nrelids > 0)
{
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_truncate(ctx->out,
+ xid,
nrelids,
relids,
change->data.truncate.cascade,
@@ -606,6 +745,118 @@ publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
}
/*
+ * START STREAM callback
+ */
+static void
+pgoutput_stream_start(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn)
+{
+ bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
+
+ /* we can't nest streaming of transactions */
+ Assert(!in_streaming);
+
+ /*
+ * If we already sent the first stream for this transaction then don't
+ * send the origin id in the subsequent streams.
+ */
+ if (rbtxn_is_streamed(txn))
+ send_replication_origin = false;
+
+ OutputPluginPrepareWrite(ctx, !send_replication_origin);
+ logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
+
+ if (send_replication_origin)
+ {
+ char *origin;
+
+ /* Message boundary */
+ OutputPluginWrite(ctx, false);
+ OutputPluginPrepareWrite(ctx, true);
+
+ if (replorigin_by_oid(txn->origin_id, true, &origin))
+ logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr);
+ }
+
+ OutputPluginWrite(ctx, true);
+
+ /* we're streaming a chunk of transaction now */
+ in_streaming = true;
+}
+
+/*
+ * STOP STREAM callback
+ */
+static void
+pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn)
+{
+ /* we should be streaming a trasanction */
+ Assert(in_streaming);
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_stream_stop(ctx->out);
+ OutputPluginWrite(ctx, true);
+
+ /* we've stopped streaming a transaction */
+ in_streaming = false;
+}
+
+/*
+ * Notify downstream to discard the streamed transaction (along with all
+ * it's subtransactions, if it's a toplevel transaction).
+ */
+static void
+pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn)
+{
+ ReorderBufferTXN *toptxn;
+
+ /*
+ * The abort should happen outside streaming block, even for streamed
+ * transactions. The transaction has to be marked as streamed, though.
+ */
+ Assert(!in_streaming);
+
+ /* determine the toplevel transaction */
+ toptxn = (txn->toptxn) ? txn->toptxn : txn;
+
+ Assert(rbtxn_is_streamed(toptxn));
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
+ OutputPluginWrite(ctx, true);
+
+ cleanup_rel_sync_cache(toptxn->xid, false);
+}
+
+/*
+ * Notify downstream to apply the streamed transaction (along with all
+ * it's subtransactions).
+ */
+static void
+pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ /*
+ * The commit should happen outside streaming block, even for streamed
+ * transactions. The transaction has to be marked as streamed, though.
+ */
+ Assert(!in_streaming);
+ Assert(rbtxn_is_streamed(txn));
+
+ OutputPluginUpdateProgress(ctx);
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
+ OutputPluginWrite(ctx, true);
+
+ cleanup_rel_sync_cache(txn->xid, true);
+}
+
+/*
* Initialize the relation schema sync cache for a decoding session.
*
* The hash table is destroyed at the end of a decoding session. While
@@ -642,6 +893,39 @@ init_rel_sync_cache(MemoryContext cachectx)
}
/*
+ * We expect relatively small number of streamed transactions.
+ */
+static bool
+get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
+{
+ ListCell *lc;
+
+ foreach(lc, entry->streamed_txns)
+ {
+ if (xid == (uint32) lfirst_int(lc))
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * Add the xid in the rel sync entry for which we have already sent the schema
+ * of the relation.
+ */
+static void
+set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
+{
+ MemoryContext oldctx;
+
+ oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+
+ entry->streamed_txns = lappend_int(entry->streamed_txns, xid);
+
+ MemoryContextSwitchTo(oldctx);
+}
+
+/*
* Find or create entry in the relation schema cache.
*
* This looks up publications that the given relation is directly or
@@ -771,12 +1055,59 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
}
if (!found)
+ {
entry->schema_sent = false;
+ entry->streamed_txns = NULL;
+ }
return entry;
}
/*
+ * Cleanup list of streamed transactions and update the schema_sent flag.
+ *
+ * When a streamed transaction commits or aborts, we need to remove the
+ * toplevel XID from the schema cache. If the transaction aborted, the
+ * subscriber will simply throw away the schema records we streamed, so
+ * we don't need to do anything else.
+ *
+ * If the transaction is committed, the subscriber will update the relation
+ * cache - so tweak the schema_sent flag accordingly.
+ */
+static void
+cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
+{
+ HASH_SEQ_STATUS hash_seq;
+ RelationSyncEntry *entry;
+ ListCell *lc;
+
+ Assert(RelationSyncCache != NULL);
+
+ hash_seq_init(&hash_seq, RelationSyncCache);
+ while ((entry = hash_seq_search(&hash_seq)) != NULL)
+ {
+ /*
+ * We can set the schema_sent flag for an entry that has committed xid
+ * in the list as that ensures that the subscriber would have the
+ * corresponding schema and we don't need to send it unless there is
+ * any invalidation for that relation.
+ */
+ foreach(lc, entry->streamed_txns)
+ {
+ if (xid == (uint32) lfirst_int(lc))
+ {
+ if (is_commit)
+ entry->schema_sent = true;
+
+ entry->streamed_txns =
+ foreach_delete_current(entry->streamed_txns, lc);
+ break;
+ }
+ }
+ }
+}
+
+/*
* Relcache invalidation callback
*/
static void
@@ -811,7 +1142,11 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
* Reset schema sent status as the relation definition may have changed.
*/
if (entry != NULL)
+ {
entry->schema_sent = false;
+ list_free(entry->streamed_txns);
+ entry->streamed_txns = NULL;
+ }
}
/*