diff options
Diffstat (limited to 'src/backend/commands/typecmds.c')
-rw-r--r-- | src/backend/commands/typecmds.c | 335 |
1 files changed, 243 insertions, 92 deletions
diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c index f4cdec3bf2f..315b0feb8e4 100644 --- a/src/backend/commands/typecmds.c +++ b/src/backend/commands/typecmds.c @@ -126,15 +126,19 @@ static Oid findTypeSubscriptingFunction(List *procname, Oid typeOid); static Oid findRangeSubOpclass(List *opcname, Oid subtype); static Oid findRangeCanonicalFunction(List *procname, Oid typeOid); static Oid findRangeSubtypeDiffFunction(List *procname, Oid subtype); -static void validateDomainConstraint(Oid domainoid, char *ccbin); +static void validateDomainCheckConstraint(Oid domainoid, const char *ccbin); +static void validateDomainNotNullConstraint(Oid domainoid); static List *get_rels_with_domain(Oid domainOid, LOCKMODE lockmode); static void checkEnumOwner(HeapTuple tup); -static char *domainAddConstraint(Oid domainOid, Oid domainNamespace, - Oid baseTypeOid, - int typMod, Constraint *constr, - const char *domainName, ObjectAddress *constrAddr); +static char *domainAddCheckConstraint(Oid domainOid, Oid domainNamespace, + Oid baseTypeOid, + int typMod, Constraint *constr, + const char *domainName, ObjectAddress *constrAddr); static Node *replace_domain_constraint_value(ParseState *pstate, ColumnRef *cref); +static void domainAddNotNullConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid, + int typMod, Constraint *constr, + const char *domainName, ObjectAddress *constrAddr); static void AlterTypeRecurse(Oid typeOid, bool isImplicitArray, HeapTuple tup, Relation catalog, AlterTypeRecurseParams *atparams); @@ -1105,9 +1109,15 @@ DefineDomain(CreateDomainStmt *stmt) switch (constr->contype) { case CONSTR_CHECK: - domainAddConstraint(address.objectId, domainNamespace, - basetypeoid, basetypeMod, - constr, domainName, NULL); + domainAddCheckConstraint(address.objectId, domainNamespace, + basetypeoid, basetypeMod, + constr, domainName, NULL); + break; + + case CONSTR_NOTNULL: + domainAddNotNullConstraint(address.objectId, domainNamespace, + basetypeoid, basetypeMod, + constr, domainName, NULL); break; /* Other constraint types were fully processed above */ @@ -2723,66 +2733,32 @@ AlterDomainNotNull(List *names, bool notNull) return address; } - /* Adding a NOT NULL constraint requires checking existing columns */ if (notNull) { - List *rels; - ListCell *rt; + Constraint *constr; - /* Fetch relation list with attributes based on this domain */ - /* ShareLock is sufficient to prevent concurrent data changes */ + constr = makeNode(Constraint); + constr->contype = CONSTR_NOTNULL; + constr->initially_valid = true; + constr->location = -1; - rels = get_rels_with_domain(domainoid, ShareLock); - - foreach(rt, rels) - { - RelToCheck *rtc = (RelToCheck *) lfirst(rt); - Relation testrel = rtc->rel; - TupleDesc tupdesc = RelationGetDescr(testrel); - TupleTableSlot *slot; - TableScanDesc scan; - Snapshot snapshot; - - /* Scan all tuples in this relation */ - snapshot = RegisterSnapshot(GetLatestSnapshot()); - scan = table_beginscan(testrel, snapshot, 0, NULL); - slot = table_slot_create(testrel, NULL); - while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) - { - int i; + domainAddNotNullConstraint(domainoid, typTup->typnamespace, + typTup->typbasetype, typTup->typtypmod, + constr, NameStr(typTup->typname), NULL); - /* Test attributes that are of the domain */ - for (i = 0; i < rtc->natts; i++) - { - int attnum = rtc->atts[i]; - Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); + validateDomainNotNullConstraint(domainoid); + } + else + { + HeapTuple conTup; + ObjectAddress conobj; - if (slot_attisnull(slot, attnum)) - { - /* - * In principle the auxiliary information for this - * error should be errdatatype(), but errtablecol() - * seems considerably more useful in practice. Since - * this code only executes in an ALTER DOMAIN command, - * the client should already know which domain is in - * question. - */ - ereport(ERROR, - (errcode(ERRCODE_NOT_NULL_VIOLATION), - errmsg("column \"%s\" of table \"%s\" contains null values", - NameStr(attr->attname), - RelationGetRelationName(testrel)), - errtablecol(testrel, attnum))); - } - } - } - ExecDropSingleTupleTableSlot(slot); - table_endscan(scan); - UnregisterSnapshot(snapshot); + conTup = findDomainNotNullConstraint(domainoid); + if (conTup == NULL) + elog(ERROR, "could not find not-null constraint on domain \"%s\"", NameStr(typTup->typname)); - /* Close each rel after processing, but keep lock */ - table_close(testrel, NoLock); - } + ObjectAddressSet(conobj, ConstraintRelationId, ((Form_pg_constraint) GETSTRUCT(conTup))->oid); + performDeletion(&conobj, DROP_RESTRICT, 0); } /* @@ -2863,10 +2839,17 @@ AlterDomainDropConstraint(List *names, const char *constrName, /* There can be at most one matching row */ if ((contup = systable_getnext(conscan)) != NULL) { + Form_pg_constraint construct = (Form_pg_constraint) GETSTRUCT(contup); ObjectAddress conobj; + if (construct->contype == CONSTRAINT_NOTNULL) + { + ((Form_pg_type) GETSTRUCT(tup))->typnotnull = false; + CatalogTupleUpdate(rel, &tup->t_self, tup); + } + conobj.classId = ConstraintRelationId; - conobj.objectId = ((Form_pg_constraint) GETSTRUCT(contup))->oid; + conobj.objectId = construct->oid; conobj.objectSubId = 0; performDeletion(&conobj, behavior, 0); @@ -2921,7 +2904,7 @@ AlterDomainAddConstraint(List *names, Node *newConstraint, Form_pg_type typTup; Constraint *constr; char *ccbin; - ObjectAddress address; + ObjectAddress address = InvalidObjectAddress; /* Make a TypeName so we can use standard type lookup machinery */ typename = makeTypeNameFromNameList(names); @@ -2947,6 +2930,7 @@ AlterDomainAddConstraint(List *names, Node *newConstraint, switch (constr->contype) { case CONSTR_CHECK: + case CONSTR_NOTNULL: /* processed below */ break; @@ -2989,29 +2973,52 @@ AlterDomainAddConstraint(List *names, Node *newConstraint, break; } - /* - * Since all other constraint types throw errors, this must be a check - * constraint. First, process the constraint expression and add an entry - * to pg_constraint. - */ + if (constr->contype == CONSTR_CHECK) + { + /* + * First, process the constraint expression and add an entry to + * pg_constraint. + */ - ccbin = domainAddConstraint(domainoid, typTup->typnamespace, - typTup->typbasetype, typTup->typtypmod, - constr, NameStr(typTup->typname), constrAddr); + ccbin = domainAddCheckConstraint(domainoid, typTup->typnamespace, + typTup->typbasetype, typTup->typtypmod, + constr, NameStr(typTup->typname), constrAddr); - /* - * If requested to validate the constraint, test all values stored in the - * attributes based on the domain the constraint is being added to. - */ - if (!constr->skip_validation) - validateDomainConstraint(domainoid, ccbin); - /* - * We must send out an sinval message for the domain, to ensure that any - * dependent plans get rebuilt. Since this command doesn't change the - * domain's pg_type row, that won't happen automatically; do it manually. - */ - CacheInvalidateHeapTuple(typrel, tup, NULL); + /* + * If requested to validate the constraint, test all values stored in + * the attributes based on the domain the constraint is being added + * to. + */ + if (!constr->skip_validation) + validateDomainCheckConstraint(domainoid, ccbin); + + /* + * We must send out an sinval message for the domain, to ensure that + * any dependent plans get rebuilt. Since this command doesn't change + * the domain's pg_type row, that won't happen automatically; do it + * manually. + */ + CacheInvalidateHeapTuple(typrel, tup, NULL); + } + else if (constr->contype == CONSTR_NOTNULL) + { + /* Is the domain already set NOT NULL? */ + if (typTup->typnotnull) + { + table_close(typrel, RowExclusiveLock); + return address; + } + domainAddNotNullConstraint(domainoid, typTup->typnamespace, + typTup->typbasetype, typTup->typtypmod, + constr, NameStr(typTup->typname), constrAddr); + + if (!constr->skip_validation) + validateDomainNotNullConstraint(domainoid); + + typTup->typnotnull = true; + CatalogTupleUpdate(typrel, &tup->t_self, tup); + } ObjectAddressSet(address, TypeRelationId, domainoid); @@ -3096,7 +3103,7 @@ AlterDomainValidateConstraint(List *names, const char *constrName) val = SysCacheGetAttrNotNull(CONSTROID, tuple, Anum_pg_constraint_conbin); conbin = TextDatumGetCString(val); - validateDomainConstraint(domainoid, conbin); + validateDomainCheckConstraint(domainoid, conbin); /* * Now update the catalog, while we have the door open. @@ -3122,8 +3129,76 @@ AlterDomainValidateConstraint(List *names, const char *constrName) return address; } +/* + * Verify that all columns currently using the domain are not null. + */ +static void +validateDomainNotNullConstraint(Oid domainoid) +{ + List *rels; + ListCell *rt; + + /* Fetch relation list with attributes based on this domain */ + /* ShareLock is sufficient to prevent concurrent data changes */ + + rels = get_rels_with_domain(domainoid, ShareLock); + + foreach(rt, rels) + { + RelToCheck *rtc = (RelToCheck *) lfirst(rt); + Relation testrel = rtc->rel; + TupleDesc tupdesc = RelationGetDescr(testrel); + TupleTableSlot *slot; + TableScanDesc scan; + Snapshot snapshot; + + /* Scan all tuples in this relation */ + snapshot = RegisterSnapshot(GetLatestSnapshot()); + scan = table_beginscan(testrel, snapshot, 0, NULL); + slot = table_slot_create(testrel, NULL); + while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) + { + int i; + + /* Test attributes that are of the domain */ + for (i = 0; i < rtc->natts; i++) + { + int attnum = rtc->atts[i]; + Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); + + if (slot_attisnull(slot, attnum)) + { + /* + * In principle the auxiliary information for this error + * should be errdatatype(), but errtablecol() seems + * considerably more useful in practice. Since this code + * only executes in an ALTER DOMAIN command, the client + * should already know which domain is in question. + */ + ereport(ERROR, + (errcode(ERRCODE_NOT_NULL_VIOLATION), + errmsg("column \"%s\" of table \"%s\" contains null values", + NameStr(attr->attname), + RelationGetRelationName(testrel)), + errtablecol(testrel, attnum))); + } + } + } + ExecDropSingleTupleTableSlot(slot); + table_endscan(scan); + UnregisterSnapshot(snapshot); + + /* Close each rel after processing, but keep lock */ + table_close(testrel, NoLock); + } +} + +/* + * Verify that all columns currently using the domain satisfy the given check + * constraint expression. + */ static void -validateDomainConstraint(Oid domainoid, char *ccbin) +validateDomainCheckConstraint(Oid domainoid, const char *ccbin) { Expr *expr = (Expr *) stringToNode(ccbin); List *rels; @@ -3429,12 +3504,12 @@ checkDomainOwner(HeapTuple tup) } /* - * domainAddConstraint - code shared between CREATE and ALTER DOMAIN + * domainAddCheckConstraint - code shared between CREATE and ALTER DOMAIN */ static char * -domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid, - int typMod, Constraint *constr, - const char *domainName, ObjectAddress *constrAddr) +domainAddCheckConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid, + int typMod, Constraint *constr, + const char *domainName, ObjectAddress *constrAddr) { Node *expr; char *ccbin; @@ -3442,6 +3517,8 @@ domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid, CoerceToDomainValue *domVal; Oid ccoid; + Assert(constr->contype == CONSTR_CHECK); + /* * Assign or validate constraint name */ @@ -3562,9 +3639,10 @@ replace_domain_constraint_value(ParseState *pstate, ColumnRef *cref) { /* * Check for a reference to "value", and if that's what it is, replace - * with a CoerceToDomainValue as prepared for us by domainAddConstraint. - * (We handle VALUE as a name, not a keyword, to avoid breaking a lot of - * applications that have used VALUE as a column name in the past.) + * with a CoerceToDomainValue as prepared for us by + * domainAddCheckConstraint. (We handle VALUE as a name, not a keyword, to + * avoid breaking a lot of applications that have used VALUE as a column + * name in the past.) */ if (list_length(cref->fields) == 1) { @@ -3584,6 +3662,79 @@ replace_domain_constraint_value(ParseState *pstate, ColumnRef *cref) return NULL; } +/* + * domainAddNotNullConstraint - code shared between CREATE and ALTER DOMAIN + */ +static void +domainAddNotNullConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid, + int typMod, Constraint *constr, + const char *domainName, ObjectAddress *constrAddr) +{ + Oid ccoid; + + Assert(constr->contype == CONSTR_NOTNULL); + + /* + * Assign or validate constraint name + */ + if (constr->conname) + { + if (ConstraintNameIsUsed(CONSTRAINT_DOMAIN, + domainOid, + constr->conname)) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("constraint \"%s\" for domain \"%s\" already exists", + constr->conname, domainName))); + } + else + constr->conname = ChooseConstraintName(domainName, + NULL, + "not_null", + domainNamespace, + NIL); + + /* + * Store the constraint in pg_constraint + */ + ccoid = + CreateConstraintEntry(constr->conname, /* Constraint Name */ + domainNamespace, /* namespace */ + CONSTRAINT_NOTNULL, /* Constraint Type */ + false, /* Is Deferrable */ + false, /* Is Deferred */ + !constr->skip_validation, /* Is Validated */ + InvalidOid, /* no parent constraint */ + InvalidOid, /* not a relation constraint */ + NULL, + 0, + 0, + domainOid, /* domain constraint */ + InvalidOid, /* no associated index */ + InvalidOid, /* Foreign key fields */ + NULL, + NULL, + NULL, + NULL, + 0, + ' ', + ' ', + NULL, + 0, + ' ', + NULL, /* not an exclusion constraint */ + NULL, + NULL, + true, /* is local */ + 0, /* inhcount */ + false, /* connoinherit */ + false, /* conperiod */ + false); /* is_internal */ + + if (constrAddr) + ObjectAddressSet(*constrAddr, ConstraintRelationId, ccoid); +} + /* * Execute ALTER TYPE RENAME |