diff options
Diffstat (limited to 'contrib/dbmirror/pending.c')
-rw-r--r-- | contrib/dbmirror/pending.c | 445 |
1 files changed, 347 insertions, 98 deletions
diff --git a/contrib/dbmirror/pending.c b/contrib/dbmirror/pending.c index 4703d30f3c8..24fb71b9e2d 100644 --- a/contrib/dbmirror/pending.c +++ b/contrib/dbmirror/pending.c @@ -1,6 +1,7 @@ /**************************************************************************** * pending.c - * $PostgreSQL: pgsql/contrib/dbmirror/pending.c,v 1.15 2003/11/29 22:39:19 pgsql Exp $ + * $Id: pending.c,v 1.16 2004/02/17 03:34:35 momjian Exp $ + * $PostgreSQL: pgsql/contrib/dbmirror/pending.c,v 1.16 2004/02/17 03:34:35 momjian Exp $ * * This file contains a trigger for Postgresql-7.x to record changes to tables * to a pending table for mirroring. @@ -34,35 +35,60 @@ #include <executor/spi.h> #include <commands/trigger.h> #include <utils/lsyscache.h> +#include <utils/array.h> + enum FieldUsage { PRIMARY = 0, NONPRIMARY, ALL, NUM_FIELDUSAGE }; int storePending(char *cpTableName, HeapTuple tBeforeTuple, - HeapTuple tAfterTuple, - TupleDesc tTupdesc, - TriggerData *tpTrigdata, char cOp); + HeapTuple tAfterTuple, + TupleDesc tTupdesc, + Oid tableOid, + char cOp); + + int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc, - TriggerData *tpTrigdata); -int storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc, - TriggerData *tpTrigData, int iIncludeKeyData); + Oid tableOid); +int storeData(char *cpTableName, HeapTuple tTupleData, + TupleDesc tTupleDesc,Oid tableOid,int iIncludeKeyData); int2vector *getPrimaryKey(Oid tblOid); -char *packageData(HeapTuple tTupleData, TupleDesc tTupleDecs, - TriggerData *tTrigData, +char *packageData(HeapTuple tTupleData, TupleDesc tTupleDecs, Oid tableOid, enum FieldUsage eKeyUsage); + #define BUFFER_SIZE 256 #define MAX_OID_LEN 10 -/*#define DEBUG_OUTPUT 1 */ +#define DEBUG_OUTPUT 1 extern Datum recordchange(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(recordchange); +#if defined DEBUG_OUTPUT +#define debug_msg2(x,y) elog(NOTICE,x,y) +#define debug_msg(x) elog(NOTICE,x) +#define debug_msg3(x,y,z) elog(NOTICE,x,y,z) +#else +#define debug_msg2(x,y) +#define debug_msg(x) +#define debug_msg(x,y,z) + +#endif + + + +extern Datum nextval(PG_FUNCTION_ARGS); +extern Datum setval(PG_FUNCTION_ARGS); + +int saveSequenceUpdate(const text * sequenceName, + int nextSequenceValue); + + /***************************************************************************** * The entry point for the trigger function. * The Trigger takes a single SQL 'text' argument indicating the name of the @@ -81,13 +107,15 @@ recordchange(PG_FUNCTION_ARGS) char op = 0; char *schemaname; char *fullyqualtblname; + char *pkxpress=NULL; if (fcinfo->context != NULL) { if (SPI_connect() < 0) { - elog(NOTICE, "storePending could not connect to SPI"); + ereport(ERROR,(errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("dbmirror:recordchange could not connect to SPI"))); return -1; } trigdata = (TriggerData *) fcinfo->context; @@ -124,8 +152,15 @@ recordchange(PG_FUNCTION_ARGS) beforeTuple = trigdata->tg_trigtuple; op = 'd'; } + else + { + ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), + errmsg("dbmirror:recordchange Unknown operation"))); + + } - if (storePending(fullyqualtblname, beforeTuple, afterTuple, tupdesc, trigdata, op)) + if (storePending(fullyqualtblname, beforeTuple, afterTuple, + tupdesc, retTuple->t_tableOid, op)) { /* An error occoured. Skip the operation. */ ereport(ERROR, @@ -135,10 +170,11 @@ recordchange(PG_FUNCTION_ARGS) return PointerGetDatum(NULL); } -#if defined DEBUG_OUTPUT - elog(NOTICE, "returning on success"); -#endif + debug_msg("dbmirror:recordchange returning on success"); + SPI_pfree(fullyqualtblname); + if(pkxpress != NULL) + SPI_pfree(pkxpress); SPI_finish(); return PointerGetDatum(retTuple); } @@ -160,41 +196,45 @@ int storePending(char *cpTableName, HeapTuple tBeforeTuple, HeapTuple tAfterTuple, TupleDesc tTupDesc, - TriggerData *tpTrigData, char cOp) + Oid tableOid, + char cOp) { - char *cpQueryBase = "INSERT INTO \"Pending\" (\"TableName\",\"Op\",\"XID\") VALUES ($1,$2,$3)"; + char *cpQueryBase = "INSERT INTO dbmirror_pending (TableName,Op,XID) VALUES ($1,$2,$3)"; int iResult = 0; HeapTuple tCurTuple; + char nulls[3]=" "; /* Points the current tuple(before or after) */ - Datum saPlanData[4]; - Oid taPlanArgTypes[3] = {NAMEOID, CHAROID, INT4OID}; + Datum saPlanData[3]; + Oid taPlanArgTypes[4] = {NAMEOID, + CHAROID, + INT4OID}; void *vpPlan; tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple; - vpPlan = SPI_prepare(cpQueryBase, 3, taPlanArgTypes); if (vpPlan == NULL) - elog(NOTICE, "error creating plan"); - /* SPI_saveplan(vpPlan); */ + ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION), + errmsg("dbmirror:storePending error creating plan"))); + saPlanData[0] = PointerGetDatum(cpTableName); saPlanData[1] = CharGetDatum(cOp); saPlanData[2] = Int32GetDatum(GetCurrentTransactionId()); - - iResult = SPI_execp(vpPlan, saPlanData, NULL, 1); + iResult = SPI_execp(vpPlan, saPlanData, nulls, 1); if (iResult < 0) - elog(NOTICE, "storedPending fired (%s) returned %d", cpQueryBase, iResult); + elog(NOTICE, "storedPending fired (%s) returned %d", + cpQueryBase, iResult); -#if defined DEBUG_OUTPUT - elog(NOTICE, "row successfully stored in pending table"); -#endif + + debug_msg("dbmirror:storePending row successfully stored in pending table"); + if (cOp == 'd') { @@ -202,7 +242,8 @@ storePending(char *cpTableName, HeapTuple tBeforeTuple, * This is a record of a delete operation. * Just store the key data. */ - iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tpTrigData); + iResult = storeKeyInfo(cpTableName, + tBeforeTuple, tTupDesc, tableOid); } else if (cOp == 'i') { @@ -210,20 +251,22 @@ storePending(char *cpTableName, HeapTuple tBeforeTuple, * An Insert operation. * Store all data */ - iResult = storeData(cpTableName, tAfterTuple, tTupDesc, tpTrigData, TRUE); + iResult = storeData(cpTableName, tAfterTuple, + tTupDesc, tableOid,TRUE); } else { /* op must be an update. */ - iResult = storeKeyInfo(cpTableName, tBeforeTuple, tTupDesc, tpTrigData); - iResult = iResult ? iResult : storeData(cpTableName, tAfterTuple, tTupDesc, - tpTrigData, TRUE); + iResult = storeKeyInfo(cpTableName, tBeforeTuple, + tTupDesc, tableOid); + iResult = iResult ? iResult : + storeData(cpTableName, tAfterTuple, tTupDesc, + tableOid,TRUE); } -#if defined DEBUG_OUTPUT - elog(NOTICE, "done storing keyinfo"); -#endif + + debug_msg("dbmirror:storePending done storing keyinfo"); return iResult; @@ -231,12 +274,11 @@ storePending(char *cpTableName, HeapTuple tBeforeTuple, int storeKeyInfo(char *cpTableName, HeapTuple tTupleData, - TupleDesc tTupleDesc, - TriggerData *tpTrigData) + TupleDesc tTupleDesc, Oid tableOid) { Oid saPlanArgTypes[1] = {NAMEOID}; - char *insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'t',$1)"; + char *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'t',$1)"; void *pplan; Datum saPlanData[1]; char *cpKeyData; @@ -250,7 +292,7 @@ storeKeyInfo(char *cpTableName, HeapTuple tTupleData, } /* pplan = SPI_saveplan(pplan); */ - cpKeyData = packageData(tTupleData, tTupleDesc, tpTrigData, PRIMARY); + cpKeyData = packageData(tTupleData, tTupleDesc,tableOid, PRIMARY); if (cpKeyData == NULL) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), @@ -258,9 +300,9 @@ storeKeyInfo(char *cpTableName, HeapTuple tTupleData, errmsg("there is no PRIMARY KEY for table %s", cpTableName))); -#if defined DEBUG_OUTPUT - elog(NOTICE, "key data: %s", cpKeyData); -#endif + + debug_msg2("dbmirror:storeKeyInfo key data: %s", cpKeyData); + saPlanData[0] = PointerGetDatum(cpKeyData); iRetCode = SPI_execp(pplan, saPlanData, NULL, 1); @@ -270,12 +312,12 @@ storeKeyInfo(char *cpTableName, HeapTuple tTupleData, if (iRetCode != SPI_OK_INSERT) { - elog(NOTICE, "error inserting row in pendingDelete"); + ereport(ERROR,(errcode(ERRCODE_TRIGGERED_ACTION_EXCEPTION) + ,errmsg("error inserting row in pendingDelete"))); return -1; } -#if defined DEBUG_OUTPUT - elog(NOTICE, "insert successful"); -#endif + + debug_msg("insert successful"); return 0; @@ -318,12 +360,12 @@ getPrimaryKey(Oid tblOid) * Stores a copy of the non-key data for the row. *****************************************************************************/ int -storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc, - TriggerData *tpTrigData, int iIncludeKeyData) +storeData(char *cpTableName, HeapTuple tTupleData, + TupleDesc tTupleDesc,Oid tableOid, int iIncludeKeyData) { Oid planArgTypes[1] = {NAMEOID}; - char *insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'f',$1)"; + char *insQuery = "INSERT INTO dbmirror_pendingdata (SeqId,IsKey,Data) VALUES(currval('dbmirror_pending_seqid_seq'),'f',$1)"; void *pplan; Datum planData[1]; char *cpKeyData; @@ -338,9 +380,10 @@ storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc, /* pplan = SPI_saveplan(pplan); */ if (iIncludeKeyData == 0) - cpKeyData = packageData(tTupleData, tTupleDesc, tpTrigData, NONPRIMARY); + cpKeyData = packageData(tTupleData, tTupleDesc, + tableOid, NONPRIMARY); else - cpKeyData = packageData(tTupleData, tTupleDesc, tpTrigData, ALL); + cpKeyData = packageData(tTupleData, tTupleDesc,tableOid, ALL); planData[0] = PointerGetDatum(cpKeyData); iRetValue = SPI_execp(pplan, planData, NULL, 1); @@ -353,9 +396,9 @@ storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc, elog(NOTICE, "error inserting row in pendingDelete"); return -1; } -#if defined DEBUG_OUTPUT - elog(NOTICE, "insert successful"); -#endif + + debug_msg("dbmirror:storeKeyData insert successful"); + return 0; @@ -376,8 +419,7 @@ storeData(char *cpTableName, HeapTuple tTupleData, TupleDesc tTupleDesc, * ALL implies include all fields. */ char * -packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, - TriggerData *tpTrigData, +packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, Oid tableOid, enum FieldUsage eKeyUsage) { int iNumCols; @@ -391,14 +433,17 @@ packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, if (eKeyUsage != ALL) { - tpPKeys = getPrimaryKey(tpTrigData->tg_relation->rd_id); + tpPKeys = getPrimaryKey(tableOid); if (tpPKeys == NULL) return NULL; } -#if defined DEBUG_OUTPUT + if (tpPKeys != NULL) - elog(NOTICE, "have primary keys"); -#endif + { + debug_msg("dbmirror:packageData have primary keys"); + + } + cpDataBlock = SPI_palloc(BUFFER_SIZE); iDataBlockSize = BUFFER_SIZE; iUsedDataBlock = 0; /* To account for the null */ @@ -417,49 +462,58 @@ packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, { /* Determine if this is a primary key or not. */ iIsPrimaryKey = 0; - for (iPrimaryKeyIndex = 0; (*tpPKeys)[iPrimaryKeyIndex] != 0; + for (iPrimaryKeyIndex = 0; + (*tpPKeys)[iPrimaryKeyIndex] != 0; iPrimaryKeyIndex++) { - if ((*tpPKeys)[iPrimaryKeyIndex] == iColumnCounter) + if ((*tpPKeys)[iPrimaryKeyIndex] + == iColumnCounter) { iIsPrimaryKey = 1; break; } } - if (iIsPrimaryKey ? (eKeyUsage != PRIMARY) : (eKeyUsage != NONPRIMARY)) + if (iIsPrimaryKey ? (eKeyUsage != PRIMARY) : + (eKeyUsage != NONPRIMARY)) { /** * Don't use. */ -#if defined DEBUG_OUTPUT - elog(NOTICE, "skipping column"); -#endif + + debug_msg("dbmirror:packageData skipping column"); + continue; } } /* KeyUsage!=ALL */ -#ifndef NODROPCOLUMN - if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped) - { - /** - * This column has been dropped. - * Do not mirror it. - */ - continue; - } -#endif - cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs - [iColumnCounter - 1]->attname)); -#if defined DEBUG_OUTPUT - elog(NOTICE, "field name: %s", cpFieldName); -#endif - while (iDataBlockSize - iUsedDataBlock < strlen(cpFieldName) + 6) + + if(tTupleDesc->attrs[iColumnCounter-1]->attisdropped) + { + /** + * This column has been dropped. + * Do not mirror it. + */ + continue; + } + + cpFieldName = DatumGetPointer(NameGetDatum + + (&tTupleDesc->attrs + [iColumnCounter - 1]->attname)); + + debug_msg2("dbmirror:packageData field name: %s", cpFieldName); + + while (iDataBlockSize - iUsedDataBlock < + strlen(cpFieldName) + 6) { - cpDataBlock = SPI_repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE); + cpDataBlock = SPI_repalloc(cpDataBlock, + iDataBlockSize + + BUFFER_SIZE); iDataBlockSize = iDataBlockSize + BUFFER_SIZE; } sprintf(cpDataBlock + iUsedDataBlock, "\"%s\"=", cpFieldName); iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName) + 3; - cpFieldData = SPI_getvalue(tTupleData, tTupleDesc, iColumnCounter); + cpFieldData = SPI_getvalue(tTupleData, tTupleDesc, + iColumnCounter); cpUnFormatedPtr = cpFieldData; cpFormatedPtr = cpDataBlock + iUsedDataBlock; @@ -477,15 +531,17 @@ packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, continue; } -#if defined DEBUG_OUTPUT - elog(NOTICE, "field data: \"%s\"", cpFieldData); - elog(NOTICE, "starting format loop"); -#endif + debug_msg2("dbmirror:packageData field data: \"%s\"", + cpFieldData); + debug_msg("dbmirror:packageData starting format loop"); + while (*cpUnFormatedPtr != 0) { while (iDataBlockSize - iUsedDataBlock < 2) { - cpDataBlock = SPI_repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE); + cpDataBlock = SPI_repalloc(cpDataBlock, + iDataBlockSize + + BUFFER_SIZE); iDataBlockSize = iDataBlockSize + BUFFER_SIZE; cpFormatedPtr = cpDataBlock + iUsedDataBlock; } @@ -505,25 +561,218 @@ packageData(HeapTuple tTupleData, TupleDesc tTupleDesc, while (iDataBlockSize - iUsedDataBlock < 3) { - cpDataBlock = SPI_repalloc(cpDataBlock, iDataBlockSize + BUFFER_SIZE); + cpDataBlock = SPI_repalloc(cpDataBlock, + iDataBlockSize + + BUFFER_SIZE); iDataBlockSize = iDataBlockSize + BUFFER_SIZE; cpFormatedPtr = cpDataBlock + iUsedDataBlock; } sprintf(cpFormatedPtr, "' "); iUsedDataBlock = iUsedDataBlock + 2; -#if defined DEBUG_OUTPUT - elog(NOTICE, "data block: \"%s\"", cpDataBlock); -#endif + + debug_msg2("dbmirror:packageData data block: \"%s\"", + cpDataBlock); } /* for iColumnCounter */ if (tpPKeys != NULL) SPI_pfree(tpPKeys); -#if defined DEBUG_OUTPUT - elog(NOTICE, "returning DataBlockSize:%d iUsedDataBlock:%d", iDataBlockSize, - iUsedDataBlock); -#endif + + debug_msg3("dbmirror:packageData returning DataBlockSize:%d iUsedDataBlock:%d", + iDataBlockSize, + iUsedDataBlock); + memset(cpDataBlock + iUsedDataBlock, 0, iDataBlockSize - iUsedDataBlock); return cpDataBlock; } + + +PG_FUNCTION_INFO_V1(setval); + +Datum setval(PG_FUNCTION_ARGS) +{ + + + text * sequenceName; + + Oid setvalArgTypes[2] = {TEXTOID,INT4OID}; + int nextValue; + void * setvalPlan=NULL; + Datum setvalData[2]; + const char * setvalQuery = "SELECT setval_pg($1,$2)"; + int ret; + + sequenceName = PG_GETARG_TEXT_P(0); + nextValue = PG_GETARG_INT32(1); + + setvalData[0] = PointerGetDatum(sequenceName); + setvalData[1] = Int32GetDatum(nextValue); + + if (SPI_connect() < 0) + { + ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), + errmsg("dbmirror:setval could not connect to SPI"))); + return -1; + } + + setvalPlan = SPI_prepare(setvalQuery,2,setvalArgTypes); + if(setvalPlan == NULL) + { + ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), + errmsg("dbmirror:setval could not prepare plan"))); + return -1; + } + + ret = SPI_execp(setvalPlan,setvalData,NULL,1); + + if(ret != SPI_OK_SELECT || SPI_processed != 1) + return -1; + + debug_msg2("dbmirror:setval: setval_pg returned ok:%d",nextValue); + + ret = saveSequenceUpdate(sequenceName,nextValue); + + SPI_pfree(setvalPlan); + + SPI_finish(); + debug_msg("dbmirror:setval about to return"); + return Int64GetDatum(nextValue); + +} + + + +PG_FUNCTION_INFO_V1(nextval); + +Datum +nextval(PG_FUNCTION_ARGS) +{ + text * sequenceName; + + const char * nextvalQuery = "SELECT nextval_pg($1)"; + Oid nextvalArgTypes[1] = {TEXTOID}; + void * nextvalPlan=NULL; + Datum nextvalData[1]; + + + int ret; + HeapTuple resTuple; + char isNull; + int nextSequenceValue; + + + + debug_msg("dbmirror:nextval Starting pending.so:nextval"); + + + sequenceName = PG_GETARG_TEXT_P(0); + + if (SPI_connect() < 0) + { + ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), + errmsg("dbmirror:nextval could not connect to SPI"))); + return -1; + } + + nextvalPlan = SPI_prepare(nextvalQuery,1,nextvalArgTypes); + + + debug_msg("prepared plan to call nextval_pg"); + + + if(nextvalPlan==NULL) + { + ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), + errmsg("dbmirror:nextval error creating plan"))); + return -1; + } + nextvalData[0] = PointerGetDatum(sequenceName); + + ret = SPI_execp(nextvalPlan,nextvalData,NULL,1); + + debug_msg("dbmirror:Executed call to nextval_pg"); + + + if(ret != SPI_OK_SELECT || SPI_processed != 1) + return -1; + + resTuple = SPI_tuptable->vals[0]; + + debug_msg("dbmirror:nextval Set resTuple"); + + nextSequenceValue =*(DatumGetPointer(SPI_getbinval(resTuple, + SPI_tuptable->tupdesc, + 1,&isNull))); + + + + debug_msg2("dbmirror:nextval Set SPI_getbinval:%d",nextSequenceValue); + + + saveSequenceUpdate(sequenceName,nextSequenceValue); + SPI_pfree(resTuple); + SPI_pfree(nextvalPlan); + + SPI_finish(); + + return Int64GetDatum(nextSequenceValue); +} + + +int +saveSequenceUpdate(const text * sequenceName, + int nextSequenceVal) +{ + + Oid insertArgTypes[2] = {TEXTOID,INT4OID}; + Oid insertDataArgTypes[1] = {NAMEOID}; + void * insertPlan=NULL; + void * insertDataPlan=NULL; + Datum insertDatum[2]; + Datum insertDataDatum[1]; + char nextSequenceText[32]; + + const char * insertQuery = + "INSERT INTO dbmirror_Pending (TableName,Op,XID) VALUES" \ + "($1,'s',$2)"; + const char * insertDataQuery = + "INSERT INTO dbmirror_PendingData(SeqId,IsKey,Data) VALUES " \ + "(currval('dbmirror_pending_seqid_seq'),'t',$1)"; + + int ret; + + + insertPlan = SPI_prepare(insertQuery,2,insertArgTypes); + insertDataPlan = SPI_prepare(insertDataQuery,1,insertDataArgTypes); + + debug_msg("Prepared insert query"); + + + if(insertPlan == NULL || insertDataPlan == NULL) + { + ereport(ERROR,(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),errmsg("dbmirror:nextval error creating plan"))); + } + + insertDatum[1] = Int32GetDatum(GetCurrentTransactionId()); + insertDatum[0] = PointerGetDatum(sequenceName); + + sprintf(nextSequenceText,"%d",nextSequenceVal); + insertDataDatum[0] = PointerGetDatum(nextSequenceText); + debug_msg2("dbmirror:savesequenceupdate: Setting value %s", + nextSequenceText); + + debug_msg("dbmirror:About to execute insert query"); + + ret = SPI_execp(insertPlan,insertDatum,NULL,1); + + ret = SPI_execp(insertDataPlan,insertDataDatum,NULL,1); + + debug_msg("dbmirror:Insert query finished"); + SPI_pfree(insertPlan); + SPI_pfree(insertDataPlan); + + return ret; + +} + |