This should fix a bug where a row that was updated or

deleted that had another row inserted/updated to its old
value during the same statement or other statements before the
integrity check for noaction would incorrectly error.  This
could happen in deferred constraints or due to triggers or
functions. It's effectively a reworking of the previous patch that
did a not exists to instead do a separate check.

Stephan Szabo
This commit is contained in:
Bruce Momjian 2002-07-30 16:33:21 +00:00
parent 9f1fc1080e
commit 578e71fee5

View file

@ -17,7 +17,7 @@
*
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
*
* $Header: /cvsroot/pgsql/src/backend/utils/adt/ri_triggers.c,v 1.39 2002/06/21 02:59:38 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/utils/adt/ri_triggers.c,v 1.40 2002/07/30 16:33:21 momjian Exp $
*
* ----------
*/
@ -130,6 +130,10 @@ static void ri_BuildQueryKeyFull(RI_QueryKey *key, Oid constr_id,
int32 constr_queryno,
Relation fk_rel, Relation pk_rel,
int argc, char **argv);
static void ri_BuildQueryKeyPkCheck(RI_QueryKey *key, Oid constr_id,
int32 constr_queryno,
Relation pk_rel,
int argc, char **argv);
static bool ri_KeysEqual(Relation rel, HeapTuple oldtup, HeapTuple newtup,
RI_QueryKey *key, int pairidx);
static bool ri_AllKeysUnequal(Relation rel, HeapTuple oldtup, HeapTuple newtup,
@ -137,12 +141,13 @@ static bool ri_AllKeysUnequal(Relation rel, HeapTuple oldtup, HeapTuple newtup,
static bool ri_OneKeyEqual(Relation rel, int column, HeapTuple oldtup,
HeapTuple newtup, RI_QueryKey *key, int pairidx);
static bool ri_AttributesEqual(Oid typeid, Datum oldvalue, Datum newvalue);
static bool ri_Check_Pk_Match(Relation pk_rel, HeapTuple old_row,
Oid tgoid, int match_type, int tgnargs, char **tgargs);
static void ri_InitHashTables(void);
static void *ri_FetchPreparedPlan(RI_QueryKey *key);
static void ri_HashPreparedPlan(RI_QueryKey *key, void *plan);
/* ----------
* RI_FKey_check -
*
@ -385,6 +390,7 @@ RI_FKey_check(PG_FUNCTION_ARGS)
if (SPI_connect() != SPI_OK_CONNECT)
elog(WARNING, "SPI_connect() failed in RI_FKey_check()");
/*
* Fetch or prepare a saved plan for the real check
*/
@ -512,6 +518,161 @@ RI_FKey_check_upd(PG_FUNCTION_ARGS)
}
/* ----------
* ri_Check_Pk_Match
*
* Check for matching value of old pk row in current state for
* noaction triggers. Returns false if no row was found and a fk row
* could potentially be referencing this row, true otherwise.
* ----------
*/
static bool
ri_Check_Pk_Match(Relation pk_rel, HeapTuple old_row, Oid tgoid, int match_type, int tgnargs, char **tgargs) {
void *qplan;
RI_QueryKey qkey;
bool isnull;
Datum check_values[RI_MAX_NUMKEYS];
char check_nulls[RI_MAX_NUMKEYS + 1];
int i;
Oid save_uid;
bool result;
save_uid = GetUserId();
ri_BuildQueryKeyPkCheck(&qkey, tgoid,
RI_PLAN_CHECK_LOOKUPPK, pk_rel,
tgnargs, tgargs);
switch (ri_NullCheck(pk_rel, old_row, &qkey, RI_KEYPAIR_PK_IDX))
{
case RI_KEYS_ALL_NULL:
/*
* No check - nothing could have been referencing this row anyway.
*/
return true;
case RI_KEYS_SOME_NULL:
/*
* This is the only case that differs between the three kinds
* of MATCH.
*/
switch (match_type)
{
case RI_MATCH_TYPE_FULL:
case RI_MATCH_TYPE_UNSPECIFIED:
/*
* MATCH <unspecified>/FULL - if ANY column is null, we
* can't be matching to this row already.
*/
return true;
case RI_MATCH_TYPE_PARTIAL:
/*
* MATCH PARTIAL - all non-null columns must match.
* (not implemented, can be done by modifying the
* query below to only include non-null columns, or by
* writing a special version here)
*/
elog(ERROR, "MATCH PARTIAL not yet implemented");
break;
}
case RI_KEYS_NONE_NULL:
/*
* Have a full qualified key - continue below for all three
* kinds of MATCH.
*/
break;
}
if (SPI_connect() != SPI_OK_CONNECT)
elog(WARNING, "SPI_connect() failed in RI_FKey_check()");
/*
* Fetch or prepare a saved plan for the real check
*/
if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
{
char querystr[MAX_QUOTED_REL_NAME_LEN + 100 +
(MAX_QUOTED_NAME_LEN + 32) * RI_MAX_NUMKEYS];
char pkrelname[MAX_QUOTED_REL_NAME_LEN];
char attname[MAX_QUOTED_NAME_LEN];
const char *querysep;
Oid queryoids[RI_MAX_NUMKEYS];
/* ----------
* The query string built is
* SELECT 1 FROM ONLY <pktable> WHERE pkatt1 = $1 [AND ...]
* The type id's for the $ parameters are those of the
* corresponding FK attributes. Thus, SPI_prepare could
* eventually fail if the parser cannot identify some way
* how to compare these two types by '='.
* ----------
*/
quoteRelationName(pkrelname, pk_rel);
sprintf(querystr, "SELECT 1 FROM ONLY %s x", pkrelname);
querysep = "WHERE";
for (i = 0; i < qkey.nkeypairs; i++)
{
quoteOneName(attname,
tgargs[RI_FIRST_ATTNAME_ARGNO + i * 2 + RI_KEYPAIR_PK_IDX]);
sprintf(querystr + strlen(querystr), " %s %s = $%d",
querysep, attname, i+1);
querysep = "AND";
queryoids[i] = SPI_gettypeid(pk_rel->rd_att,
qkey.keypair[i][RI_KEYPAIR_PK_IDX]);
}
strcat(querystr, " FOR UPDATE OF x");
/*
* Prepare, save and remember the new plan.
*/
qplan = SPI_prepare(querystr, qkey.nkeypairs, queryoids);
qplan = SPI_saveplan(qplan);
ri_HashPreparedPlan(&qkey, qplan);
}
/*
* We have a plan now. Build up the arguments for SPI_execp() from the
* key values in the new FK tuple.
*/
for (i = 0; i < qkey.nkeypairs; i++)
{
check_values[i] = SPI_getbinval(old_row,
pk_rel->rd_att,
qkey.keypair[i][RI_KEYPAIR_PK_IDX],
&isnull);
if (isnull)
check_nulls[i] = 'n';
else
check_nulls[i] = ' ';
}
check_nulls[i] = '\0';
/*
* Now check that foreign key exists in PK table
*/
SetUserId(RelationGetForm(pk_rel)->relowner);
if (SPI_execp(qplan, check_values, check_nulls, 1) != SPI_OK_SELECT)
elog(ERROR, "SPI_execp() failed in ri_Check_Pk_Match()");
SetUserId(save_uid);
result = (SPI_processed!=0);
if (SPI_finish() != SPI_OK_FINISH)
elog(WARNING, "SPI_finish() failed in ri_Check_Pk_Match()");
return result;
}
/* ----------
* RI_FKey_noaction_del -
*
@ -535,6 +696,7 @@ RI_FKey_noaction_del(PG_FUNCTION_ARGS)
char del_nulls[RI_MAX_NUMKEYS + 1];
bool isnull;
int i;
int match_type;
Oid save_uid;
save_uid = GetUserId();
@ -581,7 +743,18 @@ RI_FKey_noaction_del(PG_FUNCTION_ARGS)
pk_rel = trigdata->tg_relation;
old_row = trigdata->tg_trigtuple;
switch (ri_DetermineMatchType(tgargs[RI_MATCH_TYPE_ARGNO]))
match_type = ri_DetermineMatchType(tgargs[RI_MATCH_TYPE_ARGNO]);
if (ri_Check_Pk_Match(pk_rel, old_row, trigdata->tg_trigger->tgoid,
match_type, tgnargs, tgargs)) {
/*
* There's either another row, or no row could match this
* one. In either case, we don't need to do the check.
*/
heap_close(fk_rel, RowShareLock);
return PointerGetDatum(NULL);
}
switch (match_type)
{
/* ----------
* SQL3 11.9 <referential constraint definition>
@ -746,6 +919,7 @@ RI_FKey_noaction_upd(PG_FUNCTION_ARGS)
char upd_nulls[RI_MAX_NUMKEYS + 1];
bool isnull;
int i;
int match_type;
Oid save_uid;
save_uid = GetUserId();
@ -793,7 +967,18 @@ RI_FKey_noaction_upd(PG_FUNCTION_ARGS)
new_row = trigdata->tg_newtuple;
old_row = trigdata->tg_trigtuple;
switch (ri_DetermineMatchType(tgargs[RI_MATCH_TYPE_ARGNO]))
match_type = ri_DetermineMatchType(tgargs[RI_MATCH_TYPE_ARGNO]);
if (ri_Check_Pk_Match(pk_rel, old_row, trigdata->tg_trigger->tgoid,
match_type, tgnargs, tgargs)) {
/*
* There's either another row, or no row could match this
* one. In either case, we don't need to do the check.
*/
heap_close(fk_rel, RowShareLock);
return PointerGetDatum(NULL);
}
switch (match_type)
{
/* ----------
* SQL3 11.9 <referential constraint definition>
@ -3042,6 +3227,59 @@ ri_BuildQueryKeyFull(RI_QueryKey *key, Oid constr_id, int32 constr_queryno,
}
}
/* ----------
* ri_BuildQueryKeyPkCheck -
*
* Build up a new hashtable key for a prepared SPI plan of a
* check for PK rows in noaction triggers.
*
* constr_type is FULL
* constr_id is the OID of the pg_trigger row that invoked us
* constr_queryno is an internal number of the query inside the proc
* pk_relid is the OID of referenced relation
* nkeypairs is the number of keypairs
* following are the attribute number keypairs of the trigger invocation
*
* At least for MATCH FULL this builds a unique key per plan.
* ----------
*/
static void
ri_BuildQueryKeyPkCheck(RI_QueryKey *key, Oid constr_id, int32 constr_queryno,
Relation pk_rel,
int argc, char **argv)
{
int i;
int j;
int fno;
/*
* Initialize the key and fill in type, oid's and number of keypairs
*/
memset((void *) key, 0, sizeof(RI_QueryKey));
key->constr_type = RI_MATCH_TYPE_FULL;
key->constr_id = constr_id;
key->constr_queryno = constr_queryno;
key->fk_relid = 0;
key->pk_relid = pk_rel->rd_id;
key->nkeypairs = (argc - RI_FIRST_ATTNAME_ARGNO) / 2;
/*
* Lookup the attribute numbers of the arguments to the trigger call
* and fill in the keypairs.
*/
for (i = 0, j = RI_FIRST_ATTNAME_ARGNO + RI_KEYPAIR_PK_IDX; j < argc; i++, j += 2)
{
fno = SPI_fnumber(pk_rel->rd_att, argv[j]);
if (fno == SPI_ERROR_NOATTRIBUTE)
elog(ERROR, "constraint %s: table %s does not have an attribute %s",
argv[RI_CONSTRAINT_NAME_ARGNO],
RelationGetRelationName(pk_rel),
argv[j + 1]);
key->keypair[i][RI_KEYPAIR_PK_IDX] = fno;
key->keypair[i][RI_KEYPAIR_FK_IDX] = 0;
}
}
/* ----------
* ri_NullCheck -
@ -3378,3 +3616,4 @@ ri_AttributesEqual(Oid typeid, Datum oldvalue, Datum newvalue)
return DatumGetBool(FunctionCall2(&(entry->oprfmgrinfo),
oldvalue, newvalue));
}