diff options
Diffstat (limited to 'contrib/dblink/dblink.c')
-rw-r--r-- | contrib/dblink/dblink.c | 873 |
1 files changed, 370 insertions, 503 deletions
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index a8e9c5ab50e..acddd1d4698 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -4,8 +4,11 @@ * Functions returning results from a remote database * * Joe Conway <mail@joeconway.com> + * And contributors: + * Darko Prenosil <Darko.Prenosil@finteh.hr> + * Shridhar Daithankar <shridhar_daithankar@persistent.co.in> * - * Copyright (c) 2001, 2002 by PostgreSQL Global Development Group + * Copyright (c) 2001, 2002, 2003 by PostgreSQL Global Development Group * ALL RIGHTS RESERVED; * * Permission to use, copy, modify, and distribute this software and its @@ -27,9 +30,7 @@ * */ #include "postgres.h" - #include "libpq-fe.h" - #include "fmgr.h" #include "funcapi.h" #include "access/tupdesc.h" @@ -51,13 +52,27 @@ #include "utils/array.h" #include "utils/lsyscache.h" #include "utils/syscache.h" +#include "utils/palloc.h" +#include "utils/dynahash.h" +#include "utils/hsearch.h" +#include "utils/memutils.h" #include "dblink.h" +typedef struct remoteConn +{ + PGconn *con; /* Hold the remote connection */ + bool remoteTrFlag; /* Indicates whether or not a transaction + * on remote database is in progress*/ +} remoteConn; + /* * Internal declarations */ -static dblink_results *init_dblink_results(MemoryContext fn_mcxt); +static remoteConn *getConnectionByName(const char *name); +static HTAB *createConnHash(void); +static bool createNewConnection(const char *name,remoteConn *con); +static void deleteConnection(const char *name); static char **get_pkey_attnames(Oid relid, int16 *numatts); static char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); static char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals); @@ -67,17 +82,32 @@ static char *quote_ident_cstr(char *rawstr); static int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key); static HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals); static Oid get_relid_from_relname(text *relname_text); -static dblink_results *get_res_ptr(int32 res_id_index); -static void append_res_ptr(dblink_results * results); -static void remove_res_ptr(dblink_results * results); static TupleDesc pgresultGetTupleDesc(PGresult *res); static char *generate_relation_name(Oid relid); /* Global */ -List *res_id = NIL; -int res_id_index = 0; -PGconn *persistent_conn = NULL; +List *res_id = NIL; +int res_id_index = 0; +PGconn *persistent_conn = NULL; +static HTAB *remoteConnHash=NULL; + +/* +Following is list that holds multiple remote connections. +Calling convention of each dblink function changes to accept +connection name as the first parameter. The connection list is +much like ecpg e.g. a mapping between a name and a PGconn object. +*/ + +typedef struct remoteConnHashEnt +{ + char name[NAMEDATALEN]; + remoteConn *rcon; +} remoteConnHashEnt; + +/* initial number of connection hashes */ +#define NUMCONN 16 +/* general utility */ #define GET_TEXT(cstrp) DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(cstrp))) #define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp))) #define xpfree(var_) \ @@ -88,6 +118,41 @@ PGconn *persistent_conn = NULL; var_ = NULL; \ } \ } while (0) +#define DBLINK_RES_ERROR(p1, p2) \ + do { \ + msg = pstrdup(PQerrorMessage(conn)); \ + if (res) \ + PQclear(res); \ + elog(ERROR, "%s: %s: %s", p1, p2, msg); \ + } while (0) +#define DBLINK_CONN_NOT_AVAIL(p1) \ + do { \ + if(conname) \ + elog(ERROR, "%s: connection %s not available", p1, conname); \ + else \ + elog(ERROR, "%s: connection not available", p1); \ + } while (0) +#define DBLINK_GET_CONN(p1) \ + do { \ + char *conname_or_str = GET_STR(PG_GETARG_TEXT_P(0)); \ + rcon = getConnectionByName(conname_or_str); \ + if(rcon) \ + { \ + conn = rcon->con; \ + freeconn = false; \ + } \ + else \ + { \ + connstr = conname_or_str; \ + conn = PQconnectdb(connstr); \ + if (PQstatus(conn) == CONNECTION_BAD) \ + { \ + msg = pstrdup(PQerrorMessage(conn)); \ + PQfinish(conn); \ + elog(ERROR, "%s: connection error: %s", p1, msg); \ + } \ + } \ + } while (0) /* @@ -97,28 +162,52 @@ PG_FUNCTION_INFO_V1(dblink_connect); Datum dblink_connect(PG_FUNCTION_ARGS) { - char *connstr = GET_STR(PG_GETARG_TEXT_P(0)); + char *connstr = NULL; + char *connname = NULL; char *msg; - text *result_text; MemoryContext oldcontext; + PGconn *conn = NULL; + remoteConn *rcon = NULL; - if (persistent_conn != NULL) - PQfinish(persistent_conn); + if(PG_NARGS()==2) + { + connstr = GET_STR(PG_GETARG_TEXT_P(1)); + connname = GET_STR(PG_GETARG_TEXT_P(0)); + } + else if(PG_NARGS()==1) + connstr = GET_STR(PG_GETARG_TEXT_P(0)); oldcontext = MemoryContextSwitchTo(TopMemoryContext); - persistent_conn = PQconnectdb(connstr); + + if(connname) + rcon=(remoteConn *) palloc(sizeof(remoteConn)); + conn = PQconnectdb(connstr); + MemoryContextSwitchTo(oldcontext); - if (PQstatus(persistent_conn) == CONNECTION_BAD) + if (PQstatus(conn) == CONNECTION_BAD) { - msg = pstrdup(PQerrorMessage(persistent_conn)); - PQfinish(persistent_conn); - persistent_conn = NULL; + msg = pstrdup(PQerrorMessage(conn)); + PQfinish(conn); + if(rcon) + pfree(rcon); elog(ERROR, "dblink_connect: connection error: %s", msg); } - result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); - PG_RETURN_TEXT_P(result_text); + if(connname) + { + rcon->con = conn; + if(createNewConnection(connname, rcon) == false) + { + PQfinish(conn); + pfree(rcon); + elog(ERROR, "dblink_connect: cannot save named connection"); + } + } + else + persistent_conn = conn; + + PG_RETURN_TEXT_P(GET_TEXT("OK")); } /* @@ -128,15 +217,37 @@ PG_FUNCTION_INFO_V1(dblink_disconnect); Datum dblink_disconnect(PG_FUNCTION_ARGS) { - text *result_text; + char *str = NULL; + remoteConn *rcon = NULL; + PGconn *conn = NULL; + + if (PG_NARGS() ==1 ) + { + str = GET_STR(PG_GETARG_TEXT_P(0)); + rcon = getConnectionByName(str); + if (rcon) + conn = rcon->con; + } + else + conn = persistent_conn; - if (persistent_conn != NULL) - PQfinish(persistent_conn); + if (!conn) + { + if (str) + elog(ERROR,"dblink_disconnect: connection named \"%s\" not found", + str); + else + elog(ERROR,"dblink_disconnect: connection not found"); + } - persistent_conn = NULL; + PQfinish(conn); + if (rcon) + { + deleteConnection(str); + pfree(rcon); + } - result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); - PG_RETURN_TEXT_P(result_text); + PG_RETURN_TEXT_P(GET_TEXT("OK")); } /* @@ -149,27 +260,35 @@ dblink_open(PG_FUNCTION_ARGS) char *msg; PGresult *res = NULL; PGconn *conn = NULL; - text *result_text; - char *curname = GET_STR(PG_GETARG_TEXT_P(0)); - char *sql = GET_STR(PG_GETARG_TEXT_P(1)); + char *curname = NULL; + char *sql = NULL; + char *conname = NULL; StringInfo str = makeStringInfo(); + remoteConn *rcon = NULL; - if (persistent_conn != NULL) + if(PG_NARGS() == 2) + { + curname = GET_STR(PG_GETARG_TEXT_P(0)); + sql = GET_STR(PG_GETARG_TEXT_P(1)); conn = persistent_conn; - else - elog(ERROR, "dblink_open: no connection available"); + } + else if(PG_NARGS() == 3) + { + conname = GET_STR(PG_GETARG_TEXT_P(0)); + curname = GET_STR(PG_GETARG_TEXT_P(1)); + sql = GET_STR(PG_GETARG_TEXT_P(2)); + rcon = getConnectionByName(conname); + if (rcon) + conn = rcon->con; + } + + if (!conn) + DBLINK_CONN_NOT_AVAIL("dblink_open"); res = PQexec(conn, "BEGIN"); if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - msg = pstrdup(PQerrorMessage(conn)); - PQclear(res); + DBLINK_RES_ERROR("dblink_open", "begin error"); - PQfinish(conn); - persistent_conn = NULL; - - elog(ERROR, "dblink_open: begin error: %s", msg); - } PQclear(res); appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql); @@ -177,19 +296,11 @@ dblink_open(PG_FUNCTION_ARGS) if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) - { - msg = pstrdup(PQerrorMessage(conn)); - - PQclear(res); - - PQfinish(conn); - persistent_conn = NULL; + DBLINK_RES_ERROR("dblink_open", "sql error"); - elog(ERROR, "dblink: sql error: %s", msg); - } + PQclear(res); - result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); - PG_RETURN_TEXT_P(result_text); + PG_RETURN_TEXT_P(GET_TEXT("OK")); } /* @@ -201,49 +312,46 @@ dblink_close(PG_FUNCTION_ARGS) { PGconn *conn = NULL; PGresult *res = NULL; - char *curname = GET_STR(PG_GETARG_TEXT_P(0)); + char *curname = NULL; + char *conname = NULL; StringInfo str = makeStringInfo(); - text *result_text; char *msg; + remoteConn *rcon = NULL; - if (persistent_conn != NULL) + if (PG_NARGS() == 1) + { + curname = GET_STR(PG_GETARG_TEXT_P(0)); conn = persistent_conn; - else - elog(ERROR, "dblink_close: no connection available"); + } + else if (PG_NARGS()==2) + { + conname = GET_STR(PG_GETARG_TEXT_P(0)); + curname = GET_STR(PG_GETARG_TEXT_P(1)); + rcon = getConnectionByName(conname); + if(rcon) + conn = rcon->con; + } + + if (!conn) + DBLINK_CONN_NOT_AVAIL("dblink_close"); appendStringInfo(str, "CLOSE %s", curname); /* close the cursor */ res = PQexec(conn, str->data); if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) - { - msg = pstrdup(PQerrorMessage(conn)); - PQclear(res); - - PQfinish(persistent_conn); - persistent_conn = NULL; - - elog(ERROR, "dblink_close: sql error: %s", msg); - } + DBLINK_RES_ERROR("dblink_close", "sql error"); PQclear(res); /* commit the transaction */ res = PQexec(conn, "COMMIT"); if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - msg = pstrdup(PQerrorMessage(conn)); - PQclear(res); - - PQfinish(persistent_conn); - persistent_conn = NULL; + DBLINK_RES_ERROR("dblink_close", "commit error"); - elog(ERROR, "dblink_close: commit error: %s", msg); - } PQclear(res); - result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum("OK"))); - PG_RETURN_TEXT_P(result_text); + PG_RETURN_TEXT_P(GET_TEXT("OK")); } /* @@ -262,6 +370,8 @@ dblink_fetch(PG_FUNCTION_ARGS) char *msg; PGresult *res = NULL; MemoryContext oldcontext; + char *conname = NULL; + remoteConn *rcon=NULL; /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) @@ -271,8 +381,28 @@ dblink_fetch(PG_FUNCTION_ARGS) Oid funcid = fcinfo->flinfo->fn_oid; PGconn *conn = NULL; StringInfo str = makeStringInfo(); - char *curname = GET_STR(PG_GETARG_TEXT_P(0)); - int howmany = PG_GETARG_INT32(1); + char *curname = NULL; + int howmany = 0; + + if (PG_NARGS() == 3) + { + conname = GET_STR(PG_GETARG_TEXT_P(0)); + curname = GET_STR(PG_GETARG_TEXT_P(1)); + howmany = PG_GETARG_INT32(2); + + rcon = getConnectionByName(conname); + if(rcon) + conn = rcon->con; + } + else if (PG_NARGS() == 2) + { + curname = GET_STR(PG_GETARG_TEXT_P(0)); + howmany = PG_GETARG_INT32(1); + conn = persistent_conn; + } + + if(!conn) + DBLINK_CONN_NOT_AVAIL("dblink_fetch"); /* create a function context for cross-call persistence */ funcctx = SRF_FIRSTCALL_INIT(); @@ -283,11 +413,6 @@ dblink_fetch(PG_FUNCTION_ARGS) */ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); - if (persistent_conn != NULL) - conn = persistent_conn; - else - elog(ERROR, "dblink_fetch: no connection available"); - appendStringInfo(str, "FETCH %d FROM %s", howmany, curname); res = PQexec(conn, str->data); @@ -295,19 +420,13 @@ dblink_fetch(PG_FUNCTION_ARGS) (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { - msg = pstrdup(PQerrorMessage(conn)); - PQclear(res); - - PQfinish(persistent_conn); - persistent_conn = NULL; - - elog(ERROR, "dblink_fetch: sql error: %s", msg); + DBLINK_RES_ERROR("dblink_fetch", "sql error"); } else if (PQresultStatus(res) == PGRES_COMMAND_OK) { /* cursor does not exist - closed already or bad name */ PQclear(res); - elog(ERROR, "dblink_fetch: cursor %s does not exist", curname); + elog(ERROR, "dblink_fetch: cursor not found: %s", curname); } funcctx->max_calls = PQntuples(res); @@ -380,8 +499,8 @@ dblink_fetch(PG_FUNCTION_ARGS) SRF_RETURN_NEXT(funcctx, result); } else -/* do when there is no more left */ { + /* do when there is no more left */ PQclear(res); SRF_RETURN_DONE(funcctx); } @@ -405,6 +524,7 @@ dblink_record(PG_FUNCTION_ARGS) bool is_sql_cmd = false; char *sql_cmd_status = NULL; MemoryContext oldcontext; + bool freeconn = true; /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) @@ -415,6 +535,8 @@ dblink_record(PG_FUNCTION_ARGS) PGconn *conn = NULL; char *connstr = NULL; char *sql = NULL; + char *conname = NULL; + remoteConn *rcon=NULL; /* create a function context for cross-call persistence */ funcctx = SRF_FIRSTCALL_INIT(); @@ -425,70 +547,51 @@ dblink_record(PG_FUNCTION_ARGS) */ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); - if (fcinfo->nargs == 2) + if (PG_NARGS() == 2) { - connstr = GET_STR(PG_GETARG_TEXT_P(0)); + DBLINK_GET_CONN("dblink"); sql = GET_STR(PG_GETARG_TEXT_P(1)); - - conn = PQconnectdb(connstr); - if (PQstatus(conn) == CONNECTION_BAD) - { - msg = pstrdup(PQerrorMessage(conn)); - PQfinish(conn); - elog(ERROR, "dblink: connection error: %s", msg); - } } - else if (fcinfo->nargs == 1) + else if (PG_NARGS() == 1) { + conn = persistent_conn; sql = GET_STR(PG_GETARG_TEXT_P(0)); - - if (persistent_conn != NULL) - conn = persistent_conn; - else - elog(ERROR, "dblink: no connection available"); } else elog(ERROR, "dblink: wrong number of arguments"); + if(!conn) + DBLINK_CONN_NOT_AVAIL("dblink_record"); + res = PQexec(conn, sql); if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) + DBLINK_RES_ERROR("dblink", "sql error"); + + if (PQresultStatus(res) == PGRES_COMMAND_OK) { - msg = pstrdup(PQerrorMessage(conn)); - PQclear(res); - PQfinish(conn); - if (fcinfo->nargs == 1) - persistent_conn = NULL; + is_sql_cmd = true; + + /* need a tuple descriptor representing one TEXT column */ + tupdesc = CreateTemplateTupleDesc(1, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", + TEXTOID, -1, 0, false); - elog(ERROR, "dblink: sql error: %s", msg); + /* + * and save a copy of the command status string to return + * as our result tuple + */ + sql_cmd_status = PQcmdStatus(res); + funcctx->max_calls = 1; } else - { - if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - is_sql_cmd = true; - - /* need a tuple descriptor representing one TEXT column */ - tupdesc = CreateTemplateTupleDesc(1, false); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", - TEXTOID, -1, 0, false); - - /* - * and save a copy of the command status string to return - * as our result tuple - */ - sql_cmd_status = PQcmdStatus(res); - funcctx->max_calls = 1; - } - else - funcctx->max_calls = PQntuples(res); + funcctx->max_calls = PQntuples(res); - /* got results, keep track of them */ - funcctx->user_fctx = res; + /* got results, keep track of them */ + funcctx->user_fctx = res; - /* if needed, close the connection to the database and cleanup */ - if (fcinfo->nargs == 2) - PQfinish(conn); - } + /* if needed, close the connection to the database and cleanup */ + if (freeconn && PG_NARGS() == 2) + PQfinish(conn); /* fast track when no results */ if (funcctx->max_calls < 1) @@ -567,8 +670,8 @@ dblink_record(PG_FUNCTION_ARGS) SRF_RETURN_NEXT(funcctx, result); } else -/* do when there is no more left */ { + /* do when there is no more left */ PQclear(res); SRF_RETURN_DONE(funcctx); } @@ -583,272 +686,62 @@ dblink_exec(PG_FUNCTION_ARGS) { char *msg; PGresult *res = NULL; - char *sql_cmd_status = NULL; + text *sql_cmd_status = NULL; TupleDesc tupdesc = NULL; - text *result_text; PGconn *conn = NULL; char *connstr = NULL; char *sql = NULL; + char *conname = NULL; + remoteConn *rcon=NULL; + bool freeconn = true; - if (fcinfo->nargs == 2) + if (PG_NARGS() == 2) { - connstr = GET_STR(PG_GETARG_TEXT_P(0)); + DBLINK_GET_CONN("dblink_exec"); sql = GET_STR(PG_GETARG_TEXT_P(1)); - - conn = PQconnectdb(connstr); - if (PQstatus(conn) == CONNECTION_BAD) - { - msg = pstrdup(PQerrorMessage(conn)); - PQfinish(conn); - elog(ERROR, "dblink_exec: connection error: %s", msg); - } } - else if (fcinfo->nargs == 1) + else if (PG_NARGS() == 1) { + conn = persistent_conn; sql = GET_STR(PG_GETARG_TEXT_P(0)); - - if (persistent_conn != NULL) - conn = persistent_conn; - else - elog(ERROR, "dblink_exec: no connection available"); } else elog(ERROR, "dblink_exec: wrong number of arguments"); + if(!conn) + DBLINK_CONN_NOT_AVAIL("dblink_exec"); res = PQexec(conn, sql); - if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) + DBLINK_RES_ERROR("dblink_exec", "sql error"); + + if (PQresultStatus(res) == PGRES_COMMAND_OK) { - msg = pstrdup(PQerrorMessage(conn)); - PQclear(res); - PQfinish(conn); - if (fcinfo->nargs == 1) - persistent_conn = NULL; + /* need a tuple descriptor representing one TEXT column */ + tupdesc = CreateTemplateTupleDesc(1, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", + TEXTOID, -1, 0, false); - elog(ERROR, "dblink_exec: sql error: %s", msg); + /* + * and save a copy of the command status string to return as + * our result tuple + */ + sql_cmd_status = GET_TEXT(PQcmdStatus(res)); } else - { - if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - /* need a tuple descriptor representing one TEXT column */ - tupdesc = CreateTemplateTupleDesc(1, false); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", - TEXTOID, -1, 0, false); + elog(ERROR, "dblink_exec: queries returning results not allowed"); - /* - * and save a copy of the command status string to return as - * our result tuple - */ - sql_cmd_status = PQcmdStatus(res); - } - else - elog(ERROR, "dblink_exec: queries returning results not allowed"); - } PQclear(res); /* if needed, close the connection to the database and cleanup */ - if (fcinfo->nargs == 2) + if (freeconn && fcinfo->nargs == 2) PQfinish(conn); - result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql_cmd_status))); - PG_RETURN_TEXT_P(result_text); + PG_RETURN_TEXT_P(sql_cmd_status); } -/* - * Note: this original version of dblink is DEPRECATED; - * it *will* be removed in favor of the new version on next release - */ -PG_FUNCTION_INFO_V1(dblink); -Datum -dblink(PG_FUNCTION_ARGS) -{ - PGconn *conn = NULL; - PGresult *res = NULL; - dblink_results *results; - char *optstr; - char *sqlstatement; - char *execstatement; - char *msg; - int ntuples = 0; - ReturnSetInfo *rsi; - - if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo)) - elog(ERROR, "dblink: function called in context that does not accept a set result"); - - optstr = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0)))); - sqlstatement = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1)))); - - if (fcinfo->flinfo->fn_extra == NULL) - { - - conn = PQconnectdb(optstr); - if (PQstatus(conn) == CONNECTION_BAD) - { - msg = pstrdup(PQerrorMessage(conn)); - PQfinish(conn); - elog(ERROR, "dblink: connection error: %s", msg); - } - - execstatement = (char *) palloc(strlen(sqlstatement) + 1); - if (execstatement != NULL) - { - strcpy(execstatement, sqlstatement); - strcat(execstatement, "\0"); - } - else - elog(ERROR, "dblink: insufficient memory"); - - res = PQexec(conn, execstatement); - if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) - { - msg = pstrdup(PQerrorMessage(conn)); - PQclear(res); - PQfinish(conn); - elog(ERROR, "dblink: sql error: %s", msg); - } - else - { - /* - * got results, start fetching them - */ - ntuples = PQntuples(res); - - /* - * increment resource index - */ - res_id_index++; - - results = init_dblink_results(fcinfo->flinfo->fn_mcxt); - results->tup_num = 0; - results->res_id_index = res_id_index; - results->res = res; - - /* - * Append node to res_id to hold pointer to results. Needed by - * dblink_tok to access the data - */ - append_res_ptr(results); - - /* - * save pointer to results for the next function manager call - */ - fcinfo->flinfo->fn_extra = (void *) results; - - /* close the connection to the database and cleanup */ - PQfinish(conn); - - rsi = (ReturnSetInfo *) fcinfo->resultinfo; - rsi->isDone = ExprMultipleResult; - - PG_RETURN_INT32(res_id_index); - } - } - else - { - /* - * check for more results - */ - results = fcinfo->flinfo->fn_extra; - - results->tup_num++; - res_id_index = results->res_id_index; - ntuples = PQntuples(results->res); - - if (results->tup_num < ntuples) - { - /* - * fetch them if available - */ - - rsi = (ReturnSetInfo *) fcinfo->resultinfo; - rsi->isDone = ExprMultipleResult; - - PG_RETURN_INT32(res_id_index); - } - else - { - /* - * or if no more, clean things up - */ - results = fcinfo->flinfo->fn_extra; - - remove_res_ptr(results); - PQclear(results->res); - pfree(results); - fcinfo->flinfo->fn_extra = NULL; - - rsi = (ReturnSetInfo *) fcinfo->resultinfo; - rsi->isDone = ExprEndResult; - - PG_RETURN_NULL(); - } - } - PG_RETURN_NULL(); -} - -/* - * Note: dblink_tok is DEPRECATED; - * it *will* be removed in favor of the new version on next release - * - * dblink_tok - * parse dblink output string - * return fldnum item (0 based) - * based on provided field separator - */ -PG_FUNCTION_INFO_V1(dblink_tok); -Datum -dblink_tok(PG_FUNCTION_ARGS) -{ - dblink_results *results; - int fldnum; - text *result_text; - char *result; - int nfields = 0; - int text_len = 0; - - results = get_res_ptr(PG_GETARG_INT32(0)); - if (results == NULL) - { - if (res_id != NIL) - { - freeList(res_id); - res_id = NIL; - res_id_index = 0; - } - - elog(ERROR, "dblink_tok: function called with invalid resource id"); - } - - fldnum = PG_GETARG_INT32(1); - if (fldnum < 0) - elog(ERROR, "dblink_tok: field number < 0 not permitted"); - - nfields = PQnfields(results->res); - if (fldnum > (nfields - 1)) - elog(ERROR, "dblink_tok: field number %d does not exist", fldnum); - - if (PQgetisnull(results->res, results->tup_num, fldnum) == 1) - PG_RETURN_NULL(); - else - { - text_len = PQgetlength(results->res, results->tup_num, fldnum); - - result = (char *) palloc(text_len + 1); - - if (result != NULL) - { - strcpy(result, PQgetvalue(results->res, results->tup_num, fldnum)); - strcat(result, "\0"); - } - else - elog(ERROR, "dblink: insufficient memory"); - - result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result))); - - PG_RETURN_TEXT_P(result_text); - } -} /* * dblink_get_pkey @@ -923,7 +816,7 @@ dblink_get_pkey(PG_FUNCTION_ARGS) funcctx->user_fctx = results; } else -/* fast track when no results */ + /* fast track when no results */ SRF_RETURN_DONE(funcctx); MemoryContextSwitchTo(oldcontext); @@ -965,37 +858,10 @@ dblink_get_pkey(PG_FUNCTION_ARGS) SRF_RETURN_NEXT(funcctx, result); } else -/* do when there is no more left */ - SRF_RETURN_DONE(funcctx); -} - -/* - * Note: dblink_last_oid is DEPRECATED; - * it *will* be removed on next release - * - * dblink_last_oid - * return last inserted oid - */ -PG_FUNCTION_INFO_V1(dblink_last_oid); -Datum -dblink_last_oid(PG_FUNCTION_ARGS) -{ - dblink_results *results; - - results = get_res_ptr(PG_GETARG_INT32(0)); - if (results == NULL) { - if (res_id != NIL) - { - freeList(res_id); - res_id = NIL; - res_id_index = 0; - } - - elog(ERROR, "dblink_tok: function called with invalid resource id"); + /* do when there is no more left */ + SRF_RETURN_DONE(funcctx); } - - PG_RETURN_OID(PQoidValue(results->res)); } @@ -1043,7 +909,6 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS) int i; char *ptr; char *sql; - text *sql_text; int16 typlen; bool typbyval; char typalign; @@ -1139,14 +1004,9 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS) sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals); /* - * Make it into TEXT for return to the client - */ - sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql))); - - /* * And send it */ - PG_RETURN_TEXT_P(sql_text); + PG_RETURN_TEXT_P(GET_TEXT(sql)); } @@ -1182,7 +1042,6 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS) int i; char *ptr; char *sql; - text *sql_text; int16 typlen; bool typbyval; char typalign; @@ -1247,14 +1106,9 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS) sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals); /* - * Make it into TEXT for return to the client - */ - sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql))); - - /* * And send it */ - PG_RETURN_TEXT_P(sql_text); + PG_RETURN_TEXT_P(GET_TEXT(sql)); } @@ -1299,7 +1153,6 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) int i; char *ptr; char *sql; - text *sql_text; int16 typlen; bool typbyval; char typalign; @@ -1395,14 +1248,9 @@ dblink_build_sql_update(PG_FUNCTION_ARGS) sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals); /* - * Make it into TEXT for return to the client - */ - sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql))); - - /* * And send it */ - PG_RETURN_TEXT_P(sql_text); + PG_RETURN_TEXT_P(GET_TEXT(sql)); } /* @@ -1415,10 +1263,7 @@ PG_FUNCTION_INFO_V1(dblink_current_query); Datum dblink_current_query(PG_FUNCTION_ARGS) { - text *result_text; - - result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(debug_query_string))); - PG_RETURN_TEXT_P(result_text); + PG_RETURN_TEXT_P(GET_TEXT(debug_query_string)); } @@ -1428,29 +1273,6 @@ dblink_current_query(PG_FUNCTION_ARGS) /* - * init_dblink_results - * - create an empty dblink_results data structure - */ -static dblink_results * -init_dblink_results(MemoryContext fn_mcxt) -{ - MemoryContext oldcontext; - dblink_results *retval; - - oldcontext = MemoryContextSwitchTo(fn_mcxt); - - retval = (dblink_results *) palloc0(sizeof(dblink_results)); - - retval->tup_num = -1; - retval->res_id_index = -1; - retval->res = NULL; - - MemoryContextSwitchTo(oldcontext); - - return retval; -} - -/* * get_pkey_attnames * * Get the primary key attnames for the given relation. @@ -1488,7 +1310,10 @@ get_pkey_attnames(Oid relid, int16 *numatts) /* we're only interested if it is the primary key */ if (index->indisprimary == TRUE) { - *numatts = index->indnatts; + i = 0; + while (index->indkey[i++] != 0) + (*numatts)++; + if (*numatts > 0) { result = (char **) palloc(*numatts * sizeof(char *)); @@ -1907,52 +1732,6 @@ get_relid_from_relname(text *relname_text) return relid; } -static dblink_results * -get_res_ptr(int32 res_id_index) -{ - List *ptr; - - /* - * short circuit empty list - */ - if (res_id == NIL) - return NULL; - - /* - * OK, should be good to go - */ - foreach(ptr, res_id) - { - dblink_results *this_res_id = (dblink_results *) lfirst(ptr); - - if (this_res_id->res_id_index == res_id_index) - return this_res_id; - } - return NULL; -} - -/* - * Add node to global List res_id - */ -static void -append_res_ptr(dblink_results * results) -{ - res_id = lappend(res_id, results); -} - -/* - * Remove node from global List - * using res_id_index - */ -static void -remove_res_ptr(dblink_results * results) -{ - res_id = lremove(results, res_id); - - if (res_id == NIL) - res_id_index = 0; -} - static TupleDesc pgresultGetTupleDesc(PGresult *res) { @@ -2039,3 +1818,91 @@ generate_relation_name(Oid relid) return result; } + + +static remoteConn * +getConnectionByName(const char *name) +{ + remoteConnHashEnt *hentry; + char key[NAMEDATALEN]; + + if(!remoteConnHash) + remoteConnHash=createConnHash(); + + MemSet(key, 0, NAMEDATALEN); + snprintf(key, NAMEDATALEN - 1, "%s", name); + hentry = (remoteConnHashEnt*) hash_search(remoteConnHash, + key, HASH_FIND, NULL); + + if(hentry) + return(hentry->rcon); + + return(NULL); +} + +static HTAB * +createConnHash(void) +{ + HASHCTL ctl; + HTAB *ptr; + + ctl.keysize = NAMEDATALEN; + ctl.entrysize = sizeof(remoteConnHashEnt); + + ptr=hash_create("Remote Con hash", NUMCONN, &ctl, HASH_ELEM); + + if(!ptr) + elog(ERROR,"Can not create connections hash table. Out of memory"); + + return(ptr); +} + +static bool +createNewConnection(const char *name, remoteConn *con) +{ + remoteConnHashEnt *hentry; + bool found; + char key[NAMEDATALEN]; + + if(!remoteConnHash) + remoteConnHash=createConnHash(); + + MemSet(key, 0, NAMEDATALEN); + snprintf(key, NAMEDATALEN - 1, "%s", name); + hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key, + HASH_ENTER, &found); + + if(!hentry) + elog(ERROR, "failed to create connection"); + + if(found) + { + elog(NOTICE, "cannot use a connection name more than once"); + return false; + } + + hentry->rcon = con; + strncpy(hentry->name, name, NAMEDATALEN - 1); + + return true; +} + +static void +deleteConnection(const char *name) +{ + remoteConnHashEnt *hentry; + bool found; + char key[NAMEDATALEN]; + + if(!remoteConnHash) + remoteConnHash=createConnHash(); + + MemSet(key, 0, NAMEDATALEN); + snprintf(key, NAMEDATALEN - 1, "%s", name); + + hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, + key, HASH_REMOVE, &found); + + if(!hentry) + elog(WARNING,"Trying to delete a connection that does not exist"); +} |