aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/execMain.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor/execMain.c')
-rw-r--r--src/backend/executor/execMain.c125
1 files changed, 122 insertions, 3 deletions
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 71c07288a19..0f47c7e0104 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -42,6 +42,7 @@
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/namespace.h"
+#include "catalog/partition.h"
#include "commands/matview.h"
#include "commands/trigger.h"
#include "executor/execdebug.h"
@@ -825,6 +826,7 @@ InitPlan(QueryDesc *queryDesc, int eflags)
InitResultRelInfo(resultRelInfo,
resultRelation,
resultRelationIndex,
+ true,
estate->es_instrument);
resultRelInfo++;
}
@@ -1019,6 +1021,7 @@ CheckValidResultRel(Relation resultRel, CmdType operation)
switch (resultRel->rd_rel->relkind)
{
case RELKIND_RELATION:
+ case RELKIND_PARTITIONED_TABLE:
/* OK */
break;
case RELKIND_SEQUENCE:
@@ -1152,6 +1155,7 @@ CheckValidRowMarkRel(Relation rel, RowMarkType markType)
switch (rel->rd_rel->relkind)
{
case RELKIND_RELATION:
+ case RELKIND_PARTITIONED_TABLE:
/* OK */
break;
case RELKIND_SEQUENCE:
@@ -1212,6 +1216,7 @@ void
InitResultRelInfo(ResultRelInfo *resultRelInfo,
Relation resultRelationDesc,
Index resultRelationIndex,
+ bool load_partition_check,
int instrument_options)
{
MemSet(resultRelInfo, 0, sizeof(ResultRelInfo));
@@ -1249,6 +1254,10 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
resultRelInfo->ri_ConstraintExprs = NULL;
resultRelInfo->ri_junkFilter = NULL;
resultRelInfo->ri_projectReturning = NULL;
+ if (load_partition_check)
+ resultRelInfo->ri_PartitionCheck =
+ RelationGetPartitionQual(resultRelationDesc,
+ true);
}
/*
@@ -1311,6 +1320,7 @@ ExecGetTriggerResultRel(EState *estate, Oid relid)
InitResultRelInfo(rInfo,
rel,
0, /* dummy rangetable index */
+ true,
estate->es_instrument);
estate->es_trig_target_relations =
lappend(estate->es_trig_target_relations, rInfo);
@@ -1691,6 +1701,46 @@ ExecRelCheck(ResultRelInfo *resultRelInfo,
return NULL;
}
+/*
+ * ExecPartitionCheck --- check that tuple meets the partition constraint.
+ *
+ * Note: This is called *iff* resultRelInfo is the main target table.
+ */
+static bool
+ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
+ EState *estate)
+{
+ ExprContext *econtext;
+
+ /*
+ * If first time through, build expression state tree for the partition
+ * check expression. Keep it in the per-query memory context so they'll
+ * survive throughout the query.
+ */
+ if (resultRelInfo->ri_PartitionCheckExpr == NULL)
+ {
+ List *qual = resultRelInfo->ri_PartitionCheck;
+
+ resultRelInfo->ri_PartitionCheckExpr = (List *)
+ ExecPrepareExpr((Expr *) qual, estate);
+ }
+
+ /*
+ * We will use the EState's per-tuple context for evaluating constraint
+ * expressions (creating it if it's not already there).
+ */
+ econtext = GetPerTupleExprContext(estate);
+
+ /* Arrange for econtext's scan tuple to be the tuple under test */
+ econtext->ecxt_scantuple = slot;
+
+ /*
+ * As in case of the catalogued constraints, we treat a NULL result as
+ * success here, not a failure.
+ */
+ return ExecQual(resultRelInfo->ri_PartitionCheckExpr, econtext, true);
+}
+
void
ExecConstraints(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate)
@@ -1702,9 +1752,9 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
Bitmapset *insertedCols;
Bitmapset *updatedCols;
- Assert(constr);
+ Assert(constr || resultRelInfo->ri_PartitionCheck);
- if (constr->has_not_null)
+ if (constr && constr->has_not_null)
{
int natts = tupdesc->natts;
int attrChk;
@@ -1735,7 +1785,7 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
}
}
- if (constr->num_check > 0)
+ if (constr && constr->num_check > 0)
{
const char *failed;
@@ -1759,6 +1809,26 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
errtableconstraint(rel, failed)));
}
}
+
+ if (resultRelInfo->ri_PartitionCheck &&
+ !ExecPartitionCheck(resultRelInfo, slot, estate))
+ {
+ char *val_desc;
+
+ insertedCols = GetInsertedColumns(resultRelInfo, estate);
+ updatedCols = GetUpdatedColumns(resultRelInfo, estate);
+ modifiedCols = bms_union(insertedCols, updatedCols);
+ val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
+ slot,
+ tupdesc,
+ modifiedCols,
+ 64);
+ ereport(ERROR,
+ (errcode(ERRCODE_CHECK_VIOLATION),
+ errmsg("new row for relation \"%s\" violates partition constraint",
+ RelationGetRelationName(rel)),
+ val_desc ? errdetail("Failing row contains %s.", val_desc) : 0));
+ }
}
/*
@@ -2926,3 +2996,52 @@ EvalPlanQualEnd(EPQState *epqstate)
epqstate->planstate = NULL;
epqstate->origslot = NULL;
}
+
+/*
+ * ExecFindPartition -- Find a leaf partition in the partition tree rooted
+ * at parent, for the heap tuple contained in *slot
+ *
+ * estate must be non-NULL; we'll need it to compute any expressions in the
+ * partition key(s)
+ *
+ * If no leaf partition is found, this routine errors out with the appropriate
+ * error message, else it returns the leaf partition sequence number returned
+ * by get_partition_for_tuple() unchanged.
+ */
+int
+ExecFindPartition(ResultRelInfo *resultRelInfo, PartitionDispatch *pd,
+ TupleTableSlot *slot, EState *estate)
+{
+ int result;
+ Oid failed_at;
+ ExprContext *econtext = GetPerTupleExprContext(estate);
+
+ econtext->ecxt_scantuple = slot;
+ result = get_partition_for_tuple(pd, slot, estate, &failed_at);
+ if (result < 0)
+ {
+ Relation rel = resultRelInfo->ri_RelationDesc;
+ char *val_desc;
+ Bitmapset *insertedCols,
+ *updatedCols,
+ *modifiedCols;
+ TupleDesc tupDesc = RelationGetDescr(rel);
+
+ insertedCols = GetInsertedColumns(resultRelInfo, estate);
+ updatedCols = GetUpdatedColumns(resultRelInfo, estate);
+ modifiedCols = bms_union(insertedCols, updatedCols);
+ val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
+ slot,
+ tupDesc,
+ modifiedCols,
+ 64);
+ Assert(OidIsValid(failed_at));
+ ereport(ERROR,
+ (errcode(ERRCODE_CHECK_VIOLATION),
+ errmsg("no partition of relation \"%s\" found for row",
+ get_rel_name(failed_at)),
+ val_desc ? errdetail("Failing row contains %s.", val_desc) : 0));
+ }
+
+ return result;
+}