aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
authorPeter Eisentraut <peter_e@gmx.net>2017-04-04 08:24:32 -0400
committerPeter Eisentraut <peter_e@gmx.net>2017-04-04 08:24:32 -0400
commitfe7bbc4ddb82c635ef08b5eadc5ce472ad515996 (patch)
tree36511b95c6a0d9e3cc9faff6aa273d233ffb8608 /src/backend/replication/logical/worker.c
parentb38006ef6d1ba2f56cc4962ed17956b74c9fa0c4 (diff)
downloadpostgresql-fe7bbc4ddb82c635ef08b5eadc5ce472ad515996.tar.gz
postgresql-fe7bbc4ddb82c635ef08b5eadc5ce472ad515996.zip
Fix remote position tracking in logical replication
We need to set the origin remote position to end_lsn, not commit_lsn, as commit_lsn is the start of commit record, and we use the origin remote position as start position when restarting replication stream. If we'd use commit_lsn, we could request data that we already received from the remote server after a crash of a downstream server. Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c13
1 files changed, 7 insertions, 6 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index d6986f59c1c..fc01cd344ca 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -421,9 +421,6 @@ apply_handle_begin(StringInfo s)
logicalrep_read_begin(s, &begin_data);
- replorigin_session_origin_timestamp = begin_data.committime;
- replorigin_session_origin_lsn = begin_data.final_lsn;
-
remote_final_lsn = begin_data.final_lsn;
in_remote_transaction = true;
@@ -443,14 +440,18 @@ apply_handle_commit(StringInfo s)
logicalrep_read_commit(s, &commit_data);
- Assert(commit_data.commit_lsn == replorigin_session_origin_lsn);
- Assert(commit_data.committime == replorigin_session_origin_timestamp);
-
Assert(commit_data.commit_lsn == remote_final_lsn);
/* The synchronization worker runs in single transaction. */
if (IsTransactionState() && !am_tablesync_worker())
{
+ /*
+ * Update origin state so we can restart streaming from correct
+ * position in case of crash.
+ */
+ replorigin_session_origin_lsn = commit_data.end_lsn;
+ replorigin_session_origin_timestamp = commit_data.committime;
+
CommitTransactionCommand();
store_flush_position(commit_data.end_lsn);