diff options
Diffstat (limited to 'contrib/dblink/dblink.c')
-rw-r--r-- | contrib/dblink/dblink.c | 404 |
1 files changed, 309 insertions, 95 deletions
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 3405ddeaa1b..7a46673b6b1 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -8,7 +8,7 @@ * Darko Prenosil <Darko.Prenosil@finteh.hr> * Shridhar Daithankar <shridhar_daithankar@persistent.co.in> * - * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.57 2006/07/11 16:35:31 momjian Exp $ + * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.58 2006/09/02 21:11:15 joe Exp $ * Copyright (c) 2001-2006, PostgreSQL Global Development Group * ALL RIGHTS RESERVED; * @@ -73,6 +73,7 @@ typedef struct remoteConn /* * Internal declarations */ +static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get); static remoteConn *getConnectionByName(const char *name); static HTAB *createConnHash(void); static void createNewConnection(const char *name, remoteConn * rconn); @@ -691,6 +692,26 @@ PG_FUNCTION_INFO_V1(dblink_record); Datum dblink_record(PG_FUNCTION_ARGS) { + return dblink_record_internal(fcinfo, false, false); +} + +PG_FUNCTION_INFO_V1(dblink_send_query); +Datum +dblink_send_query(PG_FUNCTION_ARGS) +{ + return dblink_record_internal(fcinfo, true, false); +} + +PG_FUNCTION_INFO_V1(dblink_get_result); +Datum +dblink_get_result(PG_FUNCTION_ARGS) +{ + return dblink_record_internal(fcinfo, true, true); +} + +static Datum +dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get) +{ FuncCallContext *funcctx; TupleDesc tupdesc = NULL; int call_cntr; @@ -723,128 +744,187 @@ dblink_record(PG_FUNCTION_ARGS) */ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); - if (PG_NARGS() == 3) + if (!is_async) { - /* text,text,bool */ - DBLINK_GET_CONN; - sql = GET_STR(PG_GETARG_TEXT_P(1)); - fail = PG_GETARG_BOOL(2); - } - else if (PG_NARGS() == 2) - { - /* text,text or text,bool */ - if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) + if (PG_NARGS() == 3) { + /* text,text,bool */ + DBLINK_GET_CONN; + sql = GET_STR(PG_GETARG_TEXT_P(1)); + fail = PG_GETARG_BOOL(2); + } + else if (PG_NARGS() == 2) + { + /* text,text or text,bool */ + if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) + { + conn = pconn->conn; + sql = GET_STR(PG_GETARG_TEXT_P(0)); + fail = PG_GETARG_BOOL(1); + } + else + { + DBLINK_GET_CONN; + sql = GET_STR(PG_GETARG_TEXT_P(1)); + } + } + else if (PG_NARGS() == 1) + { + /* text */ conn = pconn->conn; sql = GET_STR(PG_GETARG_TEXT_P(0)); - fail = PG_GETARG_BOOL(1); } else + /* shouldn't happen */ + elog(ERROR, "wrong number of arguments"); + } + else if (is_async && do_get) + { + /* get async result */ + if (PG_NARGS() == 2) { + /* text,bool */ DBLINK_GET_CONN; - sql = GET_STR(PG_GETARG_TEXT_P(1)); + fail = PG_GETARG_BOOL(2); } + else if (PG_NARGS() == 1) + { + /* text */ + DBLINK_GET_CONN; + } + else + /* shouldn't happen */ + elog(ERROR, "wrong number of arguments"); } - else if (PG_NARGS() == 1) + else { - /* text */ - conn = pconn->conn; - sql = GET_STR(PG_GETARG_TEXT_P(0)); + /* send async query */ + if (PG_NARGS() == 2) + { + DBLINK_GET_CONN; + sql = GET_STR(PG_GETARG_TEXT_P(1)); + } + else + /* shouldn't happen */ + elog(ERROR, "wrong number of arguments"); } - else - /* shouldn't happen */ - elog(ERROR, "wrong number of arguments"); if (!conn) DBLINK_CONN_NOT_AVAIL; - res = PQexec(conn, sql); - if (!res || - (PQresultStatus(res) != PGRES_COMMAND_OK && - PQresultStatus(res) != PGRES_TUPLES_OK)) + if (!is_async || (is_async && do_get)) { - if (fail) - DBLINK_RES_ERROR("sql error"); + /* synchronous query, or async result retrieval */ + if (!is_async) + res = PQexec(conn, sql); else { - DBLINK_RES_ERROR_AS_NOTICE("sql error"); - if (freeconn) - PQfinish(conn); - SRF_RETURN_DONE(funcctx); + res = PQgetResult(conn); + /* NULL means we're all done with the async results */ + if (!res) + SRF_RETURN_DONE(funcctx); } - } - if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - is_sql_cmd = true; - - /* need a tuple descriptor representing one TEXT column */ - tupdesc = CreateTemplateTupleDesc(1, false); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", - TEXTOID, -1, 0); - - /* - * and save a copy of the command status string to return as our - * result tuple - */ - sql_cmd_status = PQcmdStatus(res); - funcctx->max_calls = 1; - } - else - funcctx->max_calls = PQntuples(res); - - /* got results, keep track of them */ - funcctx->user_fctx = res; - - /* if needed, close the connection to the database and cleanup */ - if (freeconn) - PQfinish(conn); - - if (!is_sql_cmd) - { - /* get a tuple descriptor for our result type */ - switch (get_call_result_type(fcinfo, NULL, &tupdesc)) + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) { - case TYPEFUNC_COMPOSITE: - /* success */ - break; - case TYPEFUNC_RECORD: - /* failed to determine actual type of RECORD */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("function returning record called in context " - "that cannot accept type record"))); - break; - default: - /* result type isn't composite */ - elog(ERROR, "return type must be a row type"); - break; + if (fail) + DBLINK_RES_ERROR("sql error"); + else + { + DBLINK_RES_ERROR_AS_NOTICE("sql error"); + if (freeconn) + PQfinish(conn); + SRF_RETURN_DONE(funcctx); + } } - - /* make sure we have a persistent copy of the tupdesc */ - tupdesc = CreateTupleDescCopy(tupdesc); + + if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + is_sql_cmd = true; + + /* need a tuple descriptor representing one TEXT column */ + tupdesc = CreateTemplateTupleDesc(1, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", + TEXTOID, -1, 0); + + /* + * and save a copy of the command status string to return as our + * result tuple + */ + sql_cmd_status = PQcmdStatus(res); + funcctx->max_calls = 1; + } + else + funcctx->max_calls = PQntuples(res); + + /* got results, keep track of them */ + funcctx->user_fctx = res; + + /* if needed, close the connection to the database and cleanup */ + if (freeconn) + PQfinish(conn); + + if (!is_sql_cmd) + { + /* get a tuple descriptor for our result type */ + switch (get_call_result_type(fcinfo, NULL, &tupdesc)) + { + case TYPEFUNC_COMPOSITE: + /* success */ + break; + case TYPEFUNC_RECORD: + /* failed to determine actual type of RECORD */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + break; + default: + /* result type isn't composite */ + elog(ERROR, "return type must be a row type"); + break; + } + + /* make sure we have a persistent copy of the tupdesc */ + tupdesc = CreateTupleDescCopy(tupdesc); + } + + /* check result and tuple descriptor have the same number of columns */ + if (PQnfields(res) != tupdesc->natts) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + + /* fast track when no results */ + if (funcctx->max_calls < 1) + { + if (res) + PQclear(res); + SRF_RETURN_DONE(funcctx); + } + + /* store needed metadata for subsequent calls */ + attinmeta = TupleDescGetAttInMetadata(tupdesc); + funcctx->attinmeta = attinmeta; + + MemoryContextSwitchTo(oldcontext); } - - /* check result and tuple descriptor have the same number of columns */ - if (PQnfields(res) != tupdesc->natts) - ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("remote query result rowtype does not match " - "the specified FROM clause rowtype"))); - - /* fast track when no results */ - if (funcctx->max_calls < 1) + else { - if (res) - PQclear(res); - SRF_RETURN_DONE(funcctx); + /* async query send */ + MemoryContextSwitchTo(oldcontext); + PG_RETURN_INT32(PQsendQuery(conn, sql)); } + } - /* store needed metadata for subsequent calls */ - attinmeta = TupleDescGetAttInMetadata(tupdesc); - funcctx->attinmeta = attinmeta; + if (is_async && !do_get) + { + /* async query send -- should not happen */ + elog(ERROR, "async query send called more than once"); - MemoryContextSwitchTo(oldcontext); } /* stuff done on every call of the function */ @@ -903,6 +983,140 @@ dblink_record(PG_FUNCTION_ARGS) } /* + * List all open dblink connections by name. + * Returns an array of all connection names. + * Takes no params + */ +PG_FUNCTION_INFO_V1(dblink_get_connections); +Datum +dblink_get_connections(PG_FUNCTION_ARGS) +{ + HASH_SEQ_STATUS status; + remoteConnHashEnt *hentry; + ArrayBuildState *astate = NULL; + + if (remoteConnHash) + { + hash_seq_init(&status, remoteConnHash); + while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL) + { + /* stash away current value */ + astate = accumArrayResult(astate, + PointerGetDatum(GET_TEXT(hentry->name)), + false, TEXTOID, CurrentMemoryContext); + } + } + + if (astate) + PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate, + CurrentMemoryContext)); + else + PG_RETURN_NULL(); +} + +/* + * Checks if a given remote connection is busy + * + * Returns 1 if the connection is busy, 0 otherwise + * Params: + * text connection_name - name of the connection to check + * + */ +PG_FUNCTION_INFO_V1(dblink_is_busy); +Datum +dblink_is_busy(PG_FUNCTION_ARGS) +{ + char *msg; + PGconn *conn = NULL; + char *conname = NULL; + char *connstr = NULL; + remoteConn *rconn = NULL; + bool freeconn = false; + + DBLINK_INIT; + DBLINK_GET_CONN; + if (!conn) + DBLINK_CONN_NOT_AVAIL; + + PQconsumeInput(conn); + PG_RETURN_INT32(PQisBusy(conn)); +} + +/* + * Cancels a running request on a connection + * + * Returns text: + * "OK" if the cancel request has been sent correctly, + * an error message otherwise + * + * Params: + * text connection_name - name of the connection to check + * + */ +PG_FUNCTION_INFO_V1(dblink_cancel_query); +Datum +dblink_cancel_query(PG_FUNCTION_ARGS) +{ + char *msg; + int res = 0; + PGconn *conn = NULL; + char *conname = NULL; + char *connstr = NULL; + remoteConn *rconn = NULL; + bool freeconn = false; + PGcancel *cancel; + char errbuf[256]; + + DBLINK_INIT; + DBLINK_GET_CONN; + if (!conn) + DBLINK_CONN_NOT_AVAIL; + cancel = PQgetCancel(conn); + + res = PQcancel(cancel, errbuf, 256); + PQfreeCancel(cancel); + + if (res == 0) + PG_RETURN_TEXT_P(GET_TEXT("OK")); + else + PG_RETURN_TEXT_P(GET_TEXT(errbuf)); +} + + +/* + * Get error message from a connection + * + * Returns text: + * "OK" if no error, an error message otherwise + * + * Params: + * text connection_name - name of the connection to check + * + */ +PG_FUNCTION_INFO_V1(dblink_error_message); +Datum +dblink_error_message(PG_FUNCTION_ARGS) +{ + char *msg; + PGconn *conn = NULL; + char *conname = NULL; + char *connstr = NULL; + remoteConn *rconn = NULL; + bool freeconn = false; + + DBLINK_INIT; + DBLINK_GET_CONN; + if (!conn) + DBLINK_CONN_NOT_AVAIL; + + msg = PQerrorMessage(conn); + if (!msg) + PG_RETURN_TEXT_P(GET_TEXT("OK")); + else + PG_RETURN_TEXT_P(GET_TEXT(msg)); +} + +/* * Execute an SQL non-SELECT command */ PG_FUNCTION_INFO_V1(dblink_exec); |