aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/test_decoding/Makefile2
-rw-r--r--contrib/test_decoding/expected/ondisk_startup.out43
-rw-r--r--contrib/test_decoding/specs/ondisk_startup.spec43
-rw-r--r--src/backend/replication/logical/snapbuild.c6
-rw-r--r--src/backend/replication/slot.c35
5 files changed, 116 insertions, 13 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index d7f32c3ec58..2e5a01bd730 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -53,7 +53,7 @@ regresscheck-install-force: | submake-regress submake-test_decoding
--extra-install=contrib/test_decoding \
$(REGRESSCHECKS)
-ISOLATIONCHECKS=mxact delayed_startup concurrent_ddl_dml
+ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml
isolationcheck: all | submake-isolation submake-test_decoding
$(MKDIR_P) isolation_output
diff --git a/contrib/test_decoding/expected/ondisk_startup.out b/contrib/test_decoding/expected/ondisk_startup.out
new file mode 100644
index 00000000000..65115c830a4
--- /dev/null
+++ b/contrib/test_decoding/expected/ondisk_startup.out
@@ -0,0 +1,43 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s2txid s1init s3txid s2alter s2c s1insert s1checkpoint s1start s1insert s1alter s1insert s1start
+step s2txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL;
+?column?
+
+f
+step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); <waiting ...>
+step s3txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL;
+?column?
+
+f
+step s2alter: ALTER TABLE do_write ADD COLUMN addedbys2 int;
+step s2c: COMMIT;
+step s1init: <... completed>
+?column?
+
+init
+step s1insert: INSERT INTO do_write DEFAULT VALUES;
+step s1checkpoint: CHECKPOINT;
+step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');
+data
+
+BEGIN
+table public.do_write: INSERT: id[integer]:1 addedbys2[integer]:null
+COMMIT
+step s1insert: INSERT INTO do_write DEFAULT VALUES;
+step s1alter: ALTER TABLE do_write ADD COLUMN addedbys1 int;
+step s1insert: INSERT INTO do_write DEFAULT VALUES;
+step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');
+data
+
+BEGIN
+table public.do_write: INSERT: id[integer]:2 addedbys2[integer]:null
+COMMIT
+BEGIN
+COMMIT
+BEGIN
+table public.do_write: INSERT: id[integer]:3 addedbys2[integer]:null addedbys1[integer]:null
+COMMIT
+?column?
+
+stop
diff --git a/contrib/test_decoding/specs/ondisk_startup.spec b/contrib/test_decoding/specs/ondisk_startup.spec
new file mode 100644
index 00000000000..39c4a223aee
--- /dev/null
+++ b/contrib/test_decoding/specs/ondisk_startup.spec
@@ -0,0 +1,43 @@
+# Force usage of ondisk decoding snapshots to test that code path.
+setup
+{
+ DROP TABLE IF EXISTS do_write;
+ CREATE TABLE do_write(id serial primary key);
+}
+
+teardown
+{
+ DROP TABLE do_write;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+
+session "s1"
+setup { SET synchronous_commit=on; }
+
+step "s1init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');}
+step "s1start" {SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');}
+step "s1insert" { INSERT INTO do_write DEFAULT VALUES; }
+step "s1checkpoint" { CHECKPOINT; }
+step "s1alter" { ALTER TABLE do_write ADD COLUMN addedbys1 int; }
+
+session "s2"
+setup { SET synchronous_commit=on; }
+
+step "s2txid" { BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; }
+step "s2alter" { ALTER TABLE do_write ADD COLUMN addedbys2 int; }
+step "s2c" { COMMIT; }
+
+
+session "s3"
+setup { SET synchronous_commit=on; }
+
+step "s3txid" { BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; }
+step "s3c" { COMMIT; }
+
+# Force usage of an ondisk snapshot by starting and not finishing a
+# transaction with an assigned xid after consistency has been
+# reached. In combination with a checkpoint, which forces a snapshot to
+# be written and a new restart point to be computed, that'll lead to
+# the usage of the snapshot.
+permutation "s2txid" "s1init" "s3txid" "s2alter" "s2c" "s1insert" "s1checkpoint" "s1start" "s1insert" "s1alter" "s1insert" "s1start"
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 705ee0b61c2..d636ccb63b1 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1406,7 +1406,7 @@ typedef struct SnapBuildOnDisk
offsetof(SnapBuildOnDisk, version)
#define SNAPBUILD_MAGIC 0x51A1E001
-#define SNAPBUILD_VERSION 1
+#define SNAPBUILD_VERSION 2
/*
* Store/Load a snapshot from disk, depending on the snapshot builder's state.
@@ -1552,6 +1552,8 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
ondisk_c += sz;
+ FIN_CRC32C(ondisk->checksum);
+
/* we have valid data now, open tempfile and write it there */
fd = OpenTransientFile(tmppath,
O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
@@ -1724,6 +1726,8 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
CloseTransientFile(fd);
+ FIN_CRC32C(checksum);
+
/* verify checksum of what we've read */
if (!EQ_CRC32C(checksum, ondisk.checksum))
ereport(ERROR,
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 7817ad8659e..937b669e8cd 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -61,18 +61,29 @@ typedef struct ReplicationSlotOnDisk
uint32 version;
uint32 length;
+ /*
+ * The actual data in the slot that follows can differ based on the above
+ * 'version'.
+ */
+
ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;
-/* size of the part of the slot that is version independent */
+/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
offsetof(ReplicationSlotOnDisk, slotdata)
-/* size of the slots that is not version indepenent */
-#define ReplicationSlotOnDiskDynamicSize \
+/* size of the part of the slot not covered by the checksum */
+#define SnapBuildOnDiskNotChecksummedSize \
+ offsetof(ReplicationSlotOnDisk, version)
+/* size of the part covered by the checksum: from 'version' to the end */
+#define SnapBuildOnDiskChecksummedSize \
+ (sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize)
+/* size of the slot data that is version dependent */
+#define ReplicationSlotOnDiskV2Size \
sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
#define SLOT_MAGIC 0x1051CA1 /* format identifier */
-#define SLOT_VERSION 1 /* version for new files */
+#define SLOT_VERSION 2 /* version for new files */
/* Control array for replication slot management */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
@@ -992,8 +1003,8 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
cp.magic = SLOT_MAGIC;
INIT_CRC32C(cp.checksum);
- cp.version = 1;
- cp.length = ReplicationSlotOnDiskDynamicSize;
+ cp.version = SLOT_VERSION;
+ cp.length = ReplicationSlotOnDiskV2Size;
SpinLockAcquire(&slot->mutex);
@@ -1002,8 +1013,9 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
SpinLockRelease(&slot->mutex);
COMP_CRC32C(cp.checksum,
- (char *) (&cp) + ReplicationSlotOnDiskConstantSize,
- ReplicationSlotOnDiskDynamicSize);
+ (char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
+ SnapBuildOnDiskChecksummedSize);
+ FIN_CRC32C(cp.checksum);
if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
{
@@ -1155,7 +1167,7 @@ RestoreSlotFromDisk(const char *name)
path, cp.version)));
/* boundary check on length */
- if (cp.length != ReplicationSlotOnDiskDynamicSize)
+ if (cp.length != ReplicationSlotOnDiskV2Size)
ereport(PANIC,
(errcode_for_file_access(),
errmsg("replication slot file \"%s\" has corrupted length %u",
@@ -1182,8 +1194,9 @@ RestoreSlotFromDisk(const char *name)
/* now verify the CRC */
INIT_CRC32C(checksum);
COMP_CRC32C(checksum,
- (char *) &cp + ReplicationSlotOnDiskConstantSize,
- ReplicationSlotOnDiskDynamicSize);
+ (char *) &cp + SnapBuildOnDiskNotChecksummedSize,
+ SnapBuildOnDiskChecksummedSize);
+ FIN_CRC32C(checksum);
if (!EQ_CRC32C(checksum, cp.checksum))
ereport(PANIC,