author     Andres Freund <andres@anarazel.de>  2019-03-23 19:55:57 -0700
committer  Andres Freund <andres@anarazel.de>  2019-03-23 19:55:57 -0700
commit     5db6df0c0117ff2a4e0cd87594d2db408cd5022f (patch)
tree       7b06b96b6f8c1b7e4cdfb602af357f81e21f23b1  /src/backend/executor/execReplication.c
parent     f778e537a0d02d5e05016da3e6f4068914101dee (diff)
download   postgresql-5db6df0c0117ff2a4e0cd87594d2db408cd5022f.tar.gz
           postgresql-5db6df0c0117ff2a4e0cd87594d2db408cd5022f.zip
tableam: Add tuple_{insert, delete, update, lock} and use.
This adds new, required, table AM callbacks for insert/delete/update and lock_tuple. To be able to reasonably use those, the EvalPlanQual mechanism had to be adapted, moving more logic into the AM.

Previously both delete/update/lock call-sites and the EPQ mechanism had to have awareness of the specific tuple format to be able to fetch the latest version of a tuple. Obviously that needs to be abstracted away.

To do so, move the logic that finds the latest row version into the AM. lock_tuple has a new flag argument, TUPLE_LOCK_FLAG_FIND_LAST_VERSION, that forces it to lock the last version, rather than the current one. It'd have been possible to do so via a separate callback as well, but finding the last version usually also necessitates locking the newest version, making it sensible to combine the two. This replaces the previous use of EvalPlanQualFetch().

Additionally HeapTupleUpdated, which previously signaled either a concurrent update or delete, is now split into two, to avoid callers needing AM-specific knowledge to differentiate.

The move of finding the latest row version into tuple_lock means that encountering a row concurrently moved into another partition will now raise an error about "tuple to be locked" rather than "tuple to be updated/deleted" - which is accurate, as that always happens when locking rows. While possibly slightly less helpful for users, it seems like an acceptable trade-off.

As part of this commit HTSU_Result has been renamed to TM_Result, and its members have been expanded to differentiate between updating and deleting. HeapUpdateFailureData has been renamed to TM_FailureData.

The interface to speculative insertion is changed so that nodeModifyTable.c does not have to set the speculative token itself anymore. Instead there's a variant of tuple_insert, tuple_insert_speculative, that performs the speculative insertion (without requiring a flag to signal that fact), and the speculative insertion is either made permanent with table_complete_speculative(succeeded = true) or aborted with table_complete_speculative(succeeded = false).

Note that multi_insert is not yet routed through tableam, nor is COPY. Changing multi_insert requires changes to copy.c that are large enough to better be done separately. Similarly, although simpler, CREATE TABLE AS and CREATE MATERIALIZED VIEW are also only going to be adjusted in a later commit.

Author: Andres Freund and Haribabu Kommi
Discussion: https://postgr.es/m/20180703070645.wchpu5muyto5n647@alap3.anarazel.de
            https://postgr.es/m/20190313003903.nwvrxi7rw3ywhdel@alap3.anarazel.de
            https://postgr.es/m/20160812231527.GA690404@alvherre.pgsql
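Editor's note: the locking pattern the hunks below adopt is easier to see in isolation. The following is a minimal, illustrative sketch (not part of the commit) of locking the latest version of a row through the new API, using the table_lock_tuple signature and TM_Result codes this commit introduces; the helper name lock_latest_version and its error handling are invented for illustration:

#include "postgres.h"
#include "access/tableam.h"
#include "access/xact.h"

/*
 * Hypothetical helper: lock the newest version of the row at *tid.
 * TUPLE_LOCK_FLAG_FIND_LAST_VERSION makes the AM follow the update
 * chain itself, replacing the old EvalPlanQualFetch() dance.
 */
static TM_Result
lock_latest_version(Relation rel, ItemPointer tid, Snapshot snapshot,
                    TupleTableSlot *slot, LockTupleMode lockmode)
{
    TM_FailureData tmfd;
    TM_Result   res;

    res = table_lock_tuple(rel, tid, snapshot, slot,
                           GetCurrentCommandId(false),
                           lockmode,
                           LockWaitBlock,
                           TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
                           &tmfd);

    switch (res)
    {
        case TM_Ok:
            /* slot holds the locked row; tmfd.traversed reports whether
             * a newer version was chased down, i.e. whether an EPQ
             * recheck would be needed by plan-bearing callers */
            break;
        case TM_Updated:
        case TM_Deleted:
            /* concurrent update vs. delete, now distinguishable without
             * AM-specific knowledge; caller decides to retry or error */
            break;
        case TM_Invisible:
            elog(ERROR, "attempted to lock invisible tuple");
            break;
        default:
            elog(ERROR, "unexpected table_lock_tuple status: %u", res);
            break;
    }
    return res;
}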
Diffstat (limited to 'src/backend/executor/execReplication.c')
 -rw-r--r--  src/backend/executor/execReplication.c  137
1 file changed, 54 insertions(+), 83 deletions(-)
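Editor's note: the speculative-insertion side of the interface change does not touch this file. As a rough sketch of the flow the commit message describes, modeled on the nodeModifyTable.c side of this commit (index insertion and conflict detection elided; the function name and the conflicted parameter are illustrative):

#include "postgres.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "storage/lmgr.h"

/*
 * Hypothetical condensation of the ON CONFLICT insert path: the AM now
 * handles the speculative token internally, so the caller no longer
 * marks the tuple itself.
 */
static void
speculative_insert_sketch(Relation rel, TupleTableSlot *slot, bool conflicted)
{
    uint32      specToken;

    /* take a token other backends can wait on while we decide */
    specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId());

    /* the AM performs the insertion, marked with the token */
    table_insert_speculative(rel, slot, GetCurrentCommandId(true),
                             0 /* options */ , NULL /* bistate */ ,
                             specToken);

    /* ... insert index entries, detecting any conflict ... */

    /* make the insertion permanent (succeeded = true) or abort it */
    table_complete_speculative(rel, slot, specToken, !conflicted);

    SpeculativeInsertionLockRelease(GetCurrentTransactionId());
}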
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index c539bb5a3f6..d8b48c667ce 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -15,7 +15,6 @@
#include "postgres.h"
#include "access/genam.h"
-#include "access/heapam.h"
#include "access/relscan.h"
#include "access/tableam.h"
#include "access/transam.h"
@@ -168,35 +167,28 @@ retry:
/* Found tuple, try to lock it in the lockmode. */
if (found)
{
- Buffer buf;
- HeapUpdateFailureData hufd;
- HTSU_Result res;
- HeapTupleData locktup;
- HeapTupleTableSlot *hslot = (HeapTupleTableSlot *)outslot;
-
- /* Only a heap tuple has item pointers. */
- Assert(TTS_IS_HEAPTUPLE(outslot) || TTS_IS_BUFFERTUPLE(outslot));
- ItemPointerCopy(&hslot->tuple->t_self, &locktup.t_self);
+ TM_FailureData tmfd;
+ TM_Result res;
PushActiveSnapshot(GetLatestSnapshot());
- res = heap_lock_tuple(rel, &locktup, GetCurrentCommandId(false),
- lockmode,
- LockWaitBlock,
- false /* don't follow updates */ ,
- &buf, &hufd);
- /* the tuple slot already has the buffer pinned */
- ReleaseBuffer(buf);
+ res = table_lock_tuple(rel, &(outslot->tts_tid), GetLatestSnapshot(),
+ outslot,
+ GetCurrentCommandId(false),
+ lockmode,
+ LockWaitBlock,
+ 0 /* don't follow updates */ ,
+ &tmfd);
PopActiveSnapshot();
switch (res)
{
- case HeapTupleMayBeUpdated:
+ case TM_Ok:
break;
- case HeapTupleUpdated:
+ case TM_Updated:
/* XXX: Improve handling here */
- if (ItemPointerIndicatesMovedPartitions(&hufd.ctid))
+ if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
ereport(LOG,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
@@ -205,11 +197,17 @@ retry:
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("concurrent update, retrying")));
goto retry;
- case HeapTupleInvisible:
+ case TM_Deleted:
+ /* XXX: Improve handling here */
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent delete, retrying")));
+ goto retry;
+ case TM_Invisible:
elog(ERROR, "attempted to lock invisible tuple");
break;
default:
- elog(ERROR, "unexpected heap_lock_tuple status: %u", res);
+ elog(ERROR, "unexpected table_lock_tuple status: %u", res);
break;
}
}
@@ -333,35 +331,28 @@ retry:
/* Found tuple, try to lock it in the lockmode. */
if (found)
{
- Buffer buf;
- HeapUpdateFailureData hufd;
- HTSU_Result res;
- HeapTupleData locktup;
- HeapTupleTableSlot *hslot = (HeapTupleTableSlot *)outslot;
-
- /* Only a heap tuple has item pointers. */
- Assert(TTS_IS_HEAPTUPLE(outslot) || TTS_IS_BUFFERTUPLE(outslot));
- ItemPointerCopy(&hslot->tuple->t_self, &locktup.t_self);
+ TM_FailureData tmfd;
+ TM_Result res;
PushActiveSnapshot(GetLatestSnapshot());
- res = heap_lock_tuple(rel, &locktup, GetCurrentCommandId(false),
- lockmode,
- LockWaitBlock,
- false /* don't follow updates */ ,
- &buf, &hufd);
- /* the tuple slot already has the buffer pinned */
- ReleaseBuffer(buf);
+ res = table_lock_tuple(rel, &(outslot->tts_tid), GetLatestSnapshot(),
+ outslot,
+ GetCurrentCommandId(false),
+ lockmode,
+ LockWaitBlock,
+ 0 /* don't follow updates */ ,
+ &tmfd);
PopActiveSnapshot();
switch (res)
{
- case HeapTupleMayBeUpdated:
+ case TM_Ok:
break;
- case HeapTupleUpdated:
+ case TM_Updated:
/* XXX: Improve handling here */
- if (ItemPointerIndicatesMovedPartitions(&hufd.ctid))
+ if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
ereport(LOG,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
@@ -370,11 +361,17 @@ retry:
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("concurrent update, retrying")));
goto retry;
- case HeapTupleInvisible:
+ case TM_Deleted:
+ /* XXX: Improve handling here */
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent delete, retrying")));
+ goto retry;
+ case TM_Invisible:
elog(ERROR, "attempted to lock invisible tuple");
break;
default:
- elog(ERROR, "unexpected heap_lock_tuple status: %u", res);
+ elog(ERROR, "unexpected table_lock_tuple status: %u", res);
break;
}
}
@@ -395,7 +392,6 @@ void
ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
{
bool skip_tuple = false;
- HeapTuple tuple;
ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
Relation rel = resultRelInfo->ri_RelationDesc;
@@ -422,16 +418,11 @@ ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
if (resultRelInfo->ri_PartitionCheck)
ExecPartitionCheck(resultRelInfo, slot, estate, true);
- /* Materialize slot into a tuple that we can scribble upon. */
- tuple = ExecFetchSlotHeapTuple(slot, true, NULL);
-
/* OK, store the tuple and create index entries for it */
- simple_heap_insert(rel, tuple);
- ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
+ simple_table_insert(resultRelInfo->ri_RelationDesc, slot);
if (resultRelInfo->ri_NumIndices > 0)
- recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
- estate, false, NULL,
+ recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
NIL);
/* AFTER ROW INSERT Triggers */
@@ -459,13 +450,9 @@ ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate,
TupleTableSlot *searchslot, TupleTableSlot *slot)
{
bool skip_tuple = false;
- HeapTuple tuple;
ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
Relation rel = resultRelInfo->ri_RelationDesc;
- HeapTupleTableSlot *hsearchslot = (HeapTupleTableSlot *)searchslot;
-
- /* We expect the searchslot to contain a heap tuple. */
- Assert(TTS_IS_HEAPTUPLE(searchslot) || TTS_IS_BUFFERTUPLE(searchslot));
+ ItemPointer tid = &(searchslot->tts_tid);
/* For now we support only tables. */
Assert(rel->rd_rel->relkind == RELKIND_RELATION);
@@ -477,14 +464,14 @@ ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate,
resultRelInfo->ri_TrigDesc->trig_update_before_row)
{
if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
- &hsearchslot->tuple->t_self,
- NULL, slot))
+ tid, NULL, slot))
skip_tuple = true; /* "do nothing" */
}
if (!skip_tuple)
{
List *recheckIndexes = NIL;
+ bool update_indexes;
/* Check the constraints of the tuple */
if (rel->rd_att->constr)
@@ -492,23 +479,16 @@ ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate,
if (resultRelInfo->ri_PartitionCheck)
ExecPartitionCheck(resultRelInfo, slot, estate, true);
- /* Materialize slot into a tuple that we can scribble upon. */
- tuple = ExecFetchSlotHeapTuple(slot, true, NULL);
+ simple_table_update(rel, tid, slot, estate->es_snapshot,
+ &update_indexes);
- /* OK, update the tuple and index entries for it */
- simple_heap_update(rel, &hsearchslot->tuple->t_self, tuple);
- ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
-
- if (resultRelInfo->ri_NumIndices > 0 &&
- !HeapTupleIsHeapOnly(tuple))
- recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
- estate, false, NULL,
+ if (resultRelInfo->ri_NumIndices > 0 && update_indexes)
+ recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
NIL);
/* AFTER ROW UPDATE Triggers */
ExecARUpdateTriggers(estate, resultRelInfo,
- &(tuple->t_self),
- NULL, slot,
+ tid, NULL, slot,
recheckIndexes, NULL);
list_free(recheckIndexes);
@@ -528,11 +508,7 @@ ExecSimpleRelationDelete(EState *estate, EPQState *epqstate,
bool skip_tuple = false;
ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
Relation rel = resultRelInfo->ri_RelationDesc;
- HeapTupleTableSlot *hsearchslot = (HeapTupleTableSlot *)searchslot;
-
- /* For now we support only tables and heap tuples. */
- Assert(rel->rd_rel->relkind == RELKIND_RELATION);
- Assert(TTS_IS_HEAPTUPLE(searchslot) || TTS_IS_BUFFERTUPLE(searchslot));
+ ItemPointer tid = &searchslot->tts_tid;
CheckCmdReplicaIdentity(rel, CMD_DELETE);
@@ -541,23 +517,18 @@ ExecSimpleRelationDelete(EState *estate, EPQState *epqstate,
resultRelInfo->ri_TrigDesc->trig_delete_before_row)
{
skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
- &hsearchslot->tuple->t_self,
- NULL, NULL);
+ tid, NULL, NULL);
}
if (!skip_tuple)
{
- List *recheckIndexes = NIL;
-
/* OK, delete the tuple */
- simple_heap_delete(rel, &hsearchslot->tuple->t_self);
+ simple_table_delete(rel, tid, estate->es_snapshot);
/* AFTER ROW DELETE Triggers */
ExecARDeleteTriggers(estate, resultRelInfo,
- &hsearchslot->tuple->t_self, NULL, NULL);
-
- list_free(recheckIndexes);
+ tid, NULL, NULL);
}
}