aboutsummaryrefslogtreecommitdiff
path: root/contrib/dblink/dblink.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/dblink/dblink.c')
-rw-r--r--contrib/dblink/dblink.c404
1 files changed, 309 insertions, 95 deletions
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 3405ddeaa1b..7a46673b6b1 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -8,7 +8,7 @@
* Darko Prenosil <Darko.Prenosil@finteh.hr>
* Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
*
- * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.57 2006/07/11 16:35:31 momjian Exp $
+ * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.58 2006/09/02 21:11:15 joe Exp $
* Copyright (c) 2001-2006, PostgreSQL Global Development Group
* ALL RIGHTS RESERVED;
*
@@ -73,6 +73,7 @@ typedef struct remoteConn
/*
* Internal declarations
*/
+static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get);
static remoteConn *getConnectionByName(const char *name);
static HTAB *createConnHash(void);
static void createNewConnection(const char *name, remoteConn * rconn);
@@ -691,6 +692,26 @@ PG_FUNCTION_INFO_V1(dblink_record);
Datum
dblink_record(PG_FUNCTION_ARGS)
{
+ return dblink_record_internal(fcinfo, false, false);
+}
+
+PG_FUNCTION_INFO_V1(dblink_send_query);
+Datum
+dblink_send_query(PG_FUNCTION_ARGS)
+{
+ return dblink_record_internal(fcinfo, true, false);
+}
+
+PG_FUNCTION_INFO_V1(dblink_get_result);
+Datum
+dblink_get_result(PG_FUNCTION_ARGS)
+{
+ return dblink_record_internal(fcinfo, true, true);
+}
+
+static Datum
+dblink_record_internal(FunctionCallInfo fcinfo, bool is_async, bool do_get)
+{
FuncCallContext *funcctx;
TupleDesc tupdesc = NULL;
int call_cntr;
@@ -723,128 +744,187 @@ dblink_record(PG_FUNCTION_ARGS)
*/
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
- if (PG_NARGS() == 3)
+ if (!is_async)
{
- /* text,text,bool */
- DBLINK_GET_CONN;
- sql = GET_STR(PG_GETARG_TEXT_P(1));
- fail = PG_GETARG_BOOL(2);
- }
- else if (PG_NARGS() == 2)
- {
- /* text,text or text,bool */
- if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
+ if (PG_NARGS() == 3)
{
+ /* text,text,bool */
+ DBLINK_GET_CONN;
+ sql = GET_STR(PG_GETARG_TEXT_P(1));
+ fail = PG_GETARG_BOOL(2);
+ }
+ else if (PG_NARGS() == 2)
+ {
+ /* text,text or text,bool */
+ if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
+ {
+ conn = pconn->conn;
+ sql = GET_STR(PG_GETARG_TEXT_P(0));
+ fail = PG_GETARG_BOOL(1);
+ }
+ else
+ {
+ DBLINK_GET_CONN;
+ sql = GET_STR(PG_GETARG_TEXT_P(1));
+ }
+ }
+ else if (PG_NARGS() == 1)
+ {
+ /* text */
conn = pconn->conn;
sql = GET_STR(PG_GETARG_TEXT_P(0));
- fail = PG_GETARG_BOOL(1);
}
else
+ /* shouldn't happen */
+ elog(ERROR, "wrong number of arguments");
+ }
+ else if (is_async && do_get)
+ {
+ /* get async result */
+ if (PG_NARGS() == 2)
{
+ /* text,bool */
DBLINK_GET_CONN;
- sql = GET_STR(PG_GETARG_TEXT_P(1));
+ fail = PG_GETARG_BOOL(2);
}
+ else if (PG_NARGS() == 1)
+ {
+ /* text */
+ DBLINK_GET_CONN;
+ }
+ else
+ /* shouldn't happen */
+ elog(ERROR, "wrong number of arguments");
}
- else if (PG_NARGS() == 1)
+ else
{
- /* text */
- conn = pconn->conn;
- sql = GET_STR(PG_GETARG_TEXT_P(0));
+ /* send async query */
+ if (PG_NARGS() == 2)
+ {
+ DBLINK_GET_CONN;
+ sql = GET_STR(PG_GETARG_TEXT_P(1));
+ }
+ else
+ /* shouldn't happen */
+ elog(ERROR, "wrong number of arguments");
}
- else
- /* shouldn't happen */
- elog(ERROR, "wrong number of arguments");
if (!conn)
DBLINK_CONN_NOT_AVAIL;
- res = PQexec(conn, sql);
- if (!res ||
- (PQresultStatus(res) != PGRES_COMMAND_OK &&
- PQresultStatus(res) != PGRES_TUPLES_OK))
+ if (!is_async || (is_async && do_get))
{
- if (fail)
- DBLINK_RES_ERROR("sql error");
+ /* synchronous query, or async result retrieval */
+ if (!is_async)
+ res = PQexec(conn, sql);
else
{
- DBLINK_RES_ERROR_AS_NOTICE("sql error");
- if (freeconn)
- PQfinish(conn);
- SRF_RETURN_DONE(funcctx);
+ res = PQgetResult(conn);
+ /* NULL means we're all done with the async results */
+ if (!res)
+ SRF_RETURN_DONE(funcctx);
}
- }
- if (PQresultStatus(res) == PGRES_COMMAND_OK)
- {
- is_sql_cmd = true;
-
- /* need a tuple descriptor representing one TEXT column */
- tupdesc = CreateTemplateTupleDesc(1, false);
- TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
- TEXTOID, -1, 0);
-
- /*
- * and save a copy of the command status string to return as our
- * result tuple
- */
- sql_cmd_status = PQcmdStatus(res);
- funcctx->max_calls = 1;
- }
- else
- funcctx->max_calls = PQntuples(res);
-
- /* got results, keep track of them */
- funcctx->user_fctx = res;
-
- /* if needed, close the connection to the database and cleanup */
- if (freeconn)
- PQfinish(conn);
-
- if (!is_sql_cmd)
- {
- /* get a tuple descriptor for our result type */
- switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+ if (!res ||
+ (PQresultStatus(res) != PGRES_COMMAND_OK &&
+ PQresultStatus(res) != PGRES_TUPLES_OK))
{
- case TYPEFUNC_COMPOSITE:
- /* success */
- break;
- case TYPEFUNC_RECORD:
- /* failed to determine actual type of RECORD */
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("function returning record called in context "
- "that cannot accept type record")));
- break;
- default:
- /* result type isn't composite */
- elog(ERROR, "return type must be a row type");
- break;
+ if (fail)
+ DBLINK_RES_ERROR("sql error");
+ else
+ {
+ DBLINK_RES_ERROR_AS_NOTICE("sql error");
+ if (freeconn)
+ PQfinish(conn);
+ SRF_RETURN_DONE(funcctx);
+ }
}
-
- /* make sure we have a persistent copy of the tupdesc */
- tupdesc = CreateTupleDescCopy(tupdesc);
+
+ if (PQresultStatus(res) == PGRES_COMMAND_OK)
+ {
+ is_sql_cmd = true;
+
+ /* need a tuple descriptor representing one TEXT column */
+ tupdesc = CreateTemplateTupleDesc(1, false);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
+ TEXTOID, -1, 0);
+
+ /*
+ * and save a copy of the command status string to return as our
+ * result tuple
+ */
+ sql_cmd_status = PQcmdStatus(res);
+ funcctx->max_calls = 1;
+ }
+ else
+ funcctx->max_calls = PQntuples(res);
+
+ /* got results, keep track of them */
+ funcctx->user_fctx = res;
+
+ /* if needed, close the connection to the database and cleanup */
+ if (freeconn)
+ PQfinish(conn);
+
+ if (!is_sql_cmd)
+ {
+ /* get a tuple descriptor for our result type */
+ switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+ {
+ case TYPEFUNC_COMPOSITE:
+ /* success */
+ break;
+ case TYPEFUNC_RECORD:
+ /* failed to determine actual type of RECORD */
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("function returning record called in context "
+ "that cannot accept type record")));
+ break;
+ default:
+ /* result type isn't composite */
+ elog(ERROR, "return type must be a row type");
+ break;
+ }
+
+ /* make sure we have a persistent copy of the tupdesc */
+ tupdesc = CreateTupleDescCopy(tupdesc);
+ }
+
+ /* check result and tuple descriptor have the same number of columns */
+ if (PQnfields(res) != tupdesc->natts)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("remote query result rowtype does not match "
+ "the specified FROM clause rowtype")));
+
+ /* fast track when no results */
+ if (funcctx->max_calls < 1)
+ {
+ if (res)
+ PQclear(res);
+ SRF_RETURN_DONE(funcctx);
+ }
+
+ /* store needed metadata for subsequent calls */
+ attinmeta = TupleDescGetAttInMetadata(tupdesc);
+ funcctx->attinmeta = attinmeta;
+
+ MemoryContextSwitchTo(oldcontext);
}
-
- /* check result and tuple descriptor have the same number of columns */
- if (PQnfields(res) != tupdesc->natts)
- ereport(ERROR,
- (errcode(ERRCODE_DATATYPE_MISMATCH),
- errmsg("remote query result rowtype does not match "
- "the specified FROM clause rowtype")));
-
- /* fast track when no results */
- if (funcctx->max_calls < 1)
+ else
{
- if (res)
- PQclear(res);
- SRF_RETURN_DONE(funcctx);
+ /* async query send */
+ MemoryContextSwitchTo(oldcontext);
+ PG_RETURN_INT32(PQsendQuery(conn, sql));
}
+ }
- /* store needed metadata for subsequent calls */
- attinmeta = TupleDescGetAttInMetadata(tupdesc);
- funcctx->attinmeta = attinmeta;
+ if (is_async && !do_get)
+ {
+ /* async query send -- should not happen */
+ elog(ERROR, "async query send called more than once");
- MemoryContextSwitchTo(oldcontext);
}
/* stuff done on every call of the function */
@@ -903,6 +983,140 @@ dblink_record(PG_FUNCTION_ARGS)
}
/*
+ * List all open dblink connections by name.
+ * Returns an array of all connection names.
+ * Takes no params
+ */
+PG_FUNCTION_INFO_V1(dblink_get_connections);
+Datum
+dblink_get_connections(PG_FUNCTION_ARGS)
+{
+ HASH_SEQ_STATUS status;
+ remoteConnHashEnt *hentry;
+ ArrayBuildState *astate = NULL;
+
+ if (remoteConnHash)
+ {
+ hash_seq_init(&status, remoteConnHash);
+ while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
+ {
+ /* stash away current value */
+ astate = accumArrayResult(astate,
+ PointerGetDatum(GET_TEXT(hentry->name)),
+ false, TEXTOID, CurrentMemoryContext);
+ }
+ }
+
+ if (astate)
+ PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
+ CurrentMemoryContext));
+ else
+ PG_RETURN_NULL();
+}
+
+/*
+ * Checks if a given remote connection is busy
+ *
+ * Returns 1 if the connection is busy, 0 otherwise
+ * Params:
+ * text connection_name - name of the connection to check
+ *
+ */
+PG_FUNCTION_INFO_V1(dblink_is_busy);
+Datum
+dblink_is_busy(PG_FUNCTION_ARGS)
+{
+ char *msg;
+ PGconn *conn = NULL;
+ char *conname = NULL;
+ char *connstr = NULL;
+ remoteConn *rconn = NULL;
+ bool freeconn = false;
+
+ DBLINK_INIT;
+ DBLINK_GET_CONN;
+ if (!conn)
+ DBLINK_CONN_NOT_AVAIL;
+
+ PQconsumeInput(conn);
+ PG_RETURN_INT32(PQisBusy(conn));
+}
+
+/*
+ * Cancels a running request on a connection
+ *
+ * Returns text:
+ * "OK" if the cancel request has been sent correctly,
+ * an error message otherwise
+ *
+ * Params:
+ * text connection_name - name of the connection to check
+ *
+ */
+PG_FUNCTION_INFO_V1(dblink_cancel_query);
+Datum
+dblink_cancel_query(PG_FUNCTION_ARGS)
+{
+ char *msg;
+ int res = 0;
+ PGconn *conn = NULL;
+ char *conname = NULL;
+ char *connstr = NULL;
+ remoteConn *rconn = NULL;
+ bool freeconn = false;
+ PGcancel *cancel;
+ char errbuf[256];
+
+ DBLINK_INIT;
+ DBLINK_GET_CONN;
+ if (!conn)
+ DBLINK_CONN_NOT_AVAIL;
+ cancel = PQgetCancel(conn);
+
+ res = PQcancel(cancel, errbuf, 256);
+ PQfreeCancel(cancel);
+
+ if (res == 0)
+ PG_RETURN_TEXT_P(GET_TEXT("OK"));
+ else
+ PG_RETURN_TEXT_P(GET_TEXT(errbuf));
+}
+
+
+/*
+ * Get error message from a connection
+ *
+ * Returns text:
+ * "OK" if no error, an error message otherwise
+ *
+ * Params:
+ * text connection_name - name of the connection to check
+ *
+ */
+PG_FUNCTION_INFO_V1(dblink_error_message);
+Datum
+dblink_error_message(PG_FUNCTION_ARGS)
+{
+ char *msg;
+ PGconn *conn = NULL;
+ char *conname = NULL;
+ char *connstr = NULL;
+ remoteConn *rconn = NULL;
+ bool freeconn = false;
+
+ DBLINK_INIT;
+ DBLINK_GET_CONN;
+ if (!conn)
+ DBLINK_CONN_NOT_AVAIL;
+
+ msg = PQerrorMessage(conn);
+ if (!msg)
+ PG_RETURN_TEXT_P(GET_TEXT("OK"));
+ else
+ PG_RETURN_TEXT_P(GET_TEXT(msg));
+}
+
+/*
* Execute an SQL non-SELECT command
*/
PG_FUNCTION_INFO_V1(dblink_exec);