diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/access/heap/heapam.c | 603 | ||||
-rw-r--r-- | src/backend/access/rmgrdesc/heapdesc.c | 9 | ||||
-rw-r--r-- | src/backend/access/rmgrdesc/xlogdesc.c | 1 | ||||
-rw-r--r-- | src/backend/access/transam/twophase.c | 4 | ||||
-rw-r--r-- | src/backend/access/transam/xact.c | 41 | ||||
-rwxr-xr-x | src/backend/access/transam/xlog.c | 8 | ||||
-rw-r--r-- | src/backend/catalog/index.c | 2 | ||||
-rw-r--r-- | src/backend/commands/trigger.c | 3 | ||||
-rw-r--r-- | src/backend/postmaster/postmaster.c | 4 | ||||
-rw-r--r-- | src/backend/utils/cache/relcache.c | 50 | ||||
-rw-r--r-- | src/backend/utils/misc/postgresql.conf.sample | 2 | ||||
-rw-r--r-- | src/bin/pg_controldata/pg_controldata.c | 2 | ||||
-rw-r--r-- | src/include/access/heapam_xlog.h | 70 | ||||
-rw-r--r-- | src/include/access/xact.h | 1 | ||||
-rw-r--r-- | src/include/access/xlog.h | 8 | ||||
-rw-r--r-- | src/include/access/xlog_internal.h | 2 | ||||
-rw-r--r-- | src/include/utils/rel.h | 24 | ||||
-rw-r--r-- | src/include/utils/relcache.h | 12 | ||||
-rw-r--r-- | src/tools/pgindent/typedefs.list | 3 |
19 files changed, 714 insertions, 135 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 2035a2158f1..249fffeb061 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -85,12 +85,14 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options); static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, Buffer newbuf, HeapTuple oldtup, - HeapTuple newtup, bool all_visible_cleared, - bool new_all_visible_cleared); + HeapTuple newtup, HeapTuple old_key_tup, + bool all_visible_cleared, bool new_all_visible_cleared); static void HeapSatisfiesHOTandKeyUpdate(Relation relation, - Bitmapset *hot_attrs, Bitmapset *key_attrs, - bool *satisfies_hot, bool *satisfies_key, - HeapTuple oldtup, HeapTuple newtup); + Bitmapset *hot_attrs, + Bitmapset *key_attrs, Bitmapset *id_attrs, + bool *satisfies_hot, bool *satisfies_key, + bool *satisfies_id, + HeapTuple oldtup, HeapTuple newtup); static void compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask, uint16 old_infomask2, TransactionId add_to_xmax, LockTupleMode mode, bool is_update, @@ -108,6 +110,9 @@ static void MultiXactIdWait(MultiXactId multi, MultiXactStatus status, static bool ConditionalMultiXactIdWait(MultiXactId multi, MultiXactStatus status, int *remaining, uint16 infomask); +static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup); +static HeapTuple ExtractReplicaIdentity(Relation rel, HeapTuple tup, bool key_modified, + bool *copy); /* @@ -2103,11 +2108,24 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, xl_heap_insert xlrec; xl_heap_header xlhdr; XLogRecPtr recptr; - XLogRecData rdata[3]; + XLogRecData rdata[4]; Page page = BufferGetPage(buffer); uint8 info = XLOG_HEAP_INSERT; + bool need_tuple_data; + + /* + * For logical decoding, we need the tuple even if we're doing a + * full page write, so make sure to log it separately. (XXX We could + * alternatively store a pointer into the FPW). + * + * Also, if this is a catalog, we need to transmit combocids to + * properly decode, so log that as well. + */ + need_tuple_data = RelationIsLogicallyLogged(relation); + if (RelationIsAccessibleInLogicalDecoding(relation)) + log_heap_new_cid(relation, heaptup); - xlrec.all_visible_cleared = all_visible_cleared; + xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0; xlrec.target.node = relation->rd_node; xlrec.target.tid = heaptup->t_self; rdata[0].data = (char *) &xlrec; @@ -2126,18 +2144,36 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, */ rdata[1].data = (char *) &xlhdr; rdata[1].len = SizeOfHeapHeader; - rdata[1].buffer = buffer; + rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer; rdata[1].buffer_std = true; rdata[1].next = &(rdata[2]); /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */ rdata[2].data = (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits); rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits); - rdata[2].buffer = buffer; + rdata[2].buffer = need_tuple_data ? InvalidBuffer : buffer; rdata[2].buffer_std = true; rdata[2].next = NULL; /* + * Make a separate rdata entry for the tuple's buffer if we're + * doing logical decoding, so that an eventual FPW doesn't + * remove the tuple's data. + */ + if (need_tuple_data) + { + rdata[2].next = &(rdata[3]); + + rdata[3].data = NULL; + rdata[3].len = 0; + rdata[3].buffer = buffer; + rdata[3].buffer_std = true; + rdata[3].next = NULL; + + xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE; + } + + /* * If this is the single and first tuple on page, we can reinit the * page instead of restoring the whole thing. Set flag, and hide * buffer references from XLogInsert. @@ -2146,7 +2182,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, PageGetMaxOffsetNumber(page) == FirstOffsetNumber) { info |= XLOG_HEAP_INIT_PAGE; - rdata[1].buffer = rdata[2].buffer = InvalidBuffer; + rdata[1].buffer = rdata[2].buffer = rdata[3].buffer = InvalidBuffer; } recptr = XLogInsert(RM_HEAP_ID, info, rdata); @@ -2272,6 +2308,8 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, Page page; bool needwal; Size saveFreeSpace; + bool need_tuple_data = RelationIsLogicallyLogged(relation); + bool need_cids = RelationIsAccessibleInLogicalDecoding(relation); needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation); saveFreeSpace = RelationGetTargetPageFreeSpace(relation, @@ -2358,7 +2396,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, { XLogRecPtr recptr; xl_heap_multi_insert *xlrec; - XLogRecData rdata[2]; + XLogRecData rdata[3]; uint8 info = XLOG_HEAP2_MULTI_INSERT; char *tupledata; int totaldatalen; @@ -2388,7 +2426,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, /* the rest of the scratch space is used for tuple data */ tupledata = scratchptr; - xlrec->all_visible_cleared = all_visible_cleared; + xlrec->flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0; xlrec->node = relation->rd_node; xlrec->blkno = BufferGetBlockNumber(buffer); xlrec->ntuples = nthispage; @@ -2420,6 +2458,13 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, datalen); tuphdr->datalen = datalen; scratchptr += datalen; + + /* + * We don't use heap_multi_insert for catalog tuples yet, but + * better be prepared... + */ + if (need_cids) + log_heap_new_cid(relation, heaptup); } totaldatalen = scratchptr - tupledata; Assert((scratchptr - scratch) < BLCKSZ); @@ -2431,17 +2476,34 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples, rdata[1].data = tupledata; rdata[1].len = totaldatalen; - rdata[1].buffer = buffer; + rdata[1].buffer = need_tuple_data ? InvalidBuffer : buffer; rdata[1].buffer_std = true; rdata[1].next = NULL; /* + * Make a separate rdata entry for the tuple's buffer if + * we're doing logical decoding, so that an eventual FPW + * doesn't remove the tuple's data. + */ + if (need_tuple_data) + { + rdata[1].next = &(rdata[2]); + + rdata[2].data = NULL; + rdata[2].len = 0; + rdata[2].buffer = buffer; + rdata[2].buffer_std = true; + rdata[2].next = NULL; + xlrec->flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE; + } + + /* * If we're going to reinitialize the whole page using the WAL * record, hide buffer reference from XLogInsert. */ if (init) { - rdata[1].buffer = InvalidBuffer; + rdata[1].buffer = rdata[2].buffer = InvalidBuffer; info |= XLOG_HEAP_INIT_PAGE; } @@ -2561,6 +2623,8 @@ heap_delete(Relation relation, ItemPointer tid, bool have_tuple_lock = false; bool iscombo; bool all_visible_cleared = false; + HeapTuple old_key_tuple = NULL; /* replica identity of the tuple */ + bool old_key_copied = false; Assert(ItemPointerIsValid(tid)); @@ -2734,6 +2798,12 @@ l1: /* replace cid with a combo cid if necessary */ HeapTupleHeaderAdjustCmax(tp.t_data, &cid, &iscombo); + /* + * Compute replica identity tuple before entering the critical section so + * we don't PANIC upon a memory allocation failure. + */ + old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied); + START_CRIT_SECTION(); /* @@ -2786,9 +2856,13 @@ l1: { xl_heap_delete xlrec; XLogRecPtr recptr; - XLogRecData rdata[2]; + XLogRecData rdata[4]; + + /* For logical decode we need combocids to properly decode the catalog */ + if (RelationIsAccessibleInLogicalDecoding(relation)) + log_heap_new_cid(relation, &tp); - xlrec.all_visible_cleared = all_visible_cleared; + xlrec.flags = all_visible_cleared ? XLOG_HEAP_ALL_VISIBLE_CLEARED : 0; xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask, tp.t_data->t_infomask2); xlrec.target.node = relation->rd_node; @@ -2805,6 +2879,37 @@ l1: rdata[1].buffer_std = true; rdata[1].next = NULL; + /* + * Log replica identity of the deleted tuple if there is one + */ + if (old_key_tuple != NULL) + { + xl_heap_header xlhdr; + + xlhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2; + xlhdr.t_infomask = old_key_tuple->t_data->t_infomask; + xlhdr.t_hoff = old_key_tuple->t_data->t_hoff; + + rdata[1].next = &(rdata[2]); + rdata[2].data = (char*)&xlhdr; + rdata[2].len = SizeOfHeapHeader; + rdata[2].buffer = InvalidBuffer; + rdata[2].next = NULL; + + rdata[2].next = &(rdata[3]); + rdata[3].data = (char *) old_key_tuple->t_data + + offsetof(HeapTupleHeaderData, t_bits); + rdata[3].len = old_key_tuple->t_len + - offsetof(HeapTupleHeaderData, t_bits); + rdata[3].buffer = InvalidBuffer; + rdata[3].next = NULL; + + if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE; + else + xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY; + } + recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata); PageSetLSN(page, recptr); @@ -2850,6 +2955,9 @@ l1: pgstat_count_heap_delete(relation); + if (old_key_tuple != NULL && old_key_copied) + heap_freetuple(old_key_tuple); + return HeapTupleMayBeUpdated; } @@ -2934,9 +3042,12 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, TransactionId xid = GetCurrentTransactionId(); Bitmapset *hot_attrs; Bitmapset *key_attrs; + Bitmapset *id_attrs; ItemId lp; HeapTupleData oldtup; HeapTuple heaptup; + HeapTuple old_key_tuple = NULL; + bool old_key_copied = false; Page page; BlockNumber block; MultiXactStatus mxact_status; @@ -2952,6 +3063,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, bool iscombo; bool satisfies_hot; bool satisfies_key; + bool satisfies_id; bool use_hot_update = false; bool key_intact; bool all_visible_cleared = false; @@ -2979,8 +3091,10 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, * Note that we get a copy here, so we need not worry about relcache flush * happening midway through. */ - hot_attrs = RelationGetIndexAttrBitmap(relation, false); - key_attrs = RelationGetIndexAttrBitmap(relation, true); + hot_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_ALL); + key_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_KEY); + id_attrs = RelationGetIndexAttrBitmap(relation, + INDEX_ATTR_BITMAP_IDENTITY_KEY); block = ItemPointerGetBlockNumber(otid); buffer = ReadBuffer(relation, block); @@ -3038,9 +3152,9 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, * is updates that don't manipulate key columns, not those that * serendipitiously arrive at the same key values. */ - HeapSatisfiesHOTandKeyUpdate(relation, hot_attrs, key_attrs, + HeapSatisfiesHOTandKeyUpdate(relation, hot_attrs, key_attrs, id_attrs, &satisfies_hot, &satisfies_key, - &oldtup, newtup); + &satisfies_id, &oldtup, newtup); if (satisfies_key) { *lockmode = LockTupleNoKeyExclusive; @@ -3514,6 +3628,14 @@ l2: PageSetFull(page); } + /* + * Compute replica identity tuple before entering the critical section so + * we don't PANIC upon a memory allocation failure. + * ExtractReplicaIdentity() will return NULL if nothing needs to be + * logged. + */ + old_key_tuple = ExtractReplicaIdentity(relation, &oldtup, !satisfies_id, &old_key_copied); + /* NO EREPORT(ERROR) from here till changes are logged */ START_CRIT_SECTION(); @@ -3589,11 +3711,23 @@ l2: /* XLOG stuff */ if (RelationNeedsWAL(relation)) { - XLogRecPtr recptr = log_heap_update(relation, buffer, - newbuf, &oldtup, heaptup, - all_visible_cleared, - all_visible_cleared_new); + XLogRecPtr recptr; + /* + * For logical decoding we need combocids to properly decode the + * catalog. + */ + if (RelationIsAccessibleInLogicalDecoding(relation)) + { + log_heap_new_cid(relation, &oldtup); + log_heap_new_cid(relation, heaptup); + } + + recptr = log_heap_update(relation, buffer, + newbuf, &oldtup, heaptup, + old_key_tuple, + all_visible_cleared, + all_visible_cleared_new); if (newbuf != buffer) { PageSetLSN(BufferGetPage(newbuf), recptr); @@ -3644,6 +3778,9 @@ l2: heap_freetuple(heaptup); } + if (old_key_tuple != NULL && old_key_copied) + heap_freetuple(old_key_tuple); + bms_free(hot_attrs); bms_free(key_attrs); @@ -3731,63 +3868,72 @@ heap_tuple_attr_equals(TupleDesc tupdesc, int attrnum, /* * Check which columns are being updated. * - * This simultaneously checks conditions for HOT updates and for FOR KEY - * SHARE updates. Since much of the time they will be checking very similar - * sets of columns, and doing the same tests on them, it makes sense to - * optimize and do them together. + * This simultaneously checks conditions for HOT updates, for FOR KEY + * SHARE updates, and REPLICA IDENTITY concerns. Since much of the time they + * will be checking very similar sets of columns, and doing the same tests on + * them, it makes sense to optimize and do them together. * - * We receive two bitmapsets comprising the two sets of columns we're + * We receive three bitmapsets comprising the three sets of columns we're * interested in. Note these are destructively modified; that is OK since * this is invoked at most once in heap_update. * * hot_result is set to TRUE if it's okay to do a HOT update (i.e. it does not * modified indexed columns); key_result is set to TRUE if the update does not - * modify columns used in the key. + * modify columns used in the key; id_result is set to TRUE if the update does + * not modify columns in any index marked as the REPLICA IDENTITY. */ static void -HeapSatisfiesHOTandKeyUpdate(Relation relation, - Bitmapset *hot_attrs, Bitmapset *key_attrs, +HeapSatisfiesHOTandKeyUpdate(Relation relation, Bitmapset *hot_attrs, + Bitmapset *key_attrs, Bitmapset *id_attrs, bool *satisfies_hot, bool *satisfies_key, + bool *satisfies_id, HeapTuple oldtup, HeapTuple newtup) { int next_hot_attnum; int next_key_attnum; + int next_id_attnum; bool hot_result = true; bool key_result = true; - bool key_done = false; - bool hot_done = false; + bool id_result = true; - next_hot_attnum = bms_first_member(hot_attrs); - if (next_hot_attnum == -1) - hot_done = true; - else - /* Adjust for system attributes */ - next_hot_attnum += FirstLowInvalidHeapAttributeNumber; + /* If REPLICA IDENTITY is set to FULL, id_attrs will be empty. */ + Assert(bms_is_subset(id_attrs, key_attrs)); + Assert(bms_is_subset(key_attrs, hot_attrs)); + /* + * If one of these sets contains no remaining bits, bms_first_member will + * return -1, and after adding FirstLowInvalidHeapAttributeNumber (which + * is negative!) we'll get an attribute number that can't possibly be + * real, and thus won't match any actual attribute number. + */ + next_hot_attnum = bms_first_member(hot_attrs); + next_hot_attnum += FirstLowInvalidHeapAttributeNumber; next_key_attnum = bms_first_member(key_attrs); - if (next_key_attnum == -1) - key_done = true; - else - /* Adjust for system attributes */ - next_key_attnum += FirstLowInvalidHeapAttributeNumber; + next_key_attnum += FirstLowInvalidHeapAttributeNumber; + next_id_attnum = bms_first_member(id_attrs); + next_id_attnum += FirstLowInvalidHeapAttributeNumber; for (;;) { - int check_now; bool changed; + int check_now; - /* both bitmapsets are now empty */ - if (key_done && hot_done) - break; - - /* XXX there's probably an easier way ... */ - if (hot_done) - check_now = next_key_attnum; - if (key_done) + /* + * Since the HOT attributes are a superset of the key attributes and + * the key attributes are a superset of the id attributes, this logic + * is guaranteed to identify the next column that needs to be + * checked. + */ + if (hot_result && next_hot_attnum > FirstLowInvalidHeapAttributeNumber) check_now = next_hot_attnum; + else if (key_result && next_key_attnum > FirstLowInvalidHeapAttributeNumber) + check_now = next_key_attnum; + else if (id_result && next_id_attnum > FirstLowInvalidHeapAttributeNumber) + check_now = next_id_attnum; else - check_now = Min(next_hot_attnum, next_key_attnum); + break; + /* See whether it changed. */ changed = !heap_tuple_attr_equals(RelationGetDescr(relation), check_now, oldtup, newtup); if (changed) @@ -3796,34 +3942,42 @@ HeapSatisfiesHOTandKeyUpdate(Relation relation, hot_result = false; if (check_now == next_key_attnum) key_result = false; - } + if (check_now == next_id_attnum) + id_result = false; - /* if both are false now, we can stop checking */ - if (!hot_result && !key_result) - break; + /* if all are false now, we can stop checking */ + if (!hot_result && !key_result && !id_result) + break; + } - if (check_now == next_hot_attnum) + /* + * Advance the next attribute numbers for the sets that contain + * the attribute we just checked. As we work our way through the + * columns, the next_attnum values will rise; but when each set + * becomes empty, bms_first_member() will return -1 and the attribute + * number will end up with a value less than + * FirstLowInvalidHeapAttributeNumber. + */ + if (hot_result && check_now == next_hot_attnum) { next_hot_attnum = bms_first_member(hot_attrs); - if (next_hot_attnum == -1) - hot_done = true; - else - /* Adjust for system attributes */ - next_hot_attnum += FirstLowInvalidHeapAttributeNumber; + next_hot_attnum += FirstLowInvalidHeapAttributeNumber; } - if (check_now == next_key_attnum) + if (key_result && check_now == next_key_attnum) { next_key_attnum = bms_first_member(key_attrs); - if (next_key_attnum == -1) - key_done = true; - else - /* Adjust for system attributes */ - next_key_attnum += FirstLowInvalidHeapAttributeNumber; + next_key_attnum += FirstLowInvalidHeapAttributeNumber; + } + if (id_result && check_now == next_id_attnum) + { + next_id_attnum = bms_first_member(id_attrs); + next_id_attnum += FirstLowInvalidHeapAttributeNumber; } } *satisfies_hot = hot_result; *satisfies_key = key_result; + *satisfies_id = id_result; } /* @@ -6140,14 +6294,17 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer, static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, Buffer newbuf, HeapTuple oldtup, HeapTuple newtup, + HeapTuple old_key_tuple, bool all_visible_cleared, bool new_all_visible_cleared) { xl_heap_update xlrec; - xl_heap_header xlhdr; + xl_heap_header_len xlhdr; + xl_heap_header_len xlhdr_idx; uint8 info; XLogRecPtr recptr; - XLogRecData rdata[4]; + XLogRecData rdata[7]; Page page = BufferGetPage(newbuf); + bool need_tuple_data = RelationIsLogicallyLogged(reln); /* Caller should not call me on a non-WAL-logged relation */ Assert(RelationNeedsWAL(reln)); @@ -6163,9 +6320,12 @@ log_heap_update(Relation reln, Buffer oldbuf, xlrec.old_infobits_set = compute_infobits(oldtup->t_data->t_infomask, oldtup->t_data->t_infomask2); xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data); - xlrec.all_visible_cleared = all_visible_cleared; + xlrec.flags = 0; + if (all_visible_cleared) + xlrec.flags |= XLOG_HEAP_ALL_VISIBLE_CLEARED; xlrec.newtid = newtup->t_self; - xlrec.new_all_visible_cleared = new_all_visible_cleared; + if (new_all_visible_cleared) + xlrec.flags |= XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED; rdata[0].data = (char *) &xlrec; rdata[0].len = SizeOfHeapUpdate; @@ -6178,33 +6338,86 @@ log_heap_update(Relation reln, Buffer oldbuf, rdata[1].buffer_std = true; rdata[1].next = &(rdata[2]); - xlhdr.t_infomask2 = newtup->t_data->t_infomask2; - xlhdr.t_infomask = newtup->t_data->t_infomask; - xlhdr.t_hoff = newtup->t_data->t_hoff; + xlhdr.header.t_infomask2 = newtup->t_data->t_infomask2; + xlhdr.header.t_infomask = newtup->t_data->t_infomask; + xlhdr.header.t_hoff = newtup->t_data->t_hoff; + xlhdr.t_len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits); /* - * As with insert records, we need not store the rdata[2] segment if we - * decide to store the whole buffer instead. + * As with insert records, we need not store the rdata[2] segment + * if we decide to store the whole buffer instead unless we're + * doing logical decoding. */ rdata[2].data = (char *) &xlhdr; - rdata[2].len = SizeOfHeapHeader; - rdata[2].buffer = newbuf; + rdata[2].len = SizeOfHeapHeaderLen; + rdata[2].buffer = need_tuple_data ? InvalidBuffer : newbuf; rdata[2].buffer_std = true; rdata[2].next = &(rdata[3]); /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */ - rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits); + rdata[3].data = (char *) newtup->t_data + + offsetof(HeapTupleHeaderData, t_bits); rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits); - rdata[3].buffer = newbuf; + rdata[3].buffer = need_tuple_data ? InvalidBuffer : newbuf; rdata[3].buffer_std = true; rdata[3].next = NULL; + /* + * Separate storage for the FPW buffer reference of the new page in the + * wal_level >= logical case. + */ + if (need_tuple_data) + { + rdata[3].next = &(rdata[4]); + + rdata[4].data = NULL, + rdata[4].len = 0; + rdata[4].buffer = newbuf; + rdata[4].buffer_std = true; + rdata[4].next = NULL; + xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE; + + /* We need to log a tuple identity */ + if (old_key_tuple) + { + /* don't really need this, but its more comfy to decode */ + xlhdr_idx.header.t_infomask2 = old_key_tuple->t_data->t_infomask2; + xlhdr_idx.header.t_infomask = old_key_tuple->t_data->t_infomask; + xlhdr_idx.header.t_hoff = old_key_tuple->t_data->t_hoff; + xlhdr_idx.t_len = old_key_tuple->t_len; + + rdata[4].next = &(rdata[5]); + rdata[5].data = (char *) &xlhdr_idx; + rdata[5].len = SizeOfHeapHeaderLen; + rdata[5].buffer = InvalidBuffer; + rdata[5].next = &(rdata[6]); + + /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */ + rdata[6].data = (char *) old_key_tuple->t_data + + offsetof(HeapTupleHeaderData, t_bits); + rdata[6].len = old_key_tuple->t_len + - offsetof(HeapTupleHeaderData, t_bits); + rdata[6].buffer = InvalidBuffer; + rdata[6].next = NULL; + + if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE; + else + xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_KEY; + } + } + /* If new tuple is the single and first tuple on page... */ if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber && PageGetMaxOffsetNumber(page) == FirstOffsetNumber) { + XLogRecData *rcur = &rdata[2]; info |= XLOG_HEAP_INIT_PAGE; - rdata[2].buffer = rdata[3].buffer = InvalidBuffer; + while (rcur != NULL) + { + rcur->buffer = InvalidBuffer; + rcur = rcur->next; + } } recptr = XLogInsert(RM_HEAP_ID, info, rdata); @@ -6340,6 +6553,184 @@ log_newpage_buffer(Buffer buffer, bool page_std) } /* + * Perform XLogInsert of a XLOG_HEAP2_NEW_CID record + * + * This is only used in wal_level >= WAL_LEVEL_LOGICAL, and only for catalog + * tuples. + */ +static XLogRecPtr +log_heap_new_cid(Relation relation, HeapTuple tup) +{ + xl_heap_new_cid xlrec; + + XLogRecPtr recptr; + XLogRecData rdata[1]; + HeapTupleHeader hdr = tup->t_data; + + Assert(ItemPointerIsValid(&tup->t_self)); + Assert(tup->t_tableOid != InvalidOid); + + xlrec.top_xid = GetTopTransactionId(); + xlrec.target.node = relation->rd_node; + xlrec.target.tid = tup->t_self; + + /* + * If the tuple got inserted & deleted in the same TX we definitely have a + * combocid, set cmin and cmax. + */ + if (hdr->t_infomask & HEAP_COMBOCID) + { + Assert(!(hdr->t_infomask & HEAP_XMAX_INVALID)); + Assert(!(hdr->t_infomask & HEAP_XMIN_INVALID)); + xlrec.cmin = HeapTupleHeaderGetCmin(hdr); + xlrec.cmax = HeapTupleHeaderGetCmax(hdr); + xlrec.combocid = HeapTupleHeaderGetRawCommandId(hdr); + } + /* No combocid, so only cmin or cmax can be set by this TX */ + else + { + /* + * Tuple inserted. + * + * We need to check for LOCK ONLY because multixacts might be + * transferred to the new tuple in case of FOR KEY SHARE updates in + * which case there will be a xmax, although the tuple just got + * inserted. + */ + if (hdr->t_infomask & HEAP_XMAX_INVALID || + HEAP_XMAX_IS_LOCKED_ONLY(hdr->t_infomask)) + { + xlrec.cmin = HeapTupleHeaderGetRawCommandId(hdr); + xlrec.cmax = InvalidCommandId; + } + /* Tuple from a different tx updated or deleted. */ + else + { + xlrec.cmin = InvalidCommandId; + xlrec.cmax = HeapTupleHeaderGetRawCommandId(hdr); + + } + xlrec.combocid = InvalidCommandId; + } + + rdata[0].data = (char *) &xlrec; + rdata[0].len = SizeOfHeapNewCid; + rdata[0].buffer = InvalidBuffer; + rdata[0].next = NULL; + + recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID, rdata); + + return recptr; +} + +/* + * Build a heap tuple representing the configured REPLICA IDENTITY to represent + * the old tuple in a UPDATE or DELETE. + * + * Returns NULL if there's no need to log a identity or if there's no suitable + * key in the Relation relation. + */ +static HeapTuple +ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_changed, bool *copy) +{ + TupleDesc desc = RelationGetDescr(relation); + Relation idx_rel; + TupleDesc idx_desc; + char replident = relation->rd_rel->relreplident; + HeapTuple key_tuple = NULL; + bool copy_oid = false; + bool nulls[MaxHeapAttributeNumber]; + Datum values[MaxHeapAttributeNumber]; + int natt; + + *copy = false; + + if (!RelationIsLogicallyLogged(relation)) + return NULL; + + if (replident == REPLICA_IDENTITY_NOTHING) + return NULL; + + if (replident == REPLICA_IDENTITY_FULL) + { + /* + * When logging the entire old tuple, it very well could contain + * toasted columns. If so, force them to be inlined. + */ + if (HeapTupleHasExternal(tp)) + { + *copy = true; + tp = toast_flatten_tuple(tp, RelationGetDescr(relation)); + } + return tp; + } + + /* if the key hasn't changed and we're only logging the key, we're done */ + if (!key_changed) + return NULL; + + /* needs to already have been fetched? */ + if (relation->rd_indexvalid == 0) + RelationGetIndexList(relation); + + if (!OidIsValid(relation->rd_replidindex)) + { + elog(DEBUG4, "Could not find configured replica identity for table \"%s\"", + RelationGetRelationName(relation)); + return NULL; + } + + idx_rel = RelationIdGetRelation(relation->rd_replidindex); + idx_desc = RelationGetDescr(idx_rel); + + /* deform tuple, so we have fast access to columns */ + heap_deform_tuple(tp, desc, values, nulls); + + /* set all columns to NULL, regardless of whether they actually are */ + memset(nulls, 1, sizeof(nulls)); + + /* + * Now set all columns contained in the index to NOT NULL, they cannot + * currently be NULL. + */ + for (natt = 0; natt < idx_desc->natts; natt++) + { + int attno = idx_rel->rd_index->indkey.values[natt]; + + if (attno == ObjectIdAttributeNumber) + copy_oid = true; + else if (attno < 0) + elog(ERROR, "system column in index"); + else + nulls[attno - 1] = false; + } + + key_tuple = heap_form_tuple(desc, values, nulls); + *copy = true; + RelationClose(idx_rel); + + /* XXX: we could also do this unconditionally, the space is used anyway */ + if (copy_oid) + HeapTupleSetOid(key_tuple, HeapTupleGetOid(tp)); + + /* + * If the tuple, which by here only contains indexed columns, still has + * toasted columns, force them to be inlined. This is somewhat unlikely + * since there's limits on the size of indexed columns, so we don't + * duplicate toast_flatten_tuple()s functionality in the above loop over + * the indexed columns, even if it would be more efficient. + */ + if (HeapTupleHasExternal(key_tuple)) + { + HeapTuple oldtup = key_tuple; + key_tuple = toast_flatten_tuple(oldtup, RelationGetDescr(relation)); + heap_freetuple(oldtup); + } + + return key_tuple; +} + +/* * Handles CLEANUP_INFO */ static void @@ -6714,7 +7105,7 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->all_visible_cleared) + if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(xlrec->target.node); Buffer vmbuffer = InvalidBuffer; @@ -6763,7 +7154,7 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record) /* Mark the page as a candidate for pruning */ PageSetPrunable(page, record->xl_xid); - if (xlrec->all_visible_cleared) + if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); /* Make sure there is no forward chain link in t_ctid */ @@ -6797,7 +7188,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->all_visible_cleared) + if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(xlrec->target.node); Buffer vmbuffer = InvalidBuffer; @@ -6868,7 +7259,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) PageSetLSN(page, lsn); - if (xlrec->all_visible_cleared) + if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); MarkBufferDirty(buffer); @@ -6931,7 +7322,7 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->all_visible_cleared) + if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(xlrec->node); Buffer vmbuffer = InvalidBuffer; @@ -7014,7 +7405,7 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record) PageSetLSN(page, lsn); - if (xlrec->all_visible_cleared) + if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); MarkBufferDirty(buffer); @@ -7053,7 +7444,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) HeapTupleHeaderData hdr; char data[MaxHeapTupleSize]; } tbuf; - xl_heap_header xlhdr; + xl_heap_header_len xlhdr; int hsize; uint32 newlen; Size freespace; @@ -7062,7 +7453,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->all_visible_cleared) + if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(xlrec->target.node); BlockNumber block = ItemPointerGetBlockNumber(&xlrec->target.tid); @@ -7140,7 +7531,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) /* Mark the page as a candidate for pruning */ PageSetPrunable(page, record->xl_xid); - if (xlrec->all_visible_cleared) + if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); /* @@ -7164,7 +7555,7 @@ newt:; * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ - if (xlrec->new_all_visible_cleared) + if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(xlrec->target.node); BlockNumber block = ItemPointerGetBlockNumber(&xlrec->newtid); @@ -7222,13 +7613,13 @@ newsame:; if (PageGetMaxOffsetNumber(page) + 1 < offnum) elog(PANIC, "heap_update_redo: invalid max offset number"); - hsize = SizeOfHeapUpdate + SizeOfHeapHeader; + hsize = SizeOfHeapUpdate + SizeOfHeapHeaderLen; - newlen = record->xl_len - hsize; - Assert(newlen <= MaxHeapTupleSize); memcpy((char *) &xlhdr, (char *) xlrec + SizeOfHeapUpdate, - SizeOfHeapHeader); + SizeOfHeapHeaderLen); + newlen = xlhdr.t_len; + Assert(newlen <= MaxHeapTupleSize); htup = &tbuf.hdr; MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData)); /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */ @@ -7236,9 +7627,9 @@ newsame:; (char *) xlrec + hsize, newlen); newlen += offsetof(HeapTupleHeaderData, t_bits); - htup->t_infomask2 = xlhdr.t_infomask2; - htup->t_infomask = xlhdr.t_infomask; - htup->t_hoff = xlhdr.t_hoff; + htup->t_infomask2 = xlhdr.header.t_infomask2; + htup->t_infomask = xlhdr.header.t_infomask; + htup->t_hoff = xlhdr.header.t_hoff; HeapTupleHeaderSetXmin(htup, record->xl_xid); HeapTupleHeaderSetCmin(htup, FirstCommandId); @@ -7250,7 +7641,7 @@ newsame:; if (offnum == InvalidOffsetNumber) elog(PANIC, "heap_update_redo: failed to add tuple"); - if (xlrec->new_all_visible_cleared) + if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ @@ -7501,6 +7892,12 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record) case XLOG_HEAP2_LOCK_UPDATED: heap_xlog_lock_updated(lsn, record); break; + case XLOG_HEAP2_NEW_CID: + /* + * Nothing to do on a real replay, only used during logical + * decoding. + */ + break; default: elog(PANIC, "heap2_redo: unknown op code %u", info); } diff --git a/src/backend/access/rmgrdesc/heapdesc.c b/src/backend/access/rmgrdesc/heapdesc.c index e14c0539105..39c53d0022d 100644 --- a/src/backend/access/rmgrdesc/heapdesc.c +++ b/src/backend/access/rmgrdesc/heapdesc.c @@ -184,6 +184,15 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec) xlrec->infobits_set); out_target(buf, &(xlrec->target)); } + else if (info == XLOG_HEAP2_NEW_CID) + { + xl_heap_new_cid *xlrec = (xl_heap_new_cid *) rec; + + appendStringInfo(buf, "new_cid: "); + out_target(buf, &(xlrec->target)); + appendStringInfo(buf, "; cmin: %u, cmax: %u, combo: %u", + xlrec->cmin, xlrec->cmax, xlrec->combocid); + } else appendStringInfoString(buf, "UNKNOWN"); } diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c index 1d70494233e..dd67b1f643f 100644 --- a/src/backend/access/rmgrdesc/xlogdesc.c +++ b/src/backend/access/rmgrdesc/xlogdesc.c @@ -28,6 +28,7 @@ const struct config_enum_entry wal_level_options[] = { {"minimal", WAL_LEVEL_MINIMAL, false}, {"archive", WAL_LEVEL_ARCHIVE, false}, {"hot_standby", WAL_LEVEL_HOT_STANDBY, false}, + {"logical", WAL_LEVEL_LOGICAL, false}, {NULL, 0, false} }; diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index e975f8d26d0..48203d2c81e 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -47,6 +47,7 @@ #include "access/twophase.h" #include "access/twophase_rmgr.h" #include "access/xact.h" +#include "access/xlog.h" #include "access/xlogutils.h" #include "catalog/pg_type.h" #include "catalog/storage.h" @@ -1920,7 +1921,8 @@ RecoverPreparedTransactions(void) * the prepared transaction generated xid assignment records. Test * here must match one used in AssignTransactionId(). */ - if (InHotStandby && hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS) + if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS || + XLogLogicalInfoActive())) overwriteOK = true; /* diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index b467b5c89d1..ad0d19c4a90 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -148,6 +148,7 @@ typedef struct TransactionStateData int prevSecContext; /* previous SecurityRestrictionContext */ bool prevXactReadOnly; /* entry-time xact r/o state */ bool startedInRecovery; /* did we start in recovery? */ + bool didLogXid; /* has xid been included in WAL record? */ struct TransactionStateData *parent; /* back link to parent */ } TransactionStateData; @@ -177,6 +178,7 @@ static TransactionStateData TopTransactionStateData = { 0, /* previous SecurityRestrictionContext */ false, /* entry-time xact r/o state */ false, /* startedInRecovery */ + false, /* didLogXid */ NULL /* link to parent state block */ }; @@ -395,6 +397,19 @@ GetCurrentTransactionIdIfAny(void) } /* + * MarkCurrentTransactionIdLoggedIfAny + * + * Remember that the current xid - if it is assigned - now has been wal logged. + */ +void +MarkCurrentTransactionIdLoggedIfAny(void) +{ + if (TransactionIdIsValid(CurrentTransactionState->transactionId)) + CurrentTransactionState->didLogXid = true; +} + + +/* * GetStableLatestTransactionId * * Get the transaction's XID if it has one, else read the next-to-be-assigned @@ -435,6 +450,7 @@ AssignTransactionId(TransactionState s) { bool isSubXact = (s->parent != NULL); ResourceOwner currentOwner; + bool log_unknown_top = false; /* Assert that caller didn't screw up */ Assert(!TransactionIdIsValid(s->transactionId)); @@ -470,6 +486,20 @@ AssignTransactionId(TransactionState s) } /* + * When wal_level=logical, guarantee that a subtransaction's xid can only + * be seen in the WAL stream if its toplevel xid has been logged + * before. If necessary we log a xact_assignment record with fewer than + * PGPROC_MAX_CACHED_SUBXIDS. Note that it is fine if didLogXid isn't set + * for a transaction even though it appears in a WAL record, we just might + * superfluously log something. That can happen when an xid is included + * somewhere inside a wal record, but not in XLogRecord->xl_xid, like in + * xl_standby_locks. + */ + if (isSubXact && XLogLogicalInfoActive() && + !TopTransactionStateData.didLogXid) + log_unknown_top = true; + + /* * Generate a new Xid and record it in PG_PROC and pg_subtrans. * * NB: we must make the subtrans entry BEFORE the Xid appears anywhere in @@ -523,6 +553,9 @@ AssignTransactionId(TransactionState s) * top-level transaction that each subxact belongs to. This is correct in * recovery only because aborted subtransactions are separately WAL * logged. + * + * This is correct even for the case where several levels above us didn't + * have an xid assigned as we recursed up to them beforehand. */ if (isSubXact && XLogStandbyInfoActive()) { @@ -533,7 +566,8 @@ AssignTransactionId(TransactionState s) * ensure this test matches similar one in * RecoverPreparedTransactions() */ - if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS) + if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS || + log_unknown_top) { XLogRecData rdata[2]; xl_xact_assignment xlrec; @@ -552,13 +586,15 @@ AssignTransactionId(TransactionState s) rdata[0].next = &rdata[1]; rdata[1].data = (char *) unreportedXids; - rdata[1].len = PGPROC_MAX_CACHED_SUBXIDS * sizeof(TransactionId); + rdata[1].len = nUnreportedXids * sizeof(TransactionId); rdata[1].buffer = InvalidBuffer; rdata[1].next = NULL; (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT, rdata); nUnreportedXids = 0; + /* mark top, not current xact as having been logged */ + TopTransactionStateData.didLogXid = true; } } } @@ -1737,6 +1773,7 @@ StartTransaction(void) * initialize reported xid accounting */ nUnreportedXids = 0; + s->didLogXid = false; /* * must initialize resource-management stuff first diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index b68230d1969..6fa5479c92b 100755 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -1191,6 +1191,8 @@ begin:; */ WALInsertSlotRelease(); + MarkCurrentTransactionIdLoggedIfAny(); + END_CRIT_SECTION(); /* @@ -5961,7 +5963,7 @@ CheckRequiredParameterValues(void) { if (ControlFile->wal_level < WAL_LEVEL_HOT_STANDBY) ereport(ERROR, - (errmsg("hot standby is not possible because wal_level was not set to \"hot_standby\" on the master server"), + (errmsg("hot standby is not possible because wal_level was not set to \"hot_standby\" or higher on the master server"), errhint("Either set wal_level to \"hot_standby\" on the master, or turn off hot_standby here."))); /* We ignore autovacuum_max_workers when we make this test. */ @@ -9650,7 +9652,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("WAL level not sufficient for making an online backup"), - errhint("wal_level must be set to \"archive\" or \"hot_standby\" at server start."))); + errhint("wal_level must be set to \"archive\", \"hot_standby\" or \"logical\" at server start."))); if (strlen(backupidstr) > MAXPGPATH) ereport(ERROR, @@ -9988,7 +9990,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("WAL level not sufficient for making an online backup"), - errhint("wal_level must be set to \"archive\" or \"hot_standby\" at server start."))); + errhint("wal_level must be set to \"archive\", \"hot_standby\" or \"logical\" at server start."))); /* * OK to update backup counters and forcePageWrites diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 02752406f53..aa31429b8ac 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -3321,7 +3321,7 @@ reindex_relation(Oid relid, int flags) /* Ensure rd_indexattr is valid; see comments for RelationSetIndexList */ if (is_pg_class) - (void) RelationGetIndexAttrBitmap(rel, false); + (void) RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_ALL); PG_TRY(); { diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index 0008fc633ba..4eff1845f5c 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -2355,7 +2355,8 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, * concurrency. */ modifiedCols = GetModifiedColumns(relinfo, estate); - keyCols = RelationGetIndexAttrBitmap(relinfo->ri_RelationDesc, true); + keyCols = RelationGetIndexAttrBitmap(relinfo->ri_RelationDesc, + INDEX_ATTR_BITMAP_KEY); if (bms_overlap(keyCols, modifiedCols)) lockmode = LockTupleExclusive; else diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index d294a5a47f8..048a1894e59 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -816,10 +816,10 @@ PostmasterMain(int argc, char *argv[]) } if (XLogArchiveMode && wal_level == WAL_LEVEL_MINIMAL) ereport(ERROR, - (errmsg("WAL archival (archive_mode=on) requires wal_level \"archive\" or \"hot_standby\""))); + (errmsg("WAL archival (archive_mode=on) requires wal_level \"archive\", \"hot_standby\" or \"logical\""))); if (max_wal_senders > 0 && wal_level == WAL_LEVEL_MINIMAL) ereport(ERROR, - (errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\" or \"hot_standby\""))); + (errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\", \"hot_standby\" or \"logical\""))); /* * Other one-time internal sanity checks can go here, if they are fast. diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index d0acca871e3..03bbb9f3381 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -3818,8 +3818,9 @@ RelationGetIndexPredicate(Relation relation) * simple index keys, but attributes used in expressions and partial-index * predicates.) * - * If "keyAttrs" is true, only attributes that can be referenced by foreign - * keys are considered. + * Depending on attrKind, a bitmap covering the attnums for all index columns, + * for all key columns or for all the columns the configured replica identity + * are returned. * * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that * we can include system attributes (e.g., OID) in the bitmap representation. @@ -3832,17 +3833,28 @@ RelationGetIndexPredicate(Relation relation) * be bms_free'd when not needed anymore. */ Bitmapset * -RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs) +RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind) { - Bitmapset *indexattrs; - Bitmapset *uindexattrs; + Bitmapset *indexattrs; /* indexed columns */ + Bitmapset *uindexattrs; /* columns in unique indexes */ + Bitmapset *idindexattrs; /* columns in the the replica identity */ List *indexoidlist; ListCell *l; MemoryContext oldcxt; /* Quick exit if we already computed the result. */ if (relation->rd_indexattr != NULL) - return bms_copy(keyAttrs ? relation->rd_keyattr : relation->rd_indexattr); + switch(attrKind) + { + case INDEX_ATTR_BITMAP_IDENTITY_KEY: + return bms_copy(relation->rd_idattr); + case INDEX_ATTR_BITMAP_KEY: + return bms_copy(relation->rd_keyattr); + case INDEX_ATTR_BITMAP_ALL: + return bms_copy(relation->rd_indexattr); + default: + elog(ERROR, "unknown attrKind %u", attrKind); + } /* Fast path if definitely no indexes */ if (!RelationGetForm(relation)->relhasindex) @@ -3869,13 +3881,16 @@ RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs) */ indexattrs = NULL; uindexattrs = NULL; + idindexattrs = NULL; foreach(l, indexoidlist) { Oid indexOid = lfirst_oid(l); Relation indexDesc; IndexInfo *indexInfo; int i; - bool isKey; + bool isKey; /* candidate key */ + bool isIDKey; /* replica identity index */ + indexDesc = index_open(indexOid, AccessShareLock); @@ -3887,6 +3902,9 @@ RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs) indexInfo->ii_Expressions == NIL && indexInfo->ii_Predicate == NIL; + /* Is this index the configured (or default) replica identity? */ + isIDKey = indexOid == relation->rd_replidindex; + /* Collect simple attribute references */ for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++) { @@ -3896,6 +3914,11 @@ RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs) { indexattrs = bms_add_member(indexattrs, attrnum - FirstLowInvalidHeapAttributeNumber); + + if (isIDKey) + idindexattrs = bms_add_member(idindexattrs, + attrnum - FirstLowInvalidHeapAttributeNumber); + if (isKey) uindexattrs = bms_add_member(uindexattrs, attrnum - FirstLowInvalidHeapAttributeNumber); @@ -3917,10 +3940,21 @@ RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs) oldcxt = MemoryContextSwitchTo(CacheMemoryContext); relation->rd_indexattr = bms_copy(indexattrs); relation->rd_keyattr = bms_copy(uindexattrs); + relation->rd_idattr = bms_copy(idindexattrs); MemoryContextSwitchTo(oldcxt); /* We return our original working copy for caller to play with */ - return keyAttrs ? uindexattrs : indexattrs; + switch(attrKind) + { + case INDEX_ATTR_BITMAP_IDENTITY_KEY: + return idindexattrs; + case INDEX_ATTR_BITMAP_KEY: + return uindexattrs; + case INDEX_ATTR_BITMAP_ALL: + return indexattrs; + default: + elog(ERROR, "unknown attrKind %u", attrKind); + } } /* diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 983cae7fda2..6096fe44457 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -170,7 +170,7 @@ # - Settings - -#wal_level = minimal # minimal, archive, or hot_standby +#wal_level = minimal # minimal, archive, hot_standby or logical # (change requires restart) #fsync = on # turns forced synchronization on or off #synchronous_commit = on # synchronization level; diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c index fde483a616a..8c6cf24d237 100644 --- a/src/bin/pg_controldata/pg_controldata.c +++ b/src/bin/pg_controldata/pg_controldata.c @@ -77,6 +77,8 @@ wal_level_str(WalLevel wal_level) return "archive"; case WAL_LEVEL_HOT_STANDBY: return "hot_standby"; + case WAL_LEVEL_LOGICAL: + return "logical"; } return _("unrecognized wal_level"); } diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h index 63b73d03291..438e79db48e 100644 --- a/src/include/access/heapam_xlog.h +++ b/src/include/access/heapam_xlog.h @@ -55,6 +55,22 @@ #define XLOG_HEAP2_VISIBLE 0x40 #define XLOG_HEAP2_MULTI_INSERT 0x50 #define XLOG_HEAP2_LOCK_UPDATED 0x60 +#define XLOG_HEAP2_NEW_CID 0x70 + +/* + * xl_heap_* ->flag values, 8 bits are available. + */ +/* PD_ALL_VISIBLE was cleared */ +#define XLOG_HEAP_ALL_VISIBLE_CLEARED (1<<0) +/* PD_ALL_VISIBLE was cleared in the 2nd page */ +#define XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED (1<<1) +#define XLOG_HEAP_CONTAINS_OLD_TUPLE (1<<2) +#define XLOG_HEAP_CONTAINS_OLD_KEY (1<<3) +#define XLOG_HEAP_CONTAINS_NEW_TUPLE (1<<4) + +/* convenience macro for checking whether any form of old tuple was logged */ +#define XLOG_HEAP_CONTAINS_OLD \ + (XLOG_HEAP_CONTAINS_OLD_TUPLE | XLOG_HEAP_CONTAINS_OLD_KEY) /* * All what we need to find changed tuple @@ -78,10 +94,10 @@ typedef struct xl_heap_delete xl_heaptid target; /* deleted tuple id */ TransactionId xmax; /* xmax of the deleted tuple */ uint8 infobits_set; /* infomask bits */ - bool all_visible_cleared; /* PD_ALL_VISIBLE was cleared */ + uint8 flags; } xl_heap_delete; -#define SizeOfHeapDelete (offsetof(xl_heap_delete, all_visible_cleared) + sizeof(bool)) +#define SizeOfHeapDelete (offsetof(xl_heap_delete, flags) + sizeof(uint8)) /* * We don't store the whole fixed part (HeapTupleHeaderData) of an inserted @@ -100,15 +116,29 @@ typedef struct xl_heap_header #define SizeOfHeapHeader (offsetof(xl_heap_header, t_hoff) + sizeof(uint8)) +/* + * Variant of xl_heap_header that contains the length of the tuple, which is + * useful if the length of the tuple cannot be computed using the overall + * record length. E.g. because there are several tuples inside a single + * record. + */ +typedef struct xl_heap_header_len +{ + uint16 t_len; + xl_heap_header header; +} xl_heap_header_len; + +#define SizeOfHeapHeaderLen (offsetof(xl_heap_header_len, header) + SizeOfHeapHeader) + /* This is what we need to know about insert */ typedef struct xl_heap_insert { xl_heaptid target; /* inserted tuple id */ - bool all_visible_cleared; /* PD_ALL_VISIBLE was cleared */ + uint8 flags; /* xl_heap_header & TUPLE DATA FOLLOWS AT END OF STRUCT */ } xl_heap_insert; -#define SizeOfHeapInsert (offsetof(xl_heap_insert, all_visible_cleared) + sizeof(bool)) +#define SizeOfHeapInsert (offsetof(xl_heap_insert, flags) + sizeof(uint8)) /* * This is what we need to know about a multi-insert. The record consists of @@ -120,7 +150,7 @@ typedef struct xl_heap_multi_insert { RelFileNode node; BlockNumber blkno; - bool all_visible_cleared; + uint8 flags; uint16 ntuples; OffsetNumber offsets[1]; @@ -147,13 +177,12 @@ typedef struct xl_heap_update TransactionId old_xmax; /* xmax of the old tuple */ TransactionId new_xmax; /* xmax of the new tuple */ ItemPointerData newtid; /* new inserted tuple id */ - uint8 old_infobits_set; /* infomask bits to set on old tuple */ - bool all_visible_cleared; /* PD_ALL_VISIBLE was cleared */ - bool new_all_visible_cleared; /* same for the page of newtid */ + uint8 old_infobits_set; /* infomask bits to set on old tuple */ + uint8 flags; /* NEW TUPLE xl_heap_header AND TUPLE DATA FOLLOWS AT END OF STRUCT */ } xl_heap_update; -#define SizeOfHeapUpdate (offsetof(xl_heap_update, new_all_visible_cleared) + sizeof(bool)) +#define SizeOfHeapUpdate (offsetof(xl_heap_update, flags) + sizeof(uint8)) /* * This is what we need to know about vacuum page cleanup/redirect @@ -263,6 +292,29 @@ typedef struct xl_heap_visible #define SizeOfHeapVisible (offsetof(xl_heap_visible, cutoff_xid) + sizeof(TransactionId)) +typedef struct xl_heap_new_cid +{ + /* + * store toplevel xid so we don't have to merge cids from different + * transactions + */ + TransactionId top_xid; + CommandId cmin; + CommandId cmax; + /* + * don't really need the combocid since we have the actual values + * right in this struct, but the padding makes it free and its + * useful for debugging. + */ + CommandId combocid; + /* + * Store the relfilenode/ctid pair to facilitate lookups. + */ + xl_heaptid target; +} xl_heap_new_cid; + +#define SizeOfHeapNewCid (offsetof(xl_heap_new_cid, target) + SizeOfHeapTid) + extern void HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple, TransactionId *latestRemovedXid); diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 1d3e7d8938a..9632378865e 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -215,6 +215,7 @@ extern TransactionId GetCurrentTransactionId(void); extern TransactionId GetCurrentTransactionIdIfAny(void); extern TransactionId GetStableLatestTransactionId(void); extern SubTransactionId GetCurrentSubTransactionId(void); +extern void MarkCurrentTransactionIdLoggedIfAny(void); extern bool SubTransactionIsActive(SubTransactionId subxid); extern CommandId GetCurrentCommandId(bool used); extern TimestampTz GetCurrentTransactionStartTimestamp(void); diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 002862cca50..7415a261bbd 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -197,7 +197,8 @@ typedef enum WalLevel { WAL_LEVEL_MINIMAL = 0, WAL_LEVEL_ARCHIVE, - WAL_LEVEL_HOT_STANDBY + WAL_LEVEL_HOT_STANDBY, + WAL_LEVEL_LOGICAL } WalLevel; extern int wal_level; @@ -210,9 +211,12 @@ extern int wal_level; */ #define XLogIsNeeded() (wal_level >= WAL_LEVEL_ARCHIVE) -/* Do we need to WAL-log information required only for Hot Standby? */ +/* Do we need to WAL-log information required only for Hot Standby and logical replication? */ #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_HOT_STANDBY) +/* Do we need to WAL-log information required only for logical replication? */ +#define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL) + #ifdef WAL_DEBUG extern bool XLOG_DEBUG; #endif diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 9fba8c3db86..64ba55355b9 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -55,7 +55,7 @@ typedef struct BkpBlock /* * Each page of XLOG file has a header like this: */ -#define XLOG_PAGE_MAGIC 0xD078 /* can be used as WAL version indicator */ +#define XLOG_PAGE_MAGIC 0xD079 /* can be used as WAL version indicator */ typedef struct XLogPageHeaderData { diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index 21d5871454b..ad878cf1a22 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -104,6 +104,7 @@ typedef struct RelationData List *rd_indexlist; /* list of OIDs of indexes on relation */ Bitmapset *rd_indexattr; /* identifies columns used in indexes */ Bitmapset *rd_keyattr; /* cols that can be ref'd by foreign keys */ + Bitmapset *rd_idattr; /* included in replica identity index */ Oid rd_oidindex; /* OID of unique index on OID, if any */ LockInfoData rd_lockInfo; /* lock mgr's info for locking relation */ RuleLock *rd_rules; /* rewrite rules */ @@ -453,6 +454,29 @@ typedef struct StdRdOptions */ #define RelationIsPopulated(relation) ((relation)->rd_rel->relispopulated) +/* + * RelationIsAccessibleInLogicalDecoding + * True if we need to log enough information to have access via + * decoding snapshot. + */ +#define RelationIsAccessibleInLogicalDecoding(relation) \ + (XLogLogicalInfoActive() && \ + RelationNeedsWAL(relation) && \ + IsCatalogRelation(relation)) + +/* + * RelationIsLogicallyLogged + * True if we need to log enough information to extract the data from the + * WAL stream. + * + * We don't log information for unlogged tables (since they don't WAL log + * anyway) and for system tables (their content is hard to make sense of, and + * it would complicate decoding slightly for little gain). + */ +#define RelationIsLogicallyLogged(relation) \ + (XLogLogicalInfoActive() && \ + RelationNeedsWAL(relation) && \ + !IsCatalogRelation(relation)) /* routines in utils/cache/relcache.c */ extern void RelationIncrementReferenceCount(Relation rel); diff --git a/src/include/utils/relcache.h b/src/include/utils/relcache.h index 8ac2549cb3a..d7604ec113f 100644 --- a/src/include/utils/relcache.h +++ b/src/include/utils/relcache.h @@ -41,7 +41,17 @@ extern List *RelationGetIndexList(Relation relation); extern Oid RelationGetOidIndex(Relation relation); extern List *RelationGetIndexExpressions(Relation relation); extern List *RelationGetIndexPredicate(Relation relation); -extern Bitmapset *RelationGetIndexAttrBitmap(Relation relation, bool keyAttrs); + +typedef enum IndexAttrBitmapKind +{ + INDEX_ATTR_BITMAP_ALL, + INDEX_ATTR_BITMAP_KEY, + INDEX_ATTR_BITMAP_IDENTITY_KEY +} IndexAttrBitmapKind; + +extern Bitmapset *RelationGetIndexAttrBitmap(Relation relation, + IndexAttrBitmapKind keyAttrs); + extern void RelationGetExclusionInfo(Relation indexRelation, Oid **operators, Oid **procs, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index b20eb0d5ac9..c5200372aec 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -791,6 +791,7 @@ IdentifySystemCmd IncrementVarSublevelsUp_context Index IndexArrayKeyInfo +IndexAttrBitmapKind IndexBuildCallback IndexBuildResult IndexBulkDeleteCallback @@ -2419,11 +2420,13 @@ xl_heap_cleanup_info xl_heap_delete xl_heap_freeze xl_heap_header +xl_heap_header_len xl_heap_inplace xl_heap_insert xl_heap_lock xl_heap_lock_updated xl_heap_multi_insert +xl_heap_new_cid xl_heap_newpage xl_heap_update xl_heap_visible |