author      Andres Freund <andres@anarazel.de>   2019-03-23 19:55:57 -0700
committer   Andres Freund <andres@anarazel.de>   2019-03-23 19:55:57 -0700
commit      5db6df0c0117ff2a4e0cd87594d2db408cd5022f (patch)
tree        7b06b96b6f8c1b7e4cdfb602af357f81e21f23b1 /src/backend/executor/execReplication.c
parent      f778e537a0d02d5e05016da3e6f4068914101dee (diff)
tableam: Add tuple_{insert, delete, update, lock} and use.
This adds new, required table AM callbacks for insert/delete/update
and lock_tuple. To be able to reasonably use those, the EvalPlanQual
mechanism had to be adapted, moving more logic into the AM.
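
For reference, here is roughly how the new callbacks sit in
TableAmRoutine; this is abridged and paraphrased from access/tableam.h as
of this commit, so consult the header for the authoritative signatures:

    /* within TableAmRoutine (abridged, paraphrased from access/tableam.h) */
    void        (*tuple_insert) (Relation rel, TupleTableSlot *slot,
                                 CommandId cid, int options,
                                 struct BulkInsertStateData *bistate);
    TM_Result   (*tuple_delete) (Relation rel, ItemPointer tid,
                                 CommandId cid, Snapshot snapshot,
                                 Snapshot crosscheck, bool wait,
                                 TM_FailureData *tmfd, bool changingPart);
    TM_Result   (*tuple_update) (Relation rel, ItemPointer otid,
                                 TupleTableSlot *slot, CommandId cid,
                                 Snapshot snapshot, Snapshot crosscheck,
                                 bool wait, TM_FailureData *tmfd,
                                 LockTupleMode *lockmode,
                                 bool *update_indexes);
    TM_Result   (*tuple_lock) (Relation rel, ItemPointer tid,
                               Snapshot snapshot, TupleTableSlot *slot,
                               CommandId cid, LockTupleMode mode,
                               LockWaitPolicy wait_policy, uint8 flags,
                               TM_FailureData *tmfd);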
Previously both delete/update/lock call-sites and the EPQ mechanism had
to have awareness of the specific tuple format to be able to fetch the
latest version of a tuple. Obviously that needs to be abstracted
away. To do so, move the logic that finds the latest row version into
the AM. lock_tuple has a new flag argument,
TUPLE_LOCK_FLAG_FIND_LAST_VERSION, that forces it to lock the last
version, rather than the current one. It'd have been possible to do
so via a separate callback as well, but finding the last version
usually also necessitates locking the newest version, making it
sensible to combine the two. This replaces the previous use of
EvalPlanQualFetch(). Additionally HeapTupleUpdated, which previously
signaled either a concurrent update or delete, is now split into two,
to avoid callers needing AM-specific knowledge to differentiate.
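
To illustrate the resulting control flow, here is a condensed sketch of a
READ COMMITTED call site after this change, loosely following
nodeModifyTable.c; variable names are illustrative, and EPQ re-checking
and error paths are elided:

    TM_FailureData tmfd;
    TM_Result   result;

    result = table_lock_tuple(relation, tupleid, estate->es_snapshot,
                              inputslot, estate->es_output_cid,
                              LockTupleExclusive, LockWaitBlock,
                              TUPLE_LOCK_FLAG_FIND_LAST_VERSION,
                              &tmfd);
    switch (result)
    {
        case TM_Ok:
            /* tmfd.traversed says whether a newer version was locked,
             * in which case the quals are re-checked via EvalPlanQual */
            break;
        case TM_Deleted:
            /* concurrently deleted, nothing left to process */
            break;
        case TM_Invisible:
            elog(ERROR, "attempted to lock invisible tuple");
            break;
        default:
            elog(ERROR, "unexpected table_lock_tuple status: %u", result);
    }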
The move of finding the latest row version into tuple_lock means that
encountering a row concurrently moved into another partition will now
raise an error about "tuple to be locked" rather than "tuple to be
updated/deleted" - which is accurate, as that always happens when
locking rows. While possibly slightly less helpful for users, it seems
like an acceptable trade-off.
As part of this commit HTSU_Result has been renamed to TM_Result, and
its members have been expanded to differentiate between updating and
deleting. HeapUpdateFailureData has been renamed to TM_FailureData.
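
The renamed types end up looking roughly like this (abridged from
access/tableam.h; comments paraphrased):

    typedef enum TM_Result
    {
        TM_Ok,              /* operation performed */
        TM_Invisible,       /* tuple not visible to snapshot */
        TM_SelfModified,    /* already modified by the current command */
        TM_Updated,         /* concurrently updated (was HeapTupleUpdated) */
        TM_Deleted,         /* concurrently deleted (was HeapTupleUpdated) */
        TM_BeingModified,   /* concurrent modification still in progress */
        TM_WouldBlock       /* lock not acquirable without waiting */
    } TM_Result;

    typedef struct TM_FailureData
    {
        ItemPointerData ctid;      /* next version's TID, or own TID */
        TransactionId xmax;        /* updating/deleting transaction */
        CommandId   cmax;          /* its command id, if known */
        bool        traversed;     /* did tuple_lock chase an update chain? */
    } TM_FailureData;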
The interface to speculative insertion is changed so nodeModifyTable.c
does not have to set the speculative token itself anymore. Instead
there's a version of tuple_insert, tuple_insert_speculative, that
performs the speculative insertion (without requiring a flag to signal
that fact), and the speculative insertion is either made permanent
with table_complete_speculative(succeeded = true) or aborted with
succeeded = false.
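
In rough outline, and paraphrasing nodeModifyTable.c (names illustrative,
error handling omitted), the speculative insertion dance now looks like:

    uint32      specToken;
    bool        specConflict = false;
    List       *recheckIndexes;

    /* take a token other backends can wait on, then insert speculatively */
    specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId());
    table_insert_speculative(rel, slot, estate->es_output_cid, 0, NULL,
                             specToken);

    /* index insertion; a conflicting arbiter key aborts the speculation */
    recheckIndexes = ExecInsertIndexTuples(slot, estate, true,
                                           &specConflict, arbiterIndexes);

    /* confirm or kill the speculative tuple, then release the token */
    table_complete_speculative(rel, slot, specToken, !specConflict);
    SpeculativeInsertionLockRelease(GetCurrentTransactionId());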
Note that multi_insert is not yet routed through tableam, nor is
COPY. Changing multi_insert requires changes to copy.c that are large
enough that they are better done separately.
Similarly, CREATE TABLE AS and CREATE MATERIALIZED VIEW, though simpler,
will also only be adjusted in a later commit.
Author: Andres Freund and Haribabu Kommi
Discussion:
https://postgr.es/m/20180703070645.wchpu5muyto5n647@alap3.anarazel.de
https://postgr.es/m/20190313003903.nwvrxi7rw3ywhdel@alap3.anarazel.de
https://postgr.es/m/20160812231527.GA690404@alvherre.pgsql
Diffstat (limited to 'src/backend/executor/execReplication.c')
-rw-r--r--  src/backend/executor/execReplication.c  137
1 file changed, 54 insertions(+), 83 deletions(-)
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index c539bb5a3f6..d8b48c667ce 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -15,7 +15,6 @@
 #include "postgres.h"
 
 #include "access/genam.h"
-#include "access/heapam.h"
 #include "access/relscan.h"
 #include "access/tableam.h"
 #include "access/transam.h"
@@ -168,35 +167,28 @@ retry:
     /* Found tuple, try to lock it in the lockmode. */
     if (found)
     {
-        Buffer      buf;
-        HeapUpdateFailureData hufd;
-        HTSU_Result res;
-        HeapTupleData locktup;
-        HeapTupleTableSlot *hslot = (HeapTupleTableSlot *)outslot;
-
-        /* Only a heap tuple has item pointers. */
-        Assert(TTS_IS_HEAPTUPLE(outslot) || TTS_IS_BUFFERTUPLE(outslot));
-        ItemPointerCopy(&hslot->tuple->t_self, &locktup.t_self);
+        TM_FailureData tmfd;
+        TM_Result   res;
 
         PushActiveSnapshot(GetLatestSnapshot());
 
-        res = heap_lock_tuple(rel, &locktup, GetCurrentCommandId(false),
-                              lockmode,
-                              LockWaitBlock,
-                              false /* don't follow updates */ ,
-                              &buf, &hufd);
-        /* the tuple slot already has the buffer pinned */
-        ReleaseBuffer(buf);
+        res = table_lock_tuple(rel, &(outslot->tts_tid), GetLatestSnapshot(),
+                               outslot,
+                               GetCurrentCommandId(false),
+                               lockmode,
+                               LockWaitBlock,
+                               0 /* don't follow updates */ ,
+                               &tmfd);
 
         PopActiveSnapshot();
 
         switch (res)
         {
-            case HeapTupleMayBeUpdated:
+            case TM_Ok:
                 break;
-            case HeapTupleUpdated:
+            case TM_Updated:
                 /* XXX: Improve handling here */
-                if (ItemPointerIndicatesMovedPartitions(&hufd.ctid))
+                if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
                     ereport(LOG,
                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                              errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
@@ -205,11 +197,17 @@ retry:
                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                              errmsg("concurrent update, retrying")));
                 goto retry;
-            case HeapTupleInvisible:
+            case TM_Deleted:
+                /* XXX: Improve handling here */
+                ereport(LOG,
+                        (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+                         errmsg("concurrent delete, retrying")));
+                goto retry;
+            case TM_Invisible:
                 elog(ERROR, "attempted to lock invisible tuple");
                 break;
             default:
-                elog(ERROR, "unexpected heap_lock_tuple status: %u", res);
+                elog(ERROR, "unexpected table_lock_tuple status: %u", res);
                 break;
         }
     }
@@ -333,35 +331,28 @@ retry:
     /* Found tuple, try to lock it in the lockmode. */
     if (found)
     {
-        Buffer      buf;
-        HeapUpdateFailureData hufd;
-        HTSU_Result res;
-        HeapTupleData locktup;
-        HeapTupleTableSlot *hslot = (HeapTupleTableSlot *)outslot;
-
-        /* Only a heap tuple has item pointers. */
-        Assert(TTS_IS_HEAPTUPLE(outslot) || TTS_IS_BUFFERTUPLE(outslot));
-        ItemPointerCopy(&hslot->tuple->t_self, &locktup.t_self);
+        TM_FailureData tmfd;
+        TM_Result   res;
 
         PushActiveSnapshot(GetLatestSnapshot());
 
-        res = heap_lock_tuple(rel, &locktup, GetCurrentCommandId(false),
-                              lockmode,
-                              LockWaitBlock,
-                              false /* don't follow updates */ ,
-                              &buf, &hufd);
-        /* the tuple slot already has the buffer pinned */
-        ReleaseBuffer(buf);
+        res = table_lock_tuple(rel, &(outslot->tts_tid), GetLatestSnapshot(),
+                               outslot,
+                               GetCurrentCommandId(false),
+                               lockmode,
+                               LockWaitBlock,
+                               0 /* don't follow updates */ ,
+                               &tmfd);
 
         PopActiveSnapshot();
 
         switch (res)
         {
-            case HeapTupleMayBeUpdated:
+            case TM_Ok:
                 break;
-            case HeapTupleUpdated:
+            case TM_Updated:
                 /* XXX: Improve handling here */
-                if (ItemPointerIndicatesMovedPartitions(&hufd.ctid))
+                if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
                     ereport(LOG,
                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                              errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
@@ -370,11 +361,17 @@ retry:
                             (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                              errmsg("concurrent update, retrying")));
                 goto retry;
-            case HeapTupleInvisible:
+            case TM_Deleted:
+                /* XXX: Improve handling here */
+                ereport(LOG,
+                        (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+                         errmsg("concurrent delete, retrying")));
+                goto retry;
+            case TM_Invisible:
                 elog(ERROR, "attempted to lock invisible tuple");
                 break;
             default:
-                elog(ERROR, "unexpected heap_lock_tuple status: %u", res);
+                elog(ERROR, "unexpected table_lock_tuple status: %u", res);
                 break;
         }
     }
@@ -395,7 +392,6 @@ void
 ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
 {
     bool        skip_tuple = false;
-    HeapTuple   tuple;
     ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
     Relation    rel = resultRelInfo->ri_RelationDesc;
 
@@ -422,16 +418,11 @@ ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
         if (resultRelInfo->ri_PartitionCheck)
             ExecPartitionCheck(resultRelInfo, slot, estate, true);
 
-        /* Materialize slot into a tuple that we can scribble upon. */
-        tuple = ExecFetchSlotHeapTuple(slot, true, NULL);
-
         /* OK, store the tuple and create index entries for it */
-        simple_heap_insert(rel, tuple);
-        ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
+        simple_table_insert(resultRelInfo->ri_RelationDesc, slot);
 
         if (resultRelInfo->ri_NumIndices > 0)
-            recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
-                                                   estate, false, NULL,
+            recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
                                                    NIL);
 
         /* AFTER ROW INSERT Triggers */
@@ -459,13 +450,9 @@ ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate,
                          TupleTableSlot *searchslot, TupleTableSlot *slot)
 {
     bool        skip_tuple = false;
-    HeapTuple   tuple;
     ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
     Relation    rel = resultRelInfo->ri_RelationDesc;
-    HeapTupleTableSlot *hsearchslot = (HeapTupleTableSlot *)searchslot;
-
-    /* We expect the searchslot to contain a heap tuple. */
-    Assert(TTS_IS_HEAPTUPLE(searchslot) || TTS_IS_BUFFERTUPLE(searchslot));
+    ItemPointer tid = &(searchslot->tts_tid);
 
     /* For now we support only tables. */
     Assert(rel->rd_rel->relkind == RELKIND_RELATION);
@@ -477,14 +464,14 @@ ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate,
         resultRelInfo->ri_TrigDesc->trig_update_before_row)
     {
         if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
-                                  &hsearchslot->tuple->t_self,
-                                  NULL, slot))
+                                  tid, NULL, slot))
             skip_tuple = true;  /* "do nothing" */
     }
 
     if (!skip_tuple)
     {
         List       *recheckIndexes = NIL;
+        bool        update_indexes;
 
         /* Check the constraints of the tuple */
         if (rel->rd_att->constr)
@@ -492,23 +479,16 @@
         if (resultRelInfo->ri_PartitionCheck)
             ExecPartitionCheck(resultRelInfo, slot, estate, true);
 
-        /* Materialize slot into a tuple that we can scribble upon. */
-        tuple = ExecFetchSlotHeapTuple(slot, true, NULL);
+        simple_table_update(rel, tid, slot,estate->es_snapshot,
+                            &update_indexes);
 
-        /* OK, update the tuple and index entries for it */
-        simple_heap_update(rel, &hsearchslot->tuple->t_self, tuple);
-        ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
-
-        if (resultRelInfo->ri_NumIndices > 0 &&
-            !HeapTupleIsHeapOnly(tuple))
-            recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
-                                                   estate, false, NULL,
+        if (resultRelInfo->ri_NumIndices > 0 && update_indexes)
+            recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
                                                    NIL);
 
         /* AFTER ROW UPDATE Triggers */
         ExecARUpdateTriggers(estate, resultRelInfo,
-                             &(tuple->t_self),
-                             NULL, slot,
+                             tid, NULL, slot,
                              recheckIndexes, NULL);
 
         list_free(recheckIndexes);
@@ -528,11 +508,7 @@ ExecSimpleRelationDelete(EState *estate, EPQState *epqstate,
     bool        skip_tuple = false;
     ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
     Relation    rel = resultRelInfo->ri_RelationDesc;
-    HeapTupleTableSlot *hsearchslot = (HeapTupleTableSlot *)searchslot;
-
-    /* For now we support only tables and heap tuples. */
-    Assert(rel->rd_rel->relkind == RELKIND_RELATION);
-    Assert(TTS_IS_HEAPTUPLE(searchslot) || TTS_IS_BUFFERTUPLE(searchslot));
+    ItemPointer tid = &searchslot->tts_tid;
 
     CheckCmdReplicaIdentity(rel, CMD_DELETE);
 
@@ -541,23 +517,18 @@
         resultRelInfo->ri_TrigDesc->trig_delete_before_row)
     {
         skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
-                                           &hsearchslot->tuple->t_self,
-                                           NULL, NULL);
+                                           tid, NULL, NULL);
     }
 
     if (!skip_tuple)
     {
-        List       *recheckIndexes = NIL;
-
         /* OK, delete the tuple */
-        simple_heap_delete(rel, &hsearchslot->tuple->t_self);
+        simple_table_delete(rel, tid, estate->es_snapshot);
 
         /* AFTER ROW DELETE Triggers */
         ExecARDeleteTriggers(estate, resultRelInfo,
-                             &hsearchslot->tuple->t_self, NULL, NULL);
-
-        list_free(recheckIndexes);
+                             tid, NULL, NULL);
     }
 }