aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/heap/heapam.c603
-rw-r--r--src/backend/access/rmgrdesc/heapdesc.c9
-rw-r--r--src/backend/access/rmgrdesc/xlogdesc.c1
-rw-r--r--src/backend/access/transam/twophase.c4
-rw-r--r--src/backend/access/transam/xact.c41
-rwxr-xr-xsrc/backend/access/transam/xlog.c8
-rw-r--r--src/backend/catalog/index.c2
-rw-r--r--src/backend/commands/trigger.c3
-rw-r--r--src/backend/postmaster/postmaster.c4
-rw-r--r--src/backend/utils/cache/relcache.c50
-rw-r--r--src/backend/utils/misc/postgresql.conf.sample2
-rw-r--r--src/bin/pg_controldata/pg_controldata.c2
-rw-r--r--src/include/access/heapam_xlog.h70
-rw-r--r--src/include/access/xact.h1
-rw-r--r--src/include/access/xlog.h8
-rw-r--r--src/include/access/xlog_internal.h2
-rw-r--r--src/include/utils/rel.h24
-rw-r--r--src/include/utils/relcache.h12
-rw-r--r--src/tools/pgindent/typedefs.list3
19 files changed, 714 insertions, 135 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 2035a2158f1..249fffeb061 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -85,12 +85,14 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
TransactionId xid, CommandId cid, int options);
static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
Buffer newbuf, HeapTuple oldtup,
- HeapTuple newtup, bool all_visible_cleared,
- bool new_all_visible_cleared);
+ HeapTuple newtup, HeapTuple old_key_tup,
+ bool all_visible_cleared, bool new_all_visible_cleared);
static void HeapSatisfiesHOTandKeyUpdate(Relation relation,
- Bitmapset *hot_attrs, Bitmapset *key_attrs,
- bool *satisfies_hot, bool *satisfies_key,
- HeapTuple oldtup, HeapTuple newtup);
+ Bitmapset *hot_attrs,
+ Bitmapset *key_attrs, Bitmapset *id_attrs,
+ bool *satisfies_hot, bool *satisfies_key,
+ bool *satisfies_id,
+ HeapTuple oldtup, HeapTuple newtup);
static void compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask,
uint16 old_infomask2, TransactionId add_to_xmax,
LockTupleMode mode, bool is_update,
@@ -108,6 +110,9 @@ static void MultiXactIdWait(MultiXactId multi, MultiXactStatus status,
static bool ConditionalMultiXactIdWait(MultiXactId multi,
MultiXactStatus status, int *remaining,
uint16 infomask);
+static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
+static HeapTuple ExtractReplicaIdentity(Relation rel, HeapTuple tup, bool key_modified,
+ bool *copy);
/*
@@ -2103,11 +2108,24 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
xl_heap_insert xlrec;
xl_heap_header xlhdr;
XLogRecPtr recptr;
- XLogRecData rdata[3];
+ XLogRecData rdata[4];
Page page = BufferGetPage(buffer);
uint8 info = XLOG_HEAP_INSERT;
+ bool need_tuple_data;
+
+ /*
+ * For logical decoding, we need the tuple even if we're doing a
+ * full page write, so make sure to log it separately. (XXX We could
+ * alternatively store a pointer into the FPW).
+ *
+ * Also, if this is a catalog, we need to transmit combocids to
+ * properly decode, so log that as well.
+ */
+ need_tuple_data = RelationIsLogicallyLogged(relation);
+ if (RelationIsAccessibleInLogicalDecoding(relation))
+ log_heap_new_cid(relation, heaptup);
- xlrec.all_visible_cleared = all_visible_cleared;
+ xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
xlrec.target.node = relation->rd_node;
xlrec.target.tid = heaptup->t_self;
rdata[0].data = (char *) &xlrec;
@@ -2126,18 +2144,36 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
*/
rdata[1].data = (char *) &xlhdr;
rdata[1].len = SizeOfHeapHeader;
- rdata[1].buffer = buffer;
+ rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer;
rdata[1].buffer_std = true;
rdata[1].next = &(rdata[2]);
/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
rdata[2].data = (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits);
rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);
- rdata[2].buffer = buffer;
+ rdata[2].buffer = need_tuple_data ? InvalidBuffer : buffer;
rdata[2].buffer_std = true;
rdata[2].next = NULL;
/*
+ * Make a separate rdata entry for the tuple's buffer if we're
+ * doing logical decoding, so that an eventual FPW doesn't
+ * remove the tuple's data.
+ */
+ if (need_tuple_data)
+ {
+ rdata[2].next = &(rdata[3]);
+
+ rdata[3].data = NULL;
+ rdata[3].len = 0;
+ rdata[3].buffer = buffer;
+ rdata[3].buffer_std = true;
+ rdata[3].next = NULL;
+
+ xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
+ }
+
+ /*
* If this is the single and first tuple on page, we can reinit the
* page instead of restoring the whole thing. Set flag, and hide
* buffer references from XLogInsert.
@@ -2146,7 +2182,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
{
info |= XLOG_HEAP_INIT_PAGE;
- rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
+ rdata[1].buffer = rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
}
recptr = XLogInsert(RM_HEAP_ID, info, rdata);
@@ -2272,6 +2308,8 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
Page page;
bool needwal;
Size saveFreeSpace;
+ bool need_tuple_data = RelationIsLogicallyLogged(relation);
+ bool need_cids = RelationIsAccessibleInLogicalDecoding(relation);
needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation);
saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
@@ -2358,7 +2396,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
{
XLogRecPtr recptr;
xl_heap_multi_insert *xlrec;
- XLogRecData rdata[2];
+ XLogRecData rdata[3];
uint8 info = XLOG_HEAP2_MULTI_INSERT;
char *tupledata;
int totaldatalen;
@@ -2388,7 +2426,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
/* the rest of the scratch space is used for tuple data */
tupledata = scratchptr;
- xlrec->all_visible_cleared = all_visible_cleared;
+ xlrec->flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
xlrec->node = relation->rd_node;
xlrec->blkno = BufferGetBlockNumber(buffer);
xlrec->ntuples = nthispage;
@@ -2420,6 +2458,13 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
datalen);
tuphdr->datalen = datalen;
scratchptr += datalen;
+
+ /*
+ * We don't use heap_multi_insert for catalog tuples yet, but
+ * better be prepared...
+ */
+ if (need_cids)
+ log_heap_new_cid(relation, heaptup);
}
totaldatalen = scratchptr - tupledata;
Assert((scratchptr - scratch) < BLCKSZ);
@@ -2431,17 +2476,34 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
rdata[1].data = tupledata;
rdata[1].len = totaldatalen;
- rdata[1].buffer = buffer;
+ rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer;
rdata[1].buffer_std = true;
rdata[1].next = NULL;
/*
+ * Make a separate rdata entry for the tuple's buffer if
+ * we're doing logical decoding, so that an eventual FPW
+ * doesn't remove the tuple's data.
+ */
+ if (need_tuple_data)
+ {
+ rdata[1].next = &(rdata[2]);
+
+ rdata[2].data = NULL;
+ rdata[2].len = 0;
+ rdata[2].buffer = buffer;
+ rdata[2].buffer_std = true;
+ rdata[2].next = NULL;
+ xlrec->flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
+ }
+
+ /*
* If we're going to reinitialize the whole page using the WAL
* record, hide buffer reference from XLogInsert.
*/
if (init)
{
- rdata[1].buffer = InvalidBuffer;
+ rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
info |= XLOG_HEAP_INIT_PAGE;
}
@@ -2561,6 +2623,8 @@ heap_delete(Relation relation, ItemPointer tid,
bool have_tuple_lock = false;
bool iscombo;
bool all_visible_cleared = false;
+ HeapTuple old_key_tuple = NULL; /* replica identity of the tuple */
+ bool old_key_copied = false;
Assert(ItemPointerIsValid(tid));
@@ -2734,6 +2798,12 @@ l1:
/* replace cid with a combo cid if necessary */
HeapTupleHeaderAdjustCmax(tp.t_data, &cid, &iscombo);
+ /*
+ * Compute replica identity tuple before entering the critical section so
+ * we don't PANIC upon a memory allocation failure.
+ */
+ old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied);
+
START_CRIT_SECTION();
/*
@@ -2786,9 +2856,13 @@ l1:
{
xl_heap_delete xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[2];
+ XLogRecData rdata[4];
+
+ /* For logical decode we need combocids to properly decode the catalog */
+ if (RelationIsAccessibleInLogicalDecoding(relation))
+ log_heap_new_cid(relation, &tp);
- xlrec.all_visible_cleared = all_visible_cleared;
+ xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0;
xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask,
tp.t_data->t_infomask2);
xlrec.target.node = relation->rd_node;
@@ -2805,6 +2879,37 @@ l1:
rdata[1].buffer_std = true;
rdata[1].next = NULL;
+ /*
+ * Log replica identity of the deleted tuple if there is one
+ */
+ if (old_key_tuple != NULL)
+ {
+ xl_heap_header xlhdr;
+
+ xlhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2;
+ xlhdr.t_infomask = old_key_tuple->t_data->t_infomask;
+ xlhdr.t_hoff = old_key_tuple->t_data->t_hoff;
+
+ rdata[1].next = &(rdata[2]);
+ rdata[2].data = (char*)&xlhdr;
+ rdata[2].len = SizeOfHeapHeader;
+ rdata[2].buffer = InvalidBuffer;
+ rdata[2].next = NULL;
+
+ rdata[2].next = &(rdata[3]);
+ rdata[3].data = (char *) old_key_tuple->t_data
+ + offsetof(HeapTupleHeaderData, t_bits);
+ rdata[3].len = old_key_tuple->t_len
+ - offsetof(HeapTupleHeaderData, t_bits);
+ rdata[3].buffer = InvalidBuffer;
+ rdata[3].next = NULL;
+
+ if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
+ else
+ xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY;
+ }
+
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata);
PageSetLSN(page, recptr);
@@ -2850,6 +2955,9 @@ l1:
pgstat_count_heap_delete(relation);
+ if (old_key_tuple != NULL && old_key_copied)
+ heap_freetuple(old_key_tuple);
+
return HeapTupleMayBeUpdated;
}
@@ -2934,9 +3042,12 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
TransactionId xid = GetCurrentTransactionId();
Bitmapset *hot_attrs;
Bitmapset *key_attrs;
+ Bitmapset *id_attrs;
ItemId lp;
HeapTupleData oldtup;
HeapTuple heaptup;
+ HeapTuple old_key_tuple = NULL;
+ bool old_key_copied = false;
Page page;
BlockNumber block;
MultiXactStatus mxact_status;
@@ -2952,6 +3063,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
bool iscombo;
bool satisfies_hot;
bool satisfies_key;
+ bool satisfies_id;
bool use_hot_update = false;
bool key_intact;
bool all_visible_cleared = false;
@@ -2979,8 +3091,10 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
* Note that we get a copy here, so we need not worry about relcache flush
* happening midway through.
*/
- hot_attrs = RelationGetIndexAttrBitmap(relation, false);
- key_attrs = RelationGetIndexAttrBitmap(relation, true);
+ hot_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_ALL);
+ key_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_KEY);
+ id_attrs = RelationGetIndexAttrBitmap(relation,
+ INDEX_ATTR_BITMAP_IDENTITY_KEY);
block = ItemPointerGetBlockNumber(otid);
buffer = ReadBuffer(relation, block);
@@ -3038,9 +3152,9 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
* is updates that don't manipulate key columns, not those that
* serendipitiously arrive at the same key values.
*/
- HeapSatisfiesHOTandKeyUpdate(relation, hot_attrs, key_attrs,
+ HeapSatisfiesHOTandKeyUpdate(relation, hot_attrs, key_attrs, id_attrs,
&satisfies_hot, &satisfies_key,
- &oldtup, newtup);
+ &satisfies_id, &oldtup, newtup);
if (satisfies_key)
{
*lockmode = LockTupleNoKeyExclusive;
@@ -3514,6 +3628,14 @@ l2:
PageSetFull(page);
}
+ /*
+ * Compute replica identity tuple before entering the critical section so
+ * we don't PANIC upon a memory allocation failure.
+ * ExtractReplicaIdentity() will return NULL if nothing needs to be
+ * logged.
+ */
+ old_key_tuple = ExtractReplicaIdentity(relation, &oldtup, !satisfies_id, &old_key_copied);
+
/* NO EREPORT(ERROR) from here till changes are logged */
START_CRIT_SECTION();
@@ -3589,11 +3711,23 @@ l2:
/* XLOG stuff */
if (RelationNeedsWAL(relation))
{
- XLogRecPtr recptr = log_heap_update(relation, buffer,
- newbuf, &oldtup, heaptup,
- all_visible_cleared,
- all_visible_cleared_new);
+ XLogRecPtr recptr;
+ /*
+ * For logical decoding we need combocids to properly decode the
+ * catalog.
+ */
+ if (RelationIsAccessibleInLogicalDecoding(relation))
+ {
+ log_heap_new_cid(relation, &oldtup);
+ log_heap_new_cid(relation, heaptup);
+ }
+
+ recptr = log_heap_update(relation, buffer,
+ newbuf, &oldtup, heaptup,
+ old_key_tuple,
+ all_visible_cleared,
+ all_visible_cleared_new);
if (newbuf != buffer)
{
PageSetLSN(BufferGetPage(newbuf), recptr);
@@ -3644,6 +3778,9 @@ l2:
heap_freetuple(heaptup);
}
+ if (old_key_tuple != NULL && old_key_copied)
+ heap_freetuple(old_key_tuple);
+
bms_free(hot_attrs);
bms_free(key_attrs);
@@ -3731,63 +3868,72 @@ heap_tuple_attr_equals(TupleDesc tupdesc, int attrnum,
/*
* Check which columns are being updated.
*
- * This simultaneously checks conditions for HOT updates and for FOR KEY
- * SHARE updates. Since much of the time they will be checking very similar
- * sets of columns, and doing the same tests on them, it makes sense to
- * optimize and do them together.
+ * This simultaneously checks conditions for HOT updates, for FOR KEY
+ * SHARE updates, and REPLICA IDENTITY concerns. Since much of the time they
+ * will be checking very similar sets of columns, and doing the same tests on
+ * them, it makes sense to optimize and do them together.
*
- * We receive two bitmapsets comprising the two sets of columns we're
+ * We receive three bitmapsets comprising the three sets of columns we're
* interested in. Note these are destructively modified; that is OK since
* this is invoked at most once in heap_update.
*
* hot_result is set to TRUE if it's okay to do a HOT update (i.e. it does not
* modified indexed columns); key_result is set to TRUE if the update does not
- * modify columns used in the key.
+ * modify columns used in the key; id_result is set to TRUE if the update does
+ * not modify columns in any index marked as the REPLICA IDENTITY.
*/
static void
-HeapSatisfiesHOTandKeyUpdate(Relation relation,
- Bitmapset *hot_attrs, Bitmapset *key_attrs,
+HeapSatisfiesHOTandKeyUpdate(Relation relation, Bitmapset *hot_attrs,
+ Bitmapset *key_attrs, Bitmapset *id_attrs,
bool *satisfies_hot, bool *satisfies_key,
+ bool *satisfies_id,
HeapTuple oldtup, HeapTuple newtup)
{
int next_hot_attnum;
int next_key_attnum;
+ int next_id_attnum;
bool hot_result = true;
bool key_result = true;
- bool key_done = false;
- bool hot_done = false;
+ bool id_result = true;
- next_hot_attnum = bms_first_member(hot_attrs);
- if (next_hot_attnum == -1)
- hot_done = true;
- else
- /* Adjust for system attributes */
- next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
+ /* If REPLICA IDENTITY is set to FULL, id_attrs will be empty. */
+ Assert(bms_is_subset(id_attrs, key_attrs));
+ Assert(bms_is_subset(key_attrs, hot_attrs));
+ /*
+ * If one of these sets contains no remaining bits, bms_first_member will
+ * return -1, and after adding FirstLowInvalidHeapAttributeNumber (which
+ * is negative!) we'll get an attribute number that can't possibly be
+ * real, and thus won't match any actual attribute number.
+ */
+ next_hot_attnum = bms_first_member(hot_attrs);
+ next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
next_key_attnum = bms_first_member(key_attrs);
- if (next_key_attnum == -1)
- key_done = true;
- else
- /* Adjust for system attributes */
- next_key_attnum += FirstLowInvalidHeapAttributeNumber;
+ next_key_attnum += FirstLowInvalidHeapAttributeNumber;
+ next_id_attnum = bms_first_member(id_attrs);
+ next_id_attnum += FirstLowInvalidHeapAttributeNumber;
for (;;)
{
- int check_now;
bool changed;
+ int check_now;
- /* both bitmapsets are now empty */
- if (key_done && hot_done)
- break;
-
- /* XXX there's probably an easier way ... */
- if (hot_done)
- check_now = next_key_attnum;
- if (key_done)
+ /*
+ * Since the HOT attributes are a superset of the key attributes and
+ * the key attributes are a superset of the id attributes, this logic
+ * is guaranteed to identify the next column that needs to be
+ * checked.
+ */
+ if (hot_result && next_hot_attnum > FirstLowInvalidHeapAttributeNumber)
check_now = next_hot_attnum;
+ else if (key_result && next_key_attnum > FirstLowInvalidHeapAttributeNumber)
+ check_now = next_key_attnum;
+ else if (id_result && next_id_attnum > FirstLowInvalidHeapAttributeNumber)
+ check_now = next_id_attnum;
else
- check_now = Min(next_hot_attnum, next_key_attnum);
+ break;
+ /* See whether it changed. */
changed = !heap_tuple_attr_equals(RelationGetDescr(relation),
check_now, oldtup, newtup);
if (changed)
@@ -3796,34 +3942,42 @@ HeapSatisfiesHOTandKeyUpdate(Relation relation,
hot_result = false;
if (check_now == next_key_attnum)
key_result = false;
- }
+ if (check_now == next_id_attnum)
+ id_result = false;
- /* if both are false now, we can stop checking */
- if (!hot_result && !key_result)
- break;
+ /* if all are false now, we can stop checking */
+ if (!hot_result && !key_result && !id_result)
+ break;
+ }
- if (check_now == next_hot_attnum)
+ /*
+ * Advance the next attribute numbers for the sets that contain
+ * the attribute we just checked. As we work our way through the
+ * columns, the next_attnum values will rise; but when each set
+ * becomes empty, bms_first_member() will return -1 and the attribute
+ * number will end up with a value less than
+ * FirstLowInvalidHeapAttributeNumber.
+ */
+ if (hot_result && check_now == next_hot_attnum)
{
next_hot_attnum = bms_first_member(hot_attrs);
- if (next_hot_attnum == -1)
- hot_done = true;
- else
- /* Adjust for system attributes */
- next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
+ next_hot_attnum += FirstLowInvalidHeapAttributeNumber;
}
- if (check_now == next_key_attnum)
+ if (key_result && check_now == next_key_attnum)
{
next_key_attnum = bms_first_member(key_attrs);
- if (next_key_attnum == -1)
- key_done = true;
- else
- /* Adjust for system attributes */
- next_key_attnum += FirstLowInvalidHeapAttributeNumber;
+ next_key_attnum += FirstLowInvalidHeapAttributeNumber;
+ }
+ if (id_result && check_now == next_id_attnum)
+ {
+ next_id_attnum = bms_first_member(id_attrs);
+ next_id_attnum += FirstLowInvalidHeapAttributeNumber;
}
}
*satisfies_hot = hot_result;
*satisfies_key = key_result;
+ *satisfies_id = id_result;
}
/*
@@ -6140,14 +6294,17 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
static XLogRecPtr
log_heap_update(Relation reln, Buffer oldbuf,
Buffer newbuf, HeapTuple oldtup, HeapTuple newtup,
+ HeapTuple old_key_tuple,
bool all_visible_cleared, bool new_all_visible_cleared)
{
xl_heap_update xlrec;
- xl_heap_header xlhdr;
+ xl_heap_header_len xlhdr;
+ xl_heap_header_len xlhdr_idx;
uint8 info;
XLogRecPtr recptr;
- XLogRecData rdata[4];
+ XLogRecData rdata[7];
Page page = BufferGetPage(newbuf);
+ bool need_tuple_data = RelationIsLogicallyLogged(reln);
/* Caller should not call me on a non-WAL-logged relation */
Assert(RelationNeedsWAL(reln));
@@ -6163,9 +6320,12 @@ log_heap_update(Relation reln, Buffer oldbuf,
xlrec.old_infobits_set = compute_infobits(oldtup->t_data->t_infomask,
oldtup->t_data->t_infomask2);
xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data);
- xlrec.all_visible_cleared = all_visible_cleared;
+ xlrec.flags = 0;
+ if (all_visible_cleared)
+ xlrec.flags |= XLOG_HEAP_ALL_VISIBLE_CLEARED;
xlrec.newtid = newtup->t_self;
- xlrec.new_all_visible_cleared = new_all_visible_cleared;
+ if (new_all_visible_cleared)
+ xlrec.flags |= XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED;
rdata[0].data = (char *) &xlrec;
rdata[0].len = SizeOfHeapUpdate;
@@ -6178,33 +6338,86 @@ log_heap_update(Relation reln, Buffer oldbuf,
rdata[1].buffer_std = true;
rdata[1].next = &(rdata[2]);
- xlhdr.t_infomask2 = newtup->t_data->t_infomask2;
- xlhdr.t_infomask = newtup->t_data->t_infomask;
- xlhdr.t_hoff = newtup->t_data->t_hoff;
+ xlhdr.header.t_infomask2 = newtup->t_data->t_infomask2;
+ xlhdr.header.t_infomask = newtup->t_data->t_infomask;
+ xlhdr.header.t_hoff = newtup->t_data->t_hoff;
+ xlhdr.t_len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
/*
- * As with insert records, we need not store the rdata[2] segment if we
- * decide to store the whole buffer instead.
+ * As with insert records, we need not store the rdata[2] segment
+ * if we decide to store the whole buffer instead unless we're
+ * doing logical decoding.
*/
rdata[2].data = (char *) &xlhdr;
- rdata[2].len = SizeOfHeapHeader;
- rdata[2].buffer = newbuf;
+ rdata[2].len = SizeOfHeapHeaderLen;
+ rdata[2].buffer = need_tuple_data ? InvalidBuffer : newbuf;
rdata[2].buffer_std = true;
rdata[2].next = &(rdata[3]);
/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
- rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
+ rdata[3].data = (char *) newtup->t_data
+ + offsetof(HeapTupleHeaderData, t_bits);
rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
- rdata[3].buffer = newbuf;
+ rdata[3].buffer = need_tuple_data ? InvalidBuffer : newbuf;
rdata[3].buffer_std = true;
rdata[3].next = NULL;
+ /*
+ * Separate storage for the FPW buffer reference of the new page in the
+ * wal_level >= logical case.
+ */
+ if (need_tuple_data)
+ {
+ rdata[3].next = &(rdata[4]);
+
+ rdata[4].data = NULL,
+ rdata[4].len = 0;
+ rdata[4].buffer = newbuf;
+ rdata[4].buffer_std = true;
+ rdata[4].next = NULL;
+ xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
+
+ /* We need to log a tuple identity */
+ if (old_key_tuple)
+ {
+ /* don't really need this, but its more comfy to decode */
+ xlhdr_idx.header.t_infomask2 = old_key_tuple->t_data->t_infomask2;
+ xlhdr_idx.header.t_infomask = old_key_tuple->t_data->t_infomask;
+ xlhdr_idx.header.t_hoff = old_key_tuple->t_data->t_hoff;
+ xlhdr_idx.t_len = old_key_tuple->t_len;
+
+ rdata[4].next = &(rdata[5]);
+ rdata[5].data = (char *) &xlhdr_idx;
+ rdata[5].len = SizeOfHeapHeaderLen;
+ rdata[5].buffer = InvalidBuffer;
+ rdata[5].next = &(rdata[6]);
+
+ /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
+ rdata[6].data = (char *) old_key_tuple->t_data
+ + offsetof(HeapTupleHeaderData, t_bits);
+ rdata[6].len = old_key_tuple->t_len
+ - offsetof(HeapTupleHeaderData, t_bits);
+ rdata[6].buffer = InvalidBuffer;
+ rdata[6].next = NULL;
+
+ if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
+ else
+ xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY;
+ }
+ }
+
/* If new tuple is the single and first tuple on page... */
if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
{
+ XLogRecData *rcur = &rdata[2];
info |= XLOG_HEAP_INIT_PAGE;
- rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
+ while (rcur != NULL)
+ {
+ rcur->buffer = InvalidBuffer;
+ rcur = rcur->next;
+ }
}
recptr = XLogInsert(RM_HEAP_ID, info, rdata);
@@ -6340,6 +6553,184 @@ log_newpage_buffer(Buffer buffer, bool page_std)
}
/*
+ * Perform XLogInsert of a XLOG_HEAP2_NEW_CID record
+ *
+ * This is only used in wal_level >= WAL_LEVEL_LOGICAL, and only for catalog
+ * tuples.
+ */
+static XLogRecPtr
+log_heap_new_cid(Relation relation, HeapTuple tup)
+{
+ xl_heap_new_cid xlrec;
+
+ XLogRecPtr recptr;
+ XLogRecData rdata[1];
+ HeapTupleHeader hdr = tup->t_data;
+
+ Assert(ItemPointerIsValid(&tup->t_self));
+ Assert(tup->t_tableOid != InvalidOid);
+
+ xlrec.top_xid = GetTopTransactionId();
+ xlrec.target.node = relation->rd_node;
+ xlrec.target.tid = tup->t_self;
+
+ /*
+ * If the tuple got inserted & deleted in the same TX we definitely have a
+ * combocid, set cmin and cmax.
+ */
+ if (hdr->t_infomask & HEAP_COMBOCID)
+ {
+ Assert(!(hdr->t_infomask & HEAP_XMAX_INVALID));
+ Assert(!(hdr->t_infomask & HEAP_XMIN_INVALID));
+ xlrec.cmin = HeapTupleHeaderGetCmin(hdr);
+ xlrec.cmax = HeapTupleHeaderGetCmax(hdr);
+ xlrec.combocid = HeapTupleHeaderGetRawCommandId(hdr);
+ }
+ /* No combocid, so only cmin or cmax can be set by this TX */
+ else
+ {
+ /*
+ * Tuple inserted.
+ *
+ * We need to check for LOCK ONLY because multixacts might be
+ * transferred to the new tuple in case of FOR KEY SHARE updates in
+ * which case there will be a xmax, although the tuple just got
+ * inserted.
+ */
+ if (hdr->t_infomask & HEAP_XMAX_INVALID ||
+ HEAP_XMAX_IS_LOCKED_ONLY(hdr->t_infomask))
+ {
+ xlrec.cmin = HeapTupleHeaderGetRawCommandId(hdr);
+ xlrec.cmax = InvalidCommandId;
+ }
+ /* Tuple from a different tx updated or deleted. */
+ else
+ {
+ xlrec.cmin = InvalidCommandId;
+ xlrec.cmax = HeapTupleHeaderGetRawCommandId(hdr);
+
+ }
+ xlrec.combocid = InvalidCommandId;
+ }
+
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = SizeOfHeapNewCid;
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].next = NULL;
+
+ recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID, rdata);
+
+ return recptr;
+}
+
+/*
+ * Build a heap tuple representing the configured REPLICA IDENTITY to represent
+ * the old tuple in a UPDATE or DELETE.
+ *
+ * Returns NULL if there's no need to log a identity or if there's no suitable
+ * key in the Relation relation.
+ */
+static HeapTuple
+ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *copy)
+{
+ TupleDesc desc = RelationGetDescr(relation);
+ Relation idx_rel;
+ TupleDesc idx_desc;
+ char replident = relation->rd_rel->relreplident;
+ HeapTuple key_tuple = NULL;
+ bool copy_oid = false;
+ bool nulls[MaxHeapAttributeNumber];
+ Datum values[MaxHeapAttributeNumber];
+ int natt;
+
+ *copy = false;
+
+ if (!RelationIsLogicallyLogged(relation))
+ return NULL;
+
+ if (replident == REPLICA_IDENTITY_NOTHING)
+ return NULL;
+
+ if (replident == REPLICA_IDENTITY_FULL)
+ {
+ /*
+ * When logging the entire old tuple, it very well could contain
+ * toasted columns. If so, force them to be inlined.
+ */
+ if (HeapTupleHasExternal(tp))
+ {
+ *copy = true;
+ tp = toast_flatten_tuple(tp, RelationGetDescr(relation));
+ }
+ return tp;
+ }
+
+ /* if the key hasn't changed and we're only logging the key, we're done */
+ if (!key_changed)
+ return NULL;
+
+ /* needs to already have been fetched? */
+ if (relation->rd_indexvalid == 0)
+ RelationGetIndexList(relation);
+
+ if (!OidIsValid(relation->rd_replidindex))
+ {
+ elog(DEBUG4, "Could not find configured replica identity for table \"%s\"",
+ RelationGetRelationName(relation));
+ return NULL;
+ }
+
+ idx_rel = RelationIdGetRelation(relation->rd_replidindex);
+ idx_desc = RelationGetDescr(idx_rel);
+
+ /* deform tuple, so we have fast access to columns */
+ heap_deform_tuple(tp, desc, values, nulls);
+
+ /* set all columns to NULL, regardless of whether they actually are */
+ memset(nulls, 1, sizeof(nulls));
+
+ /*
+ * Now set all columns contained in the index to NOT NULL, they cannot
+ * currently be NULL.
+ */
+ for (natt = 0; natt < idx_desc->natts; natt++)
+ {
+ int attno = idx_rel->rd_index->indkey.values[natt];
+
+ if (attno == ObjectIdAttributeNumber)
+ copy_oid = true;
+ else if (attno < 0)
+ elog(ERROR, "system column in index");
+ else
+ nulls[attno - 1] = false;
+ }
+
+ key_tuple = heap_form_tuple(desc, values, nulls);
+ *copy = true;
+ RelationClose(idx_rel);
+
+ /* XXX: we could also do this unconditionally, the space is used anyway */
+ if (copy_oid)
+ HeapTupleSetOid(key_tuple, HeapTupleGetOid(tp));
+
+ /*
+ * If the tuple, which by here only contains indexed columns, still has
+ * toasted columns, force them to be inlined. This is somewhat unlikely
+ * since there's limits on the size of indexed columns, so we don't
+ * duplicate toast_flatten_tuple()s functionality in the above loop over
+ * the indexed columns, even if it would be more efficient.
+ */
+ if (HeapTupleHasExternal(key_tuple))
+ {
+ HeapTuple oldtup = key_tuple;
+ key_tuple = toast_flatten_tuple(oldtup, RelationGetDescr(relation));
+ heap_freetuple(oldtup);
+ }
+
+ return key_tuple;
+}
+
+/*
* Handles CLEANUP_INFO
*/
static void
@@ -6714,7 +7105,7 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
* The visibility map may need to be fixed even if the heap page is
* already up-to-date.
*/
- if (xlrec->all_visible_cleared)
+ if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
{
Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
Buffer vmbuffer = InvalidBuffer;
@@ -6763,7 +7154,7 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
/* Mark the page as a candidate for pruning */
PageSetPrunable(page, record->xl_xid);
- if (xlrec->all_visible_cleared)
+ if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
PageClearAllVisible(page);
/* Make sure there is no forward chain link in t_ctid */
@@ -6797,7 +7188,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
* The visibility map may need to be fixed even if the heap page is
* already up-to-date.
*/
- if (xlrec->all_visible_cleared)
+ if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
{
Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
Buffer vmbuffer = InvalidBuffer;
@@ -6868,7 +7259,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
PageSetLSN(page, lsn);
- if (xlrec->all_visible_cleared)
+ if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
PageClearAllVisible(page);
MarkBufferDirty(buffer);
@@ -6931,7 +7322,7 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
* The visibility map may need to be fixed even if the heap page is
* already up-to-date.
*/
- if (xlrec->all_visible_cleared)
+ if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
{
Relation reln = CreateFakeRelcacheEntry(xlrec->node);
Buffer vmbuffer = InvalidBuffer;
@@ -7014,7 +7405,7 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
PageSetLSN(page, lsn);
- if (xlrec->all_visible_cleared)
+ if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
PageClearAllVisible(page);
MarkBufferDirty(buffer);
@@ -7053,7 +7444,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
HeapTupleHeaderData hdr;
char data[MaxHeapTupleSize];
} tbuf;
- xl_heap_header xlhdr;
+ xl_heap_header_len xlhdr;
int hsize;
uint32 newlen;
Size freespace;
@@ -7062,7 +7453,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
* The visibility map may need to be fixed even if the heap page is
* already up-to-date.
*/
- if (xlrec->all_visible_cleared)
+ if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
{
Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
BlockNumber block = ItemPointerGetBlockNumber(&xlrec->target.tid);
@@ -7140,7 +7531,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
/* Mark the page as a candidate for pruning */
PageSetPrunable(page, record->xl_xid);
- if (xlrec->all_visible_cleared)
+ if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
PageClearAllVisible(page);
/*
@@ -7164,7 +7555,7 @@ newt:;
* The visibility map may need to be fixed even if the heap page is
* already up-to-date.
*/
- if (xlrec->new_all_visible_cleared)
+ if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED)
{
Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
BlockNumber block = ItemPointerGetBlockNumber(&xlrec->newtid);
@@ -7222,13 +7613,13 @@ newsame:;
if (PageGetMaxOffsetNumber(page) + 1 < offnum)
elog(PANIC, "heap_update_redo: invalid max offset number");
- hsize = SizeOfHeapUpdate + SizeOfHeapHeader;
+ hsize = SizeOfHeapUpdate + SizeOfHeapHeaderLen;
- newlen = record->xl_len - hsize;
- Assert(newlen <= MaxHeapTupleSize);
memcpy((char *) &xlhdr,
(char *) xlrec + SizeOfHeapUpdate,
- SizeOfHeapHeader);
+ SizeOfHeapHeaderLen);
+ newlen = xlhdr.t_len;
+ Assert(newlen <= MaxHeapTupleSize);
htup = &tbuf.hdr;
MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
@@ -7236,9 +7627,9 @@ newsame:;
(char *) xlrec + hsize,
newlen);
newlen += offsetof(HeapTupleHeaderData, t_bits);
- htup->t_infomask2 = xlhdr.t_infomask2;
- htup->t_infomask = xlhdr.t_infomask;
- htup->t_hoff = xlhdr.t_hoff;
+ htup->t_infomask2 = xlhdr.header.t_infomask2;
+ htup->t_infomask = xlhdr.header.t_infomask;
+ htup->t_hoff = xlhdr.header.t_hoff;
HeapTupleHeaderSetXmin(htup, record->xl_xid);
HeapTupleHeaderSetCmin(htup, FirstCommandId);
@@ -7250,7 +7641,7 @@ newsame:;
if (offnum == InvalidOffsetNumber)
elog(PANIC, "heap_update_redo: failed to add tuple");
- if (xlrec->new_all_visible_cleared)
+ if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED)
PageClearAllVisible(page);
freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
@@ -7501,6 +7892,12 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record)
case XLOG_HEAP2_LOCK_UPDATED:
heap_xlog_lock_updated(lsn, record);
break;
+ case XLOG_HEAP2_NEW_CID:
+ /*
+ * Nothing to do on a real replay, only used during logical
+ * decoding.
+ */
+ break;
default:
elog(PANIC, "heap2_redo: unknown op code %u", info);
}
diff --git a/src/backend/access/rmgrdesc/heapdesc.c b/src/backend/access/rmgrdesc/heapdesc.c
index e14c0539105..39c53d0022d 100644
--- a/src/backend/access/rmgrdesc/heapdesc.c
+++ b/src/backend/access/rmgrdesc/heapdesc.c
@@ -184,6 +184,15 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
xlrec->infobits_set);
out_target(buf, &(xlrec->target));
}
+ else if (info == XLOG_HEAP2_NEW_CID)
+ {
+ xl_heap_new_cid *xlrec = (xl_heap_new_cid *) rec;
+
+ appendStringInfo(buf, "new_cid: ");
+ out_target(buf, &(xlrec->target));
+ appendStringInfo(buf, "; cmin: %u, cmax: %u, combo: %u",
+ xlrec->cmin, xlrec->cmax, xlrec->combocid);
+ }
else
appendStringInfoString(buf, "UNKNOWN");
}
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index 1d70494233e..dd67b1f643f 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -28,6 +28,7 @@ const struct config_enum_entry wal_level_options[] = {
{"minimal", WAL_LEVEL_MINIMAL, false},
{"archive", WAL_LEVEL_ARCHIVE, false},
{"hot_standby", WAL_LEVEL_HOT_STANDBY, false},
+ {"logical", WAL_LEVEL_LOGICAL, false},
{NULL, 0, false}
};
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index e975f8d26d0..48203d2c81e 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -47,6 +47,7 @@
#include "access/twophase.h"
#include "access/twophase_rmgr.h"
#include "access/xact.h"
+#include "access/xlog.h"
#include "access/xlogutils.h"
#include "catalog/pg_type.h"
#include "catalog/storage.h"
@@ -1920,7 +1921,8 @@ RecoverPreparedTransactions(void)
* the prepared transaction generated xid assignment records. Test
* here must match one used in AssignTransactionId().
*/
- if (InHotStandby && hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS)
+ if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS ||
+ XLogLogicalInfoActive()))
overwriteOK = true;
/*
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b467b5c89d1..ad0d19c4a90 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -148,6 +148,7 @@ typedef struct TransactionStateData
int prevSecContext; /* previous SecurityRestrictionContext */
bool prevXactReadOnly; /* entry-time xact r/o state */
bool startedInRecovery; /* did we start in recovery? */
+ bool didLogXid; /* has xid been included in WAL record? */
struct TransactionStateData *parent; /* back link to parent */
} TransactionStateData;
@@ -177,6 +178,7 @@ static TransactionStateData TopTransactionStateData = {
0, /* previous SecurityRestrictionContext */
false, /* entry-time xact r/o state */
false, /* startedInRecovery */
+ false, /* didLogXid */
NULL /* link to parent state block */
};
@@ -395,6 +397,19 @@ GetCurrentTransactionIdIfAny(void)
}
/*
+ * MarkCurrentTransactionIdLoggedIfAny
+ *
+ * Remember that the current xid, if one is assigned, has now been WAL-logged.
+ */
+void
+MarkCurrentTransactionIdLoggedIfAny(void)
+{
+ if (TransactionIdIsValid(CurrentTransactionState->transactionId))
+ CurrentTransactionState->didLogXid = true;
+}
+
+
+/*
* GetStableLatestTransactionId
*
* Get the transaction's XID if it has one, else read the next-to-be-assigned
@@ -435,6 +450,7 @@ AssignTransactionId(TransactionState s)
{
bool isSubXact = (s->parent != NULL);
ResourceOwner currentOwner;
+ bool log_unknown_top = false;
/* Assert that caller didn't screw up */
Assert(!TransactionIdIsValid(s->transactionId));
@@ -470,6 +486,20 @@ AssignTransactionId(TransactionState s)
}
/*
+ * When wal_level=logical, guarantee that a subtransaction's xid can only
+ * be seen in the WAL stream if its toplevel xid has been logged
+ * before. If necessary we log an xact_assignment record with fewer than
+ * PGPROC_MAX_CACHED_SUBXIDS. Note that it is fine if didLogXid isn't set
+ * for a transaction even though it appears in a WAL record, we just might
+ * superfluously log something. That can happen when an xid is included
+ * somewhere inside a wal record, but not in XLogRecord->xl_xid, like in
+ * xl_standby_locks.
+ */
+ if (isSubXact && XLogLogicalInfoActive() &&
+ !TopTransactionStateData.didLogXid)
+ log_unknown_top = true;
+
+ /*
* Generate a new Xid and record it in PG_PROC and pg_subtrans.
*
* NB: we must make the subtrans entry BEFORE the Xid appears anywhere in
@@ -523,6 +553,9 @@ AssignTransactionId(TransactionState s)
* top-level transaction that each subxact belongs to. This is correct in
* recovery only because aborted subtransactions are separately WAL
* logged.
+ *
+ * This is correct even for the case where several levels above us didn't
+ * have an xid assigned as we recursed up to them beforehand.
*/
if (isSubXact && XLogStandbyInfoActive())
{
@@ -533,7 +566,8 @@ AssignTransactionId(TransactionState s)
* ensure this test matches similar one in
* RecoverPreparedTransactions()
*/
- if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS)
+ if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS ||
+ log_unknown_top)
{
XLogRecData rdata[2];
xl_xact_assignment xlrec;
@@ -552,13 +586,15 @@ AssignTransactionId(TransactionState s)
rdata[0].next = &rdata[1];
rdata[1].data = (char *) unreportedXids;
- rdata[1].len = PGPROC_MAX_CACHED_SUBXIDS * sizeof(TransactionId);
+ rdata[1].len = nUnreportedXids * sizeof(TransactionId);
rdata[1].buffer = InvalidBuffer;
rdata[1].next = NULL;
(void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT, rdata);
nUnreportedXids = 0;
+ /* mark top, not current xact as having been logged */
+ TopTransactionStateData.didLogXid = true;
}
}
}
@@ -1737,6 +1773,7 @@ StartTransaction(void)
* initialize reported xid accounting
*/
nUnreportedXids = 0;
+ s->didLogXid = false;
/*
* must initialize resource-management stuff first
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index b68230d1969..6fa5479c92b 100755
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1191,6 +1191,8 @@ begin:;
*/
WALInsertSlotRelease();
+ MarkCurrentTransactionIdLoggedIfAny();
+
END_CRIT_SECTION();
/*
@@ -5961,7 +5963,7 @@ CheckRequiredParameterValues(void)
{
if (ControlFile->wal_level < WAL_LEVEL_HOT_STANDBY)
ereport(ERROR,
- (errmsg("hot standby is not possible because wal_level was not set to \"hot_standby\" on the master server"),
+ (errmsg("hot standby is not possible because wal_level was not set to \"hot_standby\" or higher on the master server"),
errhint("Either set wal_level to \"hot_standby\" on the master, or turn off hot_standby here.")));
/* We ignore autovacuum_max_workers when we make this test. */
@@ -9650,7 +9652,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("WAL level not sufficient for making an online backup"),
- errhint("wal_level must be set to \"archive\" or \"hot_standby\" at server start.")));
+ errhint("wal_level must be set to \"archive\", \"hot_standby\" or \"logical\" at server start.")));
if (strlen(backupidstr) > MAXPGPATH)
ereport(ERROR,
@@ -9988,7 +9990,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("WAL level not sufficient for making an online backup"),
- errhint("wal_level must be set to \"archive\" or \"hot_standby\" at server start.")));
+ errhint("wal_level must be set to \"archive\", \"hot_standby\" or \"logical\" at server start.")));
/*
* OK to update backup counters and forcePageWrites
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 02752406f53..aa31429b8ac 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -3321,7 +3321,7 @@ reindex_relation(Oid relid, int flags)
/* Ensure rd_indexattr is valid; see comments for RelationSetIndexList */
if (is_pg_class)
- (void) RelationGetIndexAttrBitmap(rel, false);
+ (void) RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_ALL);
PG_TRY();
{
diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c
index 0008fc633ba..4eff1845f5c 100644
--- a/src/backend/commands/trigger.c
+++ b/src/backend/commands/trigger.c
@@ -2355,7 +2355,8 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate,
* concurrency.
*/
modifiedCols = GetModifiedColumns(relinfo, estate);
- keyCols = RelationGetIndexAttrBitmap(relinfo->ri_RelationDesc, true);
+ keyCols = RelationGetIndexAttrBitmap(relinfo->ri_RelationDesc,
+ INDEX_ATTR_BITMAP_KEY);
if (bms_overlap(keyCols, modifiedCols))
lockmode = LockTupleExclusive;
else
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index d294a5a47f8..048a1894e59 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -816,10 +816,10 @@ PostmasterMain(int argc, char *argv[])
}
if (XLogArchiveMode && wal_level == WAL_LEVEL_MINIMAL)
ereport(ERROR,
- (errmsg("WAL archival (archive_mode=on) requires wal_level \"archive\" or \"hot_standby\"")));
+ (errmsg("WAL archival (archive_mode=on) requires wal_level \"archive\", \"hot_standby\" or \"logical\"")));
if (max_wal_senders > 0 && wal_level == WAL_LEVEL_MINIMAL)
ereport(ERROR,
- (errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\" or \"hot_standby\"")));
+ (errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\", \"hot_standby\" or \"logical\"")));
/*
* Other one-time internal sanity checks can go here, if they are fast.
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index d0acca871e3..03bbb9f3381 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -3818,8 +3818,9 @@ RelationGetIndexPredicate(Relation relation)
* simple index keys, but attributes used in expressions and partial-index
* predicates.)
*
- * If "keyAttrs" is true, only attributes that can be referenced by foreign
- * keys are considered.
+ * Depending on attrKind, a bitmap covering the attnums of all index columns,
+ * of all key columns, or of all the columns in the configured replica
+ * identity is returned.
*
* Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
* we can include system attributes (e.g., OID) in the bitmap representation.
@@ -3832,17 +3833,28 @@ RelationGetIndexPredicate(Relation relation)
* be bms_free'd when not needed anymore.
*/
Bitmapset *
-RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs)
+RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
{
- Bitmapset *indexattrs;
- Bitmapset *uindexattrs;
+ Bitmapset *indexattrs; /* indexed columns */
+ Bitmapset *uindexattrs; /* columns in unique indexes */
+ Bitmapset *idindexattrs; /* columns in the replica identity */
List *indexoidlist;
ListCell *l;
MemoryContext oldcxt;
/* Quick exit if we already computed the result. */
if (relation->rd_indexattr != NULL)
- return bms_copy(keyAttrs ? relation->rd_keyattr : relation->rd_indexattr);
+ switch(attrKind)
+ {
+ case INDEX_ATTR_BITMAP_IDENTITY_KEY:
+ return bms_copy(relation->rd_idattr);
+ case INDEX_ATTR_BITMAP_KEY:
+ return bms_copy(relation->rd_keyattr);
+ case INDEX_ATTR_BITMAP_ALL:
+ return bms_copy(relation->rd_indexattr);
+ default:
+ elog(ERROR, "unknown attrKind %u", attrKind);
+ }
/* Fast path if definitely no indexes */
if (!RelationGetForm(relation)->relhasindex)
@@ -3869,13 +3881,16 @@ RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs)
*/
indexattrs = NULL;
uindexattrs = NULL;
+ idindexattrs = NULL;
foreach(l, indexoidlist)
{
Oid indexOid = lfirst_oid(l);
Relation indexDesc;
IndexInfo *indexInfo;
int i;
- bool isKey;
+ bool isKey; /* candidate key */
+ bool isIDKey; /* replica identity index */
+
indexDesc = index_open(indexOid, AccessShareLock);
@@ -3887,6 +3902,9 @@ RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs)
indexInfo->ii_Expressions == NIL &&
indexInfo->ii_Predicate == NIL;
+ /* Is this index the configured (or default) replica identity? */
+ isIDKey = indexOid == relation->rd_replidindex;
+
/* Collect simple attribute references */
for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
{
@@ -3896,6 +3914,11 @@ RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs)
{
indexattrs = bms_add_member(indexattrs,
attrnum - FirstLowInvalidHeapAttributeNumber);
+
+ if (isIDKey)
+ idindexattrs = bms_add_member(idindexattrs,
+ attrnum - FirstLowInvalidHeapAttributeNumber);
+
if (isKey)
uindexattrs = bms_add_member(uindexattrs,
attrnum - FirstLowInvalidHeapAttributeNumber);
@@ -3917,10 +3940,21 @@ RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs)
oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
relation->rd_indexattr = bms_copy(indexattrs);
relation->rd_keyattr = bms_copy(uindexattrs);
+ relation->rd_idattr = bms_copy(idindexattrs);
MemoryContextSwitchTo(oldcxt);
/* We return our original working copy for caller to play with */
- return keyAttrs ? uindexattrs : indexattrs;
+ switch(attrKind)
+ {
+ case INDEX_ATTR_BITMAP_IDENTITY_KEY:
+ return idindexattrs;
+ case INDEX_ATTR_BITMAP_KEY:
+ return uindexattrs;
+ case INDEX_ATTR_BITMAP_ALL:
+ return indexattrs;
+ default:
+ elog(ERROR, "unknown attrKind %u", attrKind);
+ }
}
/*
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 983cae7fda2..6096fe44457 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -170,7 +170,7 @@
# - Settings -
-#wal_level = minimal # minimal, archive, or hot_standby
+#wal_level = minimal # minimal, archive, hot_standby or logical
# (change requires restart)
#fsync = on # turns forced synchronization on or off
#synchronous_commit = on # synchronization level;
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index fde483a616a..8c6cf24d237 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -77,6 +77,8 @@ wal_level_str(WalLevel wal_level)
return "archive";
case WAL_LEVEL_HOT_STANDBY:
return "hot_standby";
+ case WAL_LEVEL_LOGICAL:
+ return "logical";
}
return _("unrecognized wal_level");
}
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 63b73d03291..438e79db48e 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -55,6 +55,22 @@
#define XLOG_HEAP2_VISIBLE 0x40
#define XLOG_HEAP2_MULTI_INSERT 0x50
#define XLOG_HEAP2_LOCK_UPDATED 0x60
+#define XLOG_HEAP2_NEW_CID 0x70
+
+/*
+ * xl_heap_* ->flag values, 8 bits are available.
+ */
+/* PD_ALL_VISIBLE was cleared */
+#define XLOG_HEAP_ALL_VISIBLE_CLEARED (1<<0)
+/* PD_ALL_VISIBLE was cleared in the 2nd page */
+#define XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED (1<<1)
+#define XLOG_HEAP_CONTAINS_OLD_TUPLE (1<<2)
+#define XLOG_HEAP_CONTAINS_OLD_KEY (1<<3)
+#define XLOG_HEAP_CONTAINS_NEW_TUPLE (1<<4)
+
+/* convenience macro for checking whether any form of old tuple was logged */
+#define XLOG_HEAP_CONTAINS_OLD \
+ (XLOG_HEAP_CONTAINS_OLD_TUPLE | XLOG_HEAP_CONTAINS_OLD_KEY)
/*
* All what we need to find changed tuple
@@ -78,10 +94,10 @@ typedef struct xl_heap_delete
xl_heaptid target; /* deleted tuple id */
TransactionId xmax; /* xmax of the deleted tuple */
uint8 infobits_set; /* infomask bits */
- bool all_visible_cleared; /* PD_ALL_VISIBLE was cleared */
+ uint8 flags;
} xl_heap_delete;
-#define SizeOfHeapDelete (offsetof(xl_heap_delete, all_visible_cleared) + sizeof(bool))
+#define SizeOfHeapDelete (offsetof(xl_heap_delete, flags) + sizeof(uint8))
/*
* We don't store the whole fixed part (HeapTupleHeaderData) of an inserted
@@ -100,15 +116,29 @@ typedef struct xl_heap_header
#define SizeOfHeapHeader (offsetof(xl_heap_header, t_hoff) + sizeof(uint8))
+/*
+ * Variant of xl_heap_header that contains the length of the tuple, which is
+ * useful if the length of the tuple cannot be computed using the overall
+ * record length. E.g. because there are several tuples inside a single
+ * record.
+ */
+typedef struct xl_heap_header_len
+{
+ uint16 t_len;
+ xl_heap_header header;
+} xl_heap_header_len;
+
+#define SizeOfHeapHeaderLen (offsetof(xl_heap_header_len, header) + SizeOfHeapHeader)
+
/* This is what we need to know about insert */
typedef struct xl_heap_insert
{
xl_heaptid target; /* inserted tuple id */
- bool all_visible_cleared; /* PD_ALL_VISIBLE was cleared */
+ uint8 flags;
/* xl_heap_header & TUPLE DATA FOLLOWS AT END OF STRUCT */
} xl_heap_insert;
-#define SizeOfHeapInsert (offsetof(xl_heap_insert, all_visible_cleared) + sizeof(bool))
+#define SizeOfHeapInsert (offsetof(xl_heap_insert, flags) + sizeof(uint8))
/*
* This is what we need to know about a multi-insert. The record consists of
@@ -120,7 +150,7 @@ typedef struct xl_heap_multi_insert
{
RelFileNode node;
BlockNumber blkno;
- bool all_visible_cleared;
+ uint8 flags;
uint16 ntuples;
OffsetNumber offsets[1];
@@ -147,13 +177,12 @@ typedef struct xl_heap_update
TransactionId old_xmax; /* xmax of the old tuple */
TransactionId new_xmax; /* xmax of the new tuple */
ItemPointerData newtid; /* new inserted tuple id */
- uint8 old_infobits_set; /* infomask bits to set on old tuple */
- bool all_visible_cleared; /* PD_ALL_VISIBLE was cleared */
- bool new_all_visible_cleared; /* same for the page of newtid */
+ uint8 old_infobits_set; /* infomask bits to set on old tuple */
+ uint8 flags;
/* NEW TUPLE xl_heap_header AND TUPLE DATA FOLLOWS AT END OF STRUCT */
} xl_heap_update;
-#define SizeOfHeapUpdate (offsetof(xl_heap_update, new_all_visible_cleared) + sizeof(bool))
+#define SizeOfHeapUpdate (offsetof(xl_heap_update, flags) + sizeof(uint8))
/*
* This is what we need to know about vacuum page cleanup/redirect
@@ -263,6 +292,29 @@ typedef struct xl_heap_visible
#define SizeOfHeapVisible (offsetof(xl_heap_visible, cutoff_xid) + sizeof(TransactionId))
+typedef struct xl_heap_new_cid
+{
+ /*
+ * store toplevel xid so we don't have to merge cids from different
+ * transactions
+ */
+ TransactionId top_xid;
+ CommandId cmin;
+ CommandId cmax;
+ /*
+ * don't really need the combocid since we have the actual values
+ * right in this struct, but the padding makes it free and it's
+ * useful for debugging.
+ */
+ CommandId combocid;
+ /*
+ * Store the relfilenode/ctid pair to facilitate lookups.
+ */
+ xl_heaptid target;
+} xl_heap_new_cid;
+
+#define SizeOfHeapNewCid (offsetof(xl_heap_new_cid, target) + SizeOfHeapTid)
+
extern void HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
TransactionId *latestRemovedXid);
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 1d3e7d8938a..9632378865e 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -215,6 +215,7 @@ extern TransactionId GetCurrentTransactionId(void);
extern TransactionId GetCurrentTransactionIdIfAny(void);
extern TransactionId GetStableLatestTransactionId(void);
extern SubTransactionId GetCurrentSubTransactionId(void);
+extern void MarkCurrentTransactionIdLoggedIfAny(void);
extern bool SubTransactionIsActive(SubTransactionId subxid);
extern CommandId GetCurrentCommandId(bool used);
extern TimestampTz GetCurrentTransactionStartTimestamp(void);
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 002862cca50..7415a261bbd 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -197,7 +197,8 @@ typedef enum WalLevel
{
WAL_LEVEL_MINIMAL = 0,
WAL_LEVEL_ARCHIVE,
- WAL_LEVEL_HOT_STANDBY
+ WAL_LEVEL_HOT_STANDBY,
+ WAL_LEVEL_LOGICAL
} WalLevel;
extern int wal_level;
@@ -210,9 +211,12 @@ extern int wal_level;
*/
#define XLogIsNeeded() (wal_level >= WAL_LEVEL_ARCHIVE)
-/* Do we need to WAL-log information required only for Hot Standby? */
+/* Do we need to WAL-log information required only for Hot Standby and logical replication? */
#define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_HOT_STANDBY)
+/* Do we need to WAL-log information required only for logical replication? */
+#define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL)
+
#ifdef WAL_DEBUG
extern bool XLOG_DEBUG;
#endif
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 9fba8c3db86..64ba55355b9 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -55,7 +55,7 @@ typedef struct BkpBlock
/*
* Each page of XLOG file has a header like this:
*/
-#define XLOG_PAGE_MAGIC 0xD078 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD079 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 21d5871454b..ad878cf1a22 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -104,6 +104,7 @@ typedef struct RelationData
List *rd_indexlist; /* list of OIDs of indexes on relation */
Bitmapset *rd_indexattr; /* identifies columns used in indexes */
Bitmapset *rd_keyattr; /* cols that can be ref'd by foreign keys */
+ Bitmapset *rd_idattr; /* included in replica identity index */
Oid rd_oidindex; /* OID of unique index on OID, if any */
LockInfoData rd_lockInfo; /* lock mgr's info for locking relation */
RuleLock *rd_rules; /* rewrite rules */
@@ -453,6 +454,29 @@ typedef struct StdRdOptions
*/
#define RelationIsPopulated(relation) ((relation)->rd_rel->relispopulated)
+/*
+ * RelationIsAccessibleInLogicalDecoding
+ * True if we need to log enough information to have access via
+ * decoding snapshot.
+ */
+#define RelationIsAccessibleInLogicalDecoding(relation) \
+ (XLogLogicalInfoActive() && \
+ RelationNeedsWAL(relation) && \
+ IsCatalogRelation(relation))
+
+/*
+ * RelationIsLogicallyLogged
+ * True if we need to log enough information to extract the data from the
+ * WAL stream.
+ *
+ * We don't log information for unlogged tables (since they don't WAL log
+ * anyway) and for system tables (their content is hard to make sense of, and
+ * it would complicate decoding slightly for little gain).
+ */
+#define RelationIsLogicallyLogged(relation) \
+ (XLogLogicalInfoActive() && \
+ RelationNeedsWAL(relation) && \
+ !IsCatalogRelation(relation))
/* routines in utils/cache/relcache.c */
extern void RelationIncrementReferenceCount(Relation rel);
diff --git a/src/include/utils/relcache.h b/src/include/utils/relcache.h
index 8ac2549cb3a..d7604ec113f 100644
--- a/src/include/utils/relcache.h
+++ b/src/include/utils/relcache.h
@@ -41,7 +41,17 @@ extern List *RelationGetIndexList(Relation relation);
extern Oid RelationGetOidIndex(Relation relation);
extern List *RelationGetIndexExpressions(Relation relation);
extern List *RelationGetIndexPredicate(Relation relation);
-extern Bitmapset *RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs);
+
+typedef enum IndexAttrBitmapKind
+{
+ INDEX_ATTR_BITMAP_ALL,
+ INDEX_ATTR_BITMAP_KEY,
+ INDEX_ATTR_BITMAP_IDENTITY_KEY
+} IndexAttrBitmapKind;
+
+extern Bitmapset *RelationGetIndexAttrBitmap(Relation relation,
+ IndexAttrBitmapKind keyAttrs);
+
extern void RelationGetExclusionInfo(Relation indexRelation,
Oid **operators,
Oid **procs,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index b20eb0d5ac9..c5200372aec 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -791,6 +791,7 @@ IdentifySystemCmd
IncrementVarSublevelsUp_context
Index
IndexArrayKeyInfo
+IndexAttrBitmapKind
IndexBuildCallback
IndexBuildResult
IndexBulkDeleteCallback
@@ -2419,11 +2420,13 @@ xl_heap_cleanup_info
xl_heap_delete
xl_heap_freeze
xl_heap_header
+xl_heap_header_len
xl_heap_inplace
xl_heap_insert
xl_heap_lock
xl_heap_lock_updated
xl_heap_multi_insert
+xl_heap_new_cid
xl_heap_newpage
xl_heap_update
xl_heap_visible