diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 327 |
1 files changed, 163 insertions, 164 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 04813b506e1..9d1eab9e1e6 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -88,29 +88,29 @@ typedef struct FlushPosition { - dlist_node node; - XLogRecPtr local_end; - XLogRecPtr remote_end; + dlist_node node; + XLogRecPtr local_end; + XLogRecPtr remote_end; } FlushPosition; static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping); typedef struct SlotErrCallbackArg { - LogicalRepRelation *rel; + LogicalRepRelation *rel; int attnum; } SlotErrCallbackArg; -static MemoryContext ApplyMessageContext = NULL; -MemoryContext ApplyContext = NULL; +static MemoryContext ApplyMessageContext = NULL; +MemoryContext ApplyContext = NULL; -WalReceiverConn *wrconn = NULL; +WalReceiverConn *wrconn = NULL; -Subscription *MySubscription = NULL; -bool MySubscriptionValid = false; +Subscription *MySubscription = NULL; +bool MySubscriptionValid = false; -bool in_remote_transaction = false; -static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +bool in_remote_transaction = false; +static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); @@ -215,7 +215,7 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel) */ static void slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, - TupleTableSlot *slot) + TupleTableSlot *slot) { TupleDesc desc = RelationGetDescr(rel->localrel); int num_phys_attrs = desc->natts; @@ -271,9 +271,9 @@ slot_fill_defaults(LogicalRepRelMapEntry *rel, EState *estate, static void slot_store_error_callback(void *arg) { - SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg; - Oid remotetypoid, - localtypoid; + SlotErrCallbackArg *errarg = (SlotErrCallbackArg *) arg; + Oid remotetypoid, + localtypoid; if (errarg->attnum < 0) return; @@ -295,12 +295,12 @@ slot_store_error_callback(void *arg) */ static void slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, - char **values) + char **values) { - int natts = slot->tts_tupleDescriptor->natts; - int i; - SlotErrCallbackArg errarg; - ErrorContextCallback errcallback; + int natts = slot->tts_tupleDescriptor->natts; + int i; + SlotErrCallbackArg errarg; + ErrorContextCallback errcallback; ExecClearTuple(slot); @@ -315,14 +315,14 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, /* Call the "in" function for each non-dropped attribute */ for (i = 0; i < natts; i++) { - Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i]; - int remoteattnum = rel->attrmap[i]; + Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i]; + int remoteattnum = rel->attrmap[i]; if (!att->attisdropped && remoteattnum >= 0 && values[remoteattnum] != NULL) { - Oid typinput; - Oid typioparam; + Oid typinput; + Oid typioparam; errarg.attnum = remoteattnum; @@ -359,12 +359,12 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, */ static void slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, - char **values, bool *replaces) + char **values, bool *replaces) { - int natts = slot->tts_tupleDescriptor->natts; - int i; - SlotErrCallbackArg errarg; - ErrorContextCallback errcallback; + int natts = slot->tts_tupleDescriptor->natts; + int i; + SlotErrCallbackArg errarg; + ErrorContextCallback errcallback; slot_getallattrs(slot); ExecClearTuple(slot); @@ -380,16 +380,16 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, /* Call the "in" function for each replaced attribute */ for (i = 0; i < natts; i++) { - Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i]; - int remoteattnum = rel->attrmap[i]; + Form_pg_attribute att = slot->tts_tupleDescriptor->attrs[i]; + int remoteattnum = rel->attrmap[i]; if (remoteattnum >= 0 && !replaces[remoteattnum]) continue; if (remoteattnum >= 0 && values[remoteattnum] != NULL) { - Oid typinput; - Oid typioparam; + Oid typinput; + Oid typioparam; errarg.attnum = remoteattnum; @@ -418,7 +418,7 @@ slot_modify_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, static void apply_handle_begin(StringInfo s) { - LogicalRepBeginData begin_data; + LogicalRepBeginData begin_data; logicalrep_read_begin(s, &begin_data); @@ -437,7 +437,7 @@ apply_handle_begin(StringInfo s) static void apply_handle_commit(StringInfo s) { - LogicalRepCommitData commit_data; + LogicalRepCommitData commit_data; logicalrep_read_commit(s, &commit_data); @@ -476,8 +476,8 @@ static void apply_handle_origin(StringInfo s) { /* - * ORIGIN message can only come inside remote transaction and before - * any actual writes. + * ORIGIN message can only come inside remote transaction and before any + * actual writes. */ if (!in_remote_transaction || (IsTransactionState() && !am_tablesync_worker())) @@ -497,7 +497,7 @@ apply_handle_origin(StringInfo s) static void apply_handle_relation(StringInfo s) { - LogicalRepRelation *rel; + LogicalRepRelation *rel; rel = logicalrep_read_rel(s); logicalrep_relmap_update(rel); @@ -512,7 +512,7 @@ apply_handle_relation(StringInfo s) static void apply_handle_type(StringInfo s) { - LogicalRepTyp typ; + LogicalRepTyp typ; logicalrep_read_typ(s, &typ); logicalrep_typmap_update(&typ); @@ -526,7 +526,7 @@ apply_handle_type(StringInfo s) static Oid GetRelationIdentityOrPK(Relation rel) { - Oid idxoid; + Oid idxoid; idxoid = RelationGetReplicaIndex(rel); @@ -543,11 +543,11 @@ static void apply_handle_insert(StringInfo s) { LogicalRepRelMapEntry *rel; - LogicalRepTupleData newtup; - LogicalRepRelId relid; - EState *estate; - TupleTableSlot *remoteslot; - MemoryContext oldctx; + LogicalRepTupleData newtup; + LogicalRepRelId relid; + EState *estate; + TupleTableSlot *remoteslot; + MemoryContext oldctx; ensure_transaction(); @@ -607,15 +607,15 @@ check_relation_updatable(LogicalRepRelMapEntry *rel) return; /* - * We are in error mode so it's fine this is somewhat slow. - * It's better to give user correct error. + * We are in error mode so it's fine this is somewhat slow. It's better to + * give user correct error. */ if (OidIsValid(GetRelationIdentityOrPK(rel->localrel))) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("publisher does not send replica identity column " - "expected by the logical replication target relation \"%s.%s\"", + "expected by the logical replication target relation \"%s.%s\"", rel->remoterel.nspname, rel->remoterel.relname))); } @@ -637,17 +637,17 @@ static void apply_handle_update(StringInfo s) { LogicalRepRelMapEntry *rel; - LogicalRepRelId relid; - Oid idxoid; - EState *estate; - EPQState epqstate; - LogicalRepTupleData oldtup; - LogicalRepTupleData newtup; - bool has_oldtup; - TupleTableSlot *localslot; - TupleTableSlot *remoteslot; - bool found; - MemoryContext oldctx; + LogicalRepRelId relid; + Oid idxoid; + EState *estate; + EPQState epqstate; + LogicalRepTupleData oldtup; + LogicalRepTupleData newtup; + bool has_oldtup; + TupleTableSlot *localslot; + TupleTableSlot *remoteslot; + bool found; + MemoryContext oldctx; ensure_transaction(); @@ -685,8 +685,8 @@ apply_handle_update(StringInfo s) MemoryContextSwitchTo(oldctx); /* - * Try to find tuple using either replica identity index, primary key - * or if needed, sequential scan. + * Try to find tuple using either replica identity index, primary key or + * if needed, sequential scan. */ idxoid = GetRelationIdentityOrPK(rel->localrel); Assert(OidIsValid(idxoid) || @@ -758,15 +758,15 @@ static void apply_handle_delete(StringInfo s) { LogicalRepRelMapEntry *rel; - LogicalRepTupleData oldtup; - LogicalRepRelId relid; - Oid idxoid; - EState *estate; - EPQState epqstate; - TupleTableSlot *remoteslot; - TupleTableSlot *localslot; - bool found; - MemoryContext oldctx; + LogicalRepTupleData oldtup; + LogicalRepRelId relid; + Oid idxoid; + EState *estate; + EPQState epqstate; + TupleTableSlot *remoteslot; + TupleTableSlot *localslot; + bool found; + MemoryContext oldctx; ensure_transaction(); @@ -802,8 +802,8 @@ apply_handle_delete(StringInfo s) MemoryContextSwitchTo(oldctx); /* - * Try to find tuple using either replica identity index, primary key - * or if needed, sequential scan. + * Try to find tuple using either replica identity index, primary key or + * if needed, sequential scan. */ idxoid = GetRelationIdentityOrPK(rel->localrel); Assert(OidIsValid(idxoid) || @@ -826,7 +826,7 @@ apply_handle_delete(StringInfo s) } else { - /* The tuple to be deleted could not be found.*/ + /* The tuple to be deleted could not be found. */ ereport(DEBUG1, (errmsg("logical replication could not find row for delete " "in replication target %s", @@ -856,46 +856,46 @@ apply_handle_delete(StringInfo s) static void apply_dispatch(StringInfo s) { - char action = pq_getmsgbyte(s); + char action = pq_getmsgbyte(s); switch (action) { - /* BEGIN */ + /* BEGIN */ case 'B': apply_handle_begin(s); break; - /* COMMIT */ + /* COMMIT */ case 'C': apply_handle_commit(s); break; - /* INSERT */ + /* INSERT */ case 'I': apply_handle_insert(s); break; - /* UPDATE */ + /* UPDATE */ case 'U': apply_handle_update(s); break; - /* DELETE */ + /* DELETE */ case 'D': apply_handle_delete(s); break; - /* RELATION */ + /* RELATION */ case 'R': apply_handle_relation(s); break; - /* TYPE */ + /* TYPE */ case 'Y': apply_handle_type(s); break; - /* ORIGIN */ + /* ORIGIN */ case 'O': apply_handle_origin(s); break; default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("invalid logical replication message type %c", action))); + errmsg("invalid logical replication message type %c", action))); } } @@ -925,7 +925,7 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush, dlist_foreach_modify(iter, &lsn_mapping) { FlushPosition *pos = - dlist_container(FlushPosition, node, iter.cur); + dlist_container(FlushPosition, node, iter.cur); *write = pos->remote_end; @@ -995,12 +995,12 @@ static void LogicalRepApplyLoop(XLogRecPtr last_received) { /* - * Init the ApplyMessageContext which we clean up after each - * replication protocol message. + * Init the ApplyMessageContext which we clean up after each replication + * protocol message. */ ApplyMessageContext = AllocSetContextCreate(ApplyContext, - "ApplyMessageContext", - ALLOCSET_DEFAULT_SIZES); + "ApplyMessageContext", + ALLOCSET_DEFAULT_SIZES); /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); @@ -1039,7 +1039,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) } else { - int c; + int c; StringInfoData s; /* Reset timeout. */ @@ -1108,7 +1108,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) { /* * If we didn't get any transactions for a while there might be - * unconsumed invalidation messages in the queue, consume them now. + * unconsumed invalidation messages in the queue, consume them + * now. */ AcceptInvalidationMessages(); if (!MySubscriptionValid) @@ -1126,6 +1127,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) if (endofstream) { TimeLineID tli; + walrcv_endstreaming(wrconn, &tli); break; } @@ -1152,19 +1154,18 @@ LogicalRepApplyLoop(XLogRecPtr last_received) if (rc & WL_TIMEOUT) { /* - * We didn't receive anything new. If we haven't heard - * anything from the server for more than - * wal_receiver_timeout / 2, ping the server. Also, if - * it's been longer than wal_receiver_status_interval - * since the last update we sent, send a status update to - * the master anyway, to report any progress in applying - * WAL. + * We didn't receive anything new. If we haven't heard anything + * from the server for more than wal_receiver_timeout / 2, ping + * the server. Also, if it's been longer than + * wal_receiver_status_interval since the last update we sent, + * send a status update to the master anyway, to report any + * progress in applying WAL. */ bool requestReply = false; /* - * Check if time since last receive from standby has - * reached the configured limit. + * Check if time since last receive from standby has reached the + * configured limit. */ if (wal_receiver_timeout > 0) { @@ -1180,13 +1181,13 @@ LogicalRepApplyLoop(XLogRecPtr last_received) (errmsg("terminating logical replication worker due to timeout"))); /* - * We didn't receive anything new, for half of - * receiver replication timeout. Ping the server. + * We didn't receive anything new, for half of receiver + * replication timeout. Ping the server. */ if (!ping_sent) { timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, - (wal_receiver_timeout / 2)); + (wal_receiver_timeout / 2)); if (now >= timeout) { requestReply = true; @@ -1211,17 +1212,17 @@ LogicalRepApplyLoop(XLogRecPtr last_received) static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) { - static StringInfo reply_message = NULL; - static TimestampTz send_time = 0; + static StringInfo reply_message = NULL; + static TimestampTz send_time = 0; static XLogRecPtr last_recvpos = InvalidXLogRecPtr; static XLogRecPtr last_writepos = InvalidXLogRecPtr; static XLogRecPtr last_flushpos = InvalidXLogRecPtr; - XLogRecPtr writepos; - XLogRecPtr flushpos; + XLogRecPtr writepos; + XLogRecPtr flushpos; TimestampTz now; - bool have_pending_txes; + bool have_pending_txes; /* * If the user doesn't want status to be reported to the publisher, be @@ -1237,8 +1238,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) get_flush_position(&writepos, &flushpos, &have_pending_txes); /* - * No outstanding transactions to flush, we can report the latest - * received position. This is important for synchronous replication. + * No outstanding transactions to flush, we can report the latest received + * position. This is important for synchronous replication. */ if (!have_pending_txes) flushpos = writepos = recvpos; @@ -1262,7 +1263,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) if (!reply_message) { - MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext); + MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext); + reply_message = makeStringInfo(); MemoryContextSwitchTo(oldctx); } @@ -1273,7 +1275,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) pq_sendint64(reply_message, recvpos); /* write */ pq_sendint64(reply_message, flushpos); /* flush */ pq_sendint64(reply_message, writepos); /* apply */ - pq_sendint64(reply_message, now); /* sendTime */ + pq_sendint64(reply_message, now); /* sendTime */ pq_sendbyte(reply_message, requestReply); /* replyRequested */ elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X", @@ -1300,9 +1302,9 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) static void reread_subscription(void) { - MemoryContext oldctx; - Subscription *newsub; - bool started_tx = false; + MemoryContext oldctx; + Subscription *newsub; + bool started_tx = false; /* This function might be called inside or outside of transaction. */ if (!IsTransactionState()) @@ -1317,47 +1319,45 @@ reread_subscription(void) newsub = GetSubscription(MyLogicalRepWorker->subid, true); /* - * Exit if the subscription was removed. - * This normally should not happen as the worker gets killed - * during DROP SUBSCRIPTION. + * Exit if the subscription was removed. This normally should not happen + * as the worker gets killed during DROP SUBSCRIPTION. */ if (!newsub) { ereport(LOG, - (errmsg("logical replication worker for subscription \"%s\" will " - "stop because the subscription was removed", - MySubscription->name))); + (errmsg("logical replication worker for subscription \"%s\" will " + "stop because the subscription was removed", + MySubscription->name))); walrcv_disconnect(wrconn); proc_exit(0); } /* - * Exit if the subscription was disabled. - * This normally should not happen as the worker gets killed - * during ALTER SUBSCRIPTION ... DISABLE. + * Exit if the subscription was disabled. This normally should not happen + * as the worker gets killed during ALTER SUBSCRIPTION ... DISABLE. */ if (!newsub->enabled) { ereport(LOG, - (errmsg("logical replication worker for subscription \"%s\" will " - "stop because the subscription was disabled", - MySubscription->name))); + (errmsg("logical replication worker for subscription \"%s\" will " + "stop because the subscription was disabled", + MySubscription->name))); walrcv_disconnect(wrconn); proc_exit(0); } /* - * Exit if connection string was changed. The launcher will start - * new worker. + * Exit if connection string was changed. The launcher will start new + * worker. */ if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0) { ereport(LOG, - (errmsg("logical replication worker for subscription \"%s\" will " - "restart because the connection information was changed", - MySubscription->name))); + (errmsg("logical replication worker for subscription \"%s\" will " + "restart because the connection information was changed", + MySubscription->name))); walrcv_disconnect(wrconn); proc_exit(0); @@ -1370,9 +1370,9 @@ reread_subscription(void) if (strcmp(newsub->name, MySubscription->name) != 0) { ereport(LOG, - (errmsg("logical replication worker for subscription \"%s\" will " - "restart because subscription was renamed", - MySubscription->name))); + (errmsg("logical replication worker for subscription \"%s\" will " + "restart because subscription was renamed", + MySubscription->name))); walrcv_disconnect(wrconn); proc_exit(0); @@ -1382,30 +1382,30 @@ reread_subscription(void) Assert(newsub->slotname); /* - * We need to make new connection to new slot if slot name has changed - * so exit here as well if that's the case. + * We need to make new connection to new slot if slot name has changed so + * exit here as well if that's the case. */ if (strcmp(newsub->slotname, MySubscription->slotname) != 0) { ereport(LOG, - (errmsg("logical replication worker for subscription \"%s\" will " - "restart because the replication slot name was changed", - MySubscription->name))); + (errmsg("logical replication worker for subscription \"%s\" will " + "restart because the replication slot name was changed", + MySubscription->name))); walrcv_disconnect(wrconn); proc_exit(0); } /* - * Exit if publication list was changed. The launcher will start - * new worker. + * Exit if publication list was changed. The launcher will start new + * worker. */ if (!equal(newsub->publications, MySubscription->publications)) { ereport(LOG, - (errmsg("logical replication worker for subscription \"%s\" will " - "restart because subscription's publications were changed", - MySubscription->name))); + (errmsg("logical replication worker for subscription \"%s\" will " + "restart because subscription's publications were changed", + MySubscription->name))); walrcv_disconnect(wrconn); proc_exit(0); @@ -1448,11 +1448,11 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue) void ApplyWorkerMain(Datum main_arg) { - int worker_slot = DatumGetInt32(main_arg); - MemoryContext oldctx; - char originname[NAMEDATALEN]; - XLogRecPtr origin_startpos; - char *myslotname; + int worker_slot = DatumGetInt32(main_arg); + MemoryContext oldctx; + char originname[NAMEDATALEN]; + XLogRecPtr origin_startpos; + char *myslotname; WalRcvStreamOptions options; /* Attach to slot */ @@ -1488,8 +1488,8 @@ ApplyWorkerMain(Datum main_arg) /* Load the subscription into persistent memory context. */ ApplyContext = AllocSetContextCreate(TopMemoryContext, - "ApplyContext", - ALLOCSET_DEFAULT_SIZES); + "ApplyContext", + ALLOCSET_DEFAULT_SIZES); StartTransactionCommand(); oldctx = MemoryContextSwitchTo(ApplyContext); MySubscription = GetSubscription(MyLogicalRepWorker->subid, false); @@ -1503,9 +1503,9 @@ ApplyWorkerMain(Datum main_arg) if (!MySubscription->enabled) { ereport(LOG, - (errmsg("logical replication worker for subscription \"%s\" will not " - "start because the subscription was disabled during startup", - MySubscription->name))); + (errmsg("logical replication worker for subscription \"%s\" will not " + "start because the subscription was disabled during startup", + MySubscription->name))); proc_exit(0); } @@ -1530,7 +1530,7 @@ ApplyWorkerMain(Datum main_arg) if (am_tablesync_worker()) { - char *syncslotname; + char *syncslotname; /* This is table synchroniation worker, call initial sync. */ syncslotname = LogicalRepSyncTableStart(&origin_startpos); @@ -1545,10 +1545,10 @@ ApplyWorkerMain(Datum main_arg) else { /* This is main apply worker */ - RepOriginId originid; - TimeLineID startpointTLI; - char *err; - int server_version; + RepOriginId originid; + TimeLineID startpointTLI; + char *err; + int server_version; myslotname = MySubscription->slotname; @@ -1570,9 +1570,8 @@ ApplyWorkerMain(Datum main_arg) (errmsg("could not connect to the publisher: %s", err))); /* - * We don't really use the output identify_system for anything - * but it does some initializations on the upstream so let's still - * call it. + * We don't really use the output identify_system for anything but it + * does some initializations on the upstream so let's still call it. */ (void) walrcv_identify_system(wrconn, &startpointTLI, &server_version); @@ -1580,8 +1579,8 @@ ApplyWorkerMain(Datum main_arg) } /* - * Setup callback for syscache so that we know when something - * changes in the subscription relation state. + * Setup callback for syscache so that we know when something changes in + * the subscription relation state. */ CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, invalidate_syncing_table_states, |